Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException;
4
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator;
5
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
6
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
7
import eu.dnetlib.data.proto.KindProtos;
8
import eu.dnetlib.data.proto.OafProtos;
9
import eu.dnetlib.data.proto.ResultProtos;
10
import eu.dnetlib.data.proto.TypeProtos;
11
import org.apache.hadoop.io.Text;
12

    
13
import java.util.ArrayList;
14
import java.util.Arrays;
15
import java.util.List;
16

    
17
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*;
18
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getCountry;
19

    
20
public class ResultCountryIterator  extends ResultIterator {
21
    private String country;
22

    
23

    
24
    public ResultCountryIterator(final Iterable<Text> values, int key) throws NotValidResultSequenceException {
25
        super(values,key);
26
    }
27

    
28
    @Override
29
    protected void checkSequence() throws NotValidResultSequenceException {
30
        if (!it.hasNext()) {
31
            throw new NotValidResultSequenceException("Empty information for key");
32

    
33
        }
34
        final Value first = Value.fromJson(it.next().toString());
35
        trust = first.getTrust();
36
        if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) {
37
            if (key != TypeProtos.Type.datasource.getNumber()) {
38
                throw new NotValidResultSequenceException("First Element in reducer is not type of datasource,  but the organization exists");
39
            } else {
40
                while (it.hasNext()) {
41

    
42
                    resultId = Value.fromJson(it.next().toString()).getValue();
43
                    if (!resultId.startsWith("50|")) {
44
                        throw new NotValidResultSequenceException("ERROR ORDERING CHECK");
45
                    }
46
                }
47
                throw new NotValidResultSequenceException("WARNING: unexpected first element");
48
            }
49
        }
50

    
51
        //ensure we are dealing with an institutional repository
52
        if (first.getValue().equals(ONE)) {
53
            //context.getCounter(COUNTER_PROPAGATION, "institutional datasource").increment(1);
54
            if (!it.hasNext()) {
55
                throw new NotValidResultSequenceException("No information apart of type of datasource");
56
            }
57

    
58

    
59
            country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator
60
            if (country.trim().length() != 2) {
61
                try {
62
                    Integer.parseInt(country.trim());
63
                } catch (Exception e) {
64

    
65
                }
66
                throw new NotValidResultSequenceException("Second element in reducer is not country");
67
            }
68
            boolean iterate = true;
69
            resultId = TERMINATOR;
70
            while(it.hasNext() && iterate){
71
                resultId = Value.fromJson(it.next().toString()).getValue();
72
                if (!resultId.startsWith(("50|"))) {
73
                    if (!resultId.equalsIgnoreCase(country)) {
74
                        propagate = false;
75
                        iterate = false;
76

    
77
                    }
78
                }else{
79
                    propagate = true;
80
                    iterate = false;
81
                }
82

    
83
            }
84
            if (!resultId.equals(TERMINATOR)){
85
                try {
86
                    OafRowKeyDecoder.decode(resultId);
87
                }catch(IllegalArgumentException e){
88
                    throw new NotValidResultSequenceException("No result in sequence");
89
                }
90
            }
91
        }else
92
            throw new NotValidResultSequenceException("Not allowed dsType institutional datasource");
93
    }
94

    
95
    @Override
96
    public List<OafProtos.Oaf> next() {
97
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(
98
                getCountry(country,trust,DNET_COUNTRY_SCHEMA,CLASS_COUNTRY_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,CLASS_NAME_COUNTRY));
99
        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
100
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
101
                .setType(TypeProtos.Type.result)
102
                .setId(resultId)
103
                .setResult(result);
104
        if(it.hasNext())
105
            resultId=Value.fromJson(it.next().toString()).getValue();
106
        else
107
            resultId = TERMINATOR;
108

    
109
        return new ArrayList<OafProtos.Oaf>(Arrays.asList(OafProtos.Oaf.newBuilder()
110
                .setKind(KindProtos.Kind.entity)
111
                .setEntity(entity)
112
                .build()));
113
    }
114

    
115

    
116

    
117
}
(7-7/7)