Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.organizationtoresult.datasource;
2

    
3
import com.google.gson.Gson;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
5
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
6
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.organizationtoresult.Emit;
8
import eu.dnetlib.data.proto.*;
9
import org.apache.hadoop.io.Text;
10

    
11
import java.util.*;
12

    
13
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
14
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getDataInfo;
15
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getQualifier;
16

    
17
public class ResultOrganizationIterator extends ResultIterator {
18

    
19
    private Set<String> toBeAffiliated;
20
    private String organizationToPropagate ;
21

    
22
    private Iterator<String> roi;
23

    
24
    public ResultOrganizationIterator(final Iterable<Text> values, final int key) throws NotValidResultSequenceException {
25
        super(values,key);
26

    
27
        init();
28

    
29

    
30
    }
31

    
32
private void init(){
33
        roi = toBeAffiliated.iterator();
34
        getNext();
35
}
36

    
37
@Override
38
protected void checkSequence() throws NotValidResultSequenceException {
39
    if (!it.hasNext()) {
40
        throw new NotValidResultSequenceException("Empty information for key");
41

    
42
    }
43
    final Value first = Value.fromJson(it.next().toString());
44
    trust = first.getTrust();
45

    
46
    if ( key != TypeProtos.Type.datasource.getNumber()){
47
        throw new NotValidResultSequenceException("First Element in reducer has not datasource as type, means no association with datasource was found for the organization ");
48
    }
49

    
50
    //ensure we are dealing with an institutional repository
51
    if (first.getValue().equals(ONE)) {
52
        //context.getCounter(COUNTER_PROPAGATION, "institutional datasource").increment(1);
53
        if (!it.hasNext()) {
54
            throw new NotValidResultSequenceException("No information apart of type of datasource");
55
        }
56

    
57
        organizationToPropagate = Value.fromJson(it.next().toString()).getValue();
58
        if (!organizationToPropagate.startsWith("20|")) {
59

    
60
            throw new NotValidResultSequenceException("Second element in reducer is not the organization");
61
        }
62
        Emit e;
63

    
64
        toBeAffiliated = new HashSet<>();
65
        while (it.hasNext()) {
66
            final Value v = Value.fromJson(it.next().toString());
67
            if (!v.getType().equals(Type.fromresult)) {
68
                throw new NotValidResultSequenceException("result in reducer mixed up with other types ");
69
            }
70
            e = new Gson().fromJson(v.getValue(), Emit.class);
71
            if (!e.getResult_list().contains(organizationToPropagate))
72
                toBeAffiliated.add(e.getId());
73

    
74
        }
75

    
76

    
77

    
78
    }else
79
        throw new NotValidResultSequenceException("Not allowed dsType institutional datasource");
80
}
81

    
82

    
83

    
84

    
85
    private void getNext(){
86
        if (roi.hasNext())
87
            resultId = roi.next();
88
        else
89
            resultId = TERMINATOR;
90
    }
91

    
92

    
93
    @Override
94
    public List<OafProtos.Oaf> next() {
95
        ArrayList<OafProtos.Oaf> ret = new ArrayList<OafProtos.Oaf>();
96

    
97
             ret.add(getOafRel(resultId, organizationToPropagate, REL_RESULT_ORGANIZATION));
98
             ret.add(getOafRel(organizationToPropagate, resultId, REL_ORGANIZATIO_RESULT));
99

    
100

    
101
        getNext();
102

    
103
        return ret;
104
    }
105

    
106
    private OafProtos.Oaf getOafRel(String source, String target, String semantics){
107
        final ResultOrganizationProtos.ResultOrganization.Builder rob = ResultOrganizationProtos.ResultOrganization.newBuilder()
108
                .setAffiliation(ResultOrganizationProtos.ResultOrganization.Affiliation.newBuilder()
109
                        .setRelMetadata(RelMetadataProtos.RelMetadata.newBuilder()
110
                                .setSemantics(getQualifier(semantics,DNET_RELATION_SCHEMA_ORGANIZATION, semantics, DNET_RELATION_SCHEMA_ORGANIZATION)
111
                                )
112
                        )
113
                );
114

    
115

    
116
        final OafProtos.OafRel.Builder relation = OafProtos.OafRel.newBuilder()
117
                .setChild(false)
118
                .setSubRelType(SUBREL_TYPE_ORGANIZATION)
119
                .setRelType(REL_TYPE_ORGANIZATION)
120
                .setRelClass(semantics)
121
                .setTarget(target)
122
                .setSource(source)
123
                .setResultOrganization(rob);
124

    
125
        return OafProtos.Oaf.newBuilder().setKind(KindProtos.Kind.relation)
126

    
127
                .setRel(relation)
128
                .setDataInfo(getDataInfo(trust, CLASS_ORGANIZATION_RESULT_ID, SCHEMA_ID, SCHEMA_NAME, DATA_INFO_TYPE, CLASS_ORGANIZATION_RESULT_NAME))
129
                .build();
130
    }
131

    
132
}
(4-4/4)