Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6
import eu.dnetlib.data.proto.KindProtos;
7
import eu.dnetlib.data.proto.OafProtos;
8
import eu.dnetlib.data.proto.ResultProtos;
9
import eu.dnetlib.data.proto.TypeProtos;
10
import org.apache.hadoop.io.Text;
11

    
12
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
13
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getCountry;
14

    
15
public class ResultCountryIterator  extends ResultIterator {
16
    private String country;
17
    private String trust;
18

    
19
    public ResultCountryIterator(final Iterable<Text> values, int key) throws NotValidResultSequenceException {
20
        super(values,key);
21
    }
22

    
23
    @Override
24
    public void checkSequence() throws NotValidResultSequenceException {
25
        if (!it.hasNext()) {
26
            throw new NotValidResultSequenceException("Empty information for key");
27

    
28
        }
29
        final Value first = Value.fromJson(it.next().toString());
30
        trust = first.getTrust();
31
        if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
32
            if (key == TypeProtos.Type.organization.getNumber()) {
33
                throw new NotValidResultSequenceException("First Element in reducer is not type of datasource,  but the organization exists");
34
            } else {
35
                while (it.hasNext()) {
36

    
37
                    String resultId = Value.fromJson(it.next().toString()).getValue();
38
                    if (!resultId.startsWith("50|")) {
39
                        throw new NotValidResultSequenceException("ERROR ORDERING CHECK");
40
                    }
41
                }
42
                throw new NotValidResultSequenceException("WARNING: unexpected first element");
43
            }
44
        }
45

    
46
        //ensure we are dealing with an institutional repository
47
        if (first.getValue().equals(ONE)) {
48
            //context.getCounter(COUNTER_PROPAGATION, "institutional datasource").increment(1);
49
            if (!it.hasNext()) {
50
                throw new NotValidResultSequenceException("No information apart of type of datasource");
51
            }
52

    
53

    
54
            country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
55
            if (country.trim().length() != 2) {
56
                try {
57
                    Integer.parseInt(country.trim());
58
                } catch (Exception e) {
59

    
60
                }
61
                throw new NotValidResultSequenceException("Second element in reducer is not country");
62
            }
63
            boolean iterate = true;
64
            while(it.hasNext() && iterate){
65
                resultId = Value.fromJson(it.next().toString()).getValue();
66
                if (!resultId.startsWith(("50|"))) {
67
                    if (!resultId.equalsIgnoreCase(country)) {
68
                        propagate = false;
69
                        iterate = false;
70

    
71
                    }
72
                }else{
73
                    propagate = true;
74
                    iterate = false;
75
                }
76
            }
77
        }else
78
            throw new NotValidResultSequenceException("Not allowed dsType institutional datasource");
79
    }
80

    
81
    @Override
82
    public OafProtos.Oaf next() {
83
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(
84
                getCountry(country,trust,DNET_COUNTRY_SCHEMA,CLASS_COUNTRY_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,"Propagation of country information from datasources belonging to institutional repositories"));
85
        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
86
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
87
                .setType(TypeProtos.Type.result)
88
                .setId(resultId)
89
                .setResult(result);
90
        if(it.hasNext())
91
            resultId=Value.fromJson(it.next().toString()).getValue();
92
        else
93
            resultId = TERMINATOR;
94

    
95
        return OafProtos.Oaf.newBuilder()
96
                .setKind(KindProtos.Kind.entity)
97
                .setEntity(entity)
98
                .build();
99
    }
100

    
101

    
102

    
103
}
(7-7/7)