Project

General

Profile

« Previous | Next » 

Revision 53383

reducer for country propagation that writes on hdfs

View differences:

modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultFileReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories;
2

  
3
import com.google.gson.Gson;
4
import com.googlecode.protobuf.format.JsonFormat;
5
import com.sun.org.apache.commons.logging.Log;
6
import com.sun.org.apache.commons.logging.LogFactory;
7
import eu.dnetlib.data.mapreduce.hbase.propagation.Value;
8
import eu.dnetlib.data.proto.*;
9
import org.apache.commons.lang3.StringUtils;
10
import org.apache.hadoop.hbase.client.Put;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14
import org.apache.hadoop.mapreduce.Reducer;
15

  
16
import java.io.IOException;
17
import java.util.Iterator;
18

  
19
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION;
20
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.ONE;
21
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.ZERO;
22

  
23
public class PropagationCountryFromDsOrgResultFileReducer extends Reducer<InstOrgKey, ImmutableBytesWritable,Text,Text> {
24

  
25
    private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
26

  
27
    private Text keyOut;
28

  
29
    private Text outValue;
30

  
31
    final static String DNETCOUNTRYSCHEMA = "dnet:countries";
32
    private final static String DATA_INFO_TYPE = "propagation";
33
    private final static String SCHEMA_NAME = "dnet:provenanceActions";
34
    private final static String CLASS_ID = "propagation::country::instrepos";
35
    private final static String SCHEMA_ID = "dnet:provenanceActions";
36

  
37

  
38
    @Override
39
    protected void setup(final Context context) throws IOException, InterruptedException {
40
        super.setup(context);
41
        keyOut = new Text("");
42
        outValue = new Text();
43

  
44
    }
45

  
46
    private void emit(final Context context, final String key, final String data) {
47
        keyOut.set(key);
48
        outValue.set(data.getBytes());
49
        try {
50
            context.write(keyOut, outValue);
51
        } catch (Exception e) {
52
            e.printStackTrace();
53
            throw new RuntimeException(e);
54
        }
55
    }
56

  
57
    @Override
58
    protected void reduce(InstOrgKey dsId, Iterable<ImmutableBytesWritable> values, Context context)  {
59

  
60
        final Iterator<ImmutableBytesWritable> it = values.iterator();
61
        if(!it.hasNext()){
62
            context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1);
63
            return;
64
        }
65
        Gson gson = new Gson();
66
        Value v = gson.fromJson(Bytes.toString(it.next().get()),Value.class);
67
        final String first  = Bytes.toString(v.getValue().get());
68
        final String trust = v.getTrust();
69
        if (!(first.equals(ZERO) || first.equals(ONE))) {
70
            if (dsId.getKeyType().get() == TypeProtos.Type.organization.getNumber())
71
                context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource,  but the organization exists").increment(1);
72
            else {
73
               // context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is  %s " , first));
74
                while (it.hasNext()) {
75

  
76
                    byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()), Value.class).getValue().get();
77
                    String resultId = Bytes.toString(targetRowKey);
78
                    if (!resultId.startsWith("50|"))
79
                        context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1);
80
                }
81
            }
82
        }
83

  
84

  
85

  
86
//ensure we are dealing with an institutional repository
87
        if (first.equals(ONE)) {
88
            context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1);
89
            if(!it.hasNext()){
90
                context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1);
91
                return;
92
            }
93

  
94
            final String country = Bytes.toString(gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get()); // need to update the information for the country to each element in the iterator
95
            if(country.trim().length() != 2){
96
                context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1);
97
                return;
98
            }
99

  
100
            boolean propagate = true;
101
            while(it.hasNext()) {
102

  
103
                byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get();
104
                String resultId = Bytes.toString(targetRowKey);
105

  
106
                if (!resultId.startsWith(("50|"))) {
107
                    if(!resultId.equalsIgnoreCase(country)) {
108
                        context.getCounter(COUNTER_PROPAGATION, String.format("resultId expected in ordering was not found. First country %s, second country %s", country, resultId)).increment(1);
109
                        //throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey);
110
                        propagate = false;
111
                    }
112

  
113
                }else{
114
                    if (propagate){
115
                        OafProtos.Oaf oafUpdate = getOafCountry(resultId, country,trust);
116
                        emit(context, resultId, JsonFormat.printToString(oafUpdate));
117

  
118
                        context.getCounter(COUNTER_PROPAGATION, String.format(" added country %s to product from repo %s",country, StringUtils.substringBefore( resultId,"::"))).increment(1);
119
                    }
120

  
121
                }
122

  
123
            }
124
        } else {
125
            context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1);
126
        }
127

  
128

  
129

  
130
    }
131

  
132
    private FieldTypeProtos.DataInfo.Builder getDataInfo(String trust) {
133
        FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder()
134
                .setInferred(true)
135
                .setProvenanceaction(
136
                        FieldTypeProtos.Qualifier.newBuilder()
137
                                .setClassid(CLASS_ID)
138
                                .setClassname("Propagation of country information from datasources belonging to institutional repositories")
139
                                .setSchemeid(SCHEMA_ID)
140
                                .setSchemename(SCHEMA_NAME))
141
                .setInferenceprovenance(DATA_INFO_TYPE)
142
                .setTrust(trust);
143
        return builder;
144

  
145
    }
146

  
147
    private OafProtos.Oaf getOafCountry(String resultId, String countryValue, String trust) {
148

  
149
        final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder()
150
                .setClassid(countryValue)
151
                .setClassname(countryValue)
152
                .setSchemeid(DNETCOUNTRYSCHEMA)
153
                .setSchemename(DNETCOUNTRYSCHEMA);
154
        country.setDataInfo(getDataInfo(trust));
155

  
156
        final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country);
157
        final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata);
158
        final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder()
159
                .setType(TypeProtos.Type.result)
160
                .setId(resultId)
161
                .setResult(result);
162

  
163
        return OafProtos.Oaf.newBuilder()
164
                .setKind(KindProtos.Kind.entity)
165
                .setEntity(entity)
166
                .build();
167
    }
168

  
169

  
170
}

Also available in: Unified diff