Revision 53383
Added by Miriam Baglioni over 5 years ago
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultFileReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories; |
|
2 |
|
|
3 |
import com.google.gson.Gson; |
|
4 |
import com.googlecode.protobuf.format.JsonFormat; |
|
5 |
import com.sun.org.apache.commons.logging.Log; |
|
6 |
import com.sun.org.apache.commons.logging.LogFactory; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
|
8 |
import eu.dnetlib.data.proto.*; |
|
9 |
import org.apache.commons.lang3.StringUtils; |
|
10 |
import org.apache.hadoop.hbase.client.Put; |
|
11 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
12 |
import org.apache.hadoop.hbase.util.Bytes; |
|
13 |
import org.apache.hadoop.io.Text; |
|
14 |
import org.apache.hadoop.mapreduce.Reducer; |
|
15 |
|
|
16 |
import java.io.IOException; |
|
17 |
import java.util.Iterator; |
|
18 |
|
|
19 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; |
|
20 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.ONE; |
|
21 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.ZERO; |
|
22 |
|
|
23 |
public class PropagationCountryFromDsOrgResultFileReducer extends Reducer<InstOrgKey, ImmutableBytesWritable,Text,Text> { |
|
24 |
|
|
25 |
private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultFileReducer.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
26 |
|
|
27 |
private Text keyOut; |
|
28 |
|
|
29 |
private Text outValue; |
|
30 |
|
|
31 |
final static String DNETCOUNTRYSCHEMA = "dnet:countries"; |
|
32 |
private final static String DATA_INFO_TYPE = "propagation"; |
|
33 |
private final static String SCHEMA_NAME = "dnet:provenanceActions"; |
|
34 |
private final static String CLASS_ID = "propagation::country::instrepos"; |
|
35 |
private final static String SCHEMA_ID = "dnet:provenanceActions"; |
|
36 |
|
|
37 |
|
|
38 |
@Override |
|
39 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
40 |
super.setup(context); |
|
41 |
keyOut = new Text(""); |
|
42 |
outValue = new Text(); |
|
43 |
|
|
44 |
} |
|
45 |
|
|
46 |
private void emit(final Context context, final String key, final String data) { |
|
47 |
keyOut.set(key); |
|
48 |
outValue.set(data.getBytes()); |
|
49 |
try { |
|
50 |
context.write(keyOut, outValue); |
|
51 |
} catch (Exception e) { |
|
52 |
e.printStackTrace(); |
|
53 |
throw new RuntimeException(e); |
|
54 |
} |
|
55 |
} |
|
56 |
|
|
57 |
@Override |
|
58 |
protected void reduce(InstOrgKey dsId, Iterable<ImmutableBytesWritable> values, Context context) { |
|
59 |
|
|
60 |
final Iterator<ImmutableBytesWritable> it = values.iterator(); |
|
61 |
if(!it.hasNext()){ |
|
62 |
context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1); |
|
63 |
return; |
|
64 |
} |
|
65 |
Gson gson = new Gson(); |
|
66 |
Value v = gson.fromJson(Bytes.toString(it.next().get()),Value.class); |
|
67 |
final String first = Bytes.toString(v.getValue().get()); |
|
68 |
final String trust = v.getTrust(); |
|
69 |
if (!(first.equals(ZERO) || first.equals(ONE))) { |
|
70 |
if (dsId.getKeyType().get() == TypeProtos.Type.organization.getNumber()) |
|
71 |
context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource, but the organization exists").increment(1); |
|
72 |
else { |
|
73 |
// context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is %s " , first)); |
|
74 |
while (it.hasNext()) { |
|
75 |
|
|
76 |
byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()), Value.class).getValue().get(); |
|
77 |
String resultId = Bytes.toString(targetRowKey); |
|
78 |
if (!resultId.startsWith("50|")) |
|
79 |
context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1); |
|
80 |
} |
|
81 |
} |
|
82 |
} |
|
83 |
|
|
84 |
|
|
85 |
|
|
86 |
//ensure we are dealing with an institutional repository |
|
87 |
if (first.equals(ONE)) { |
|
88 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
89 |
if(!it.hasNext()){ |
|
90 |
context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1); |
|
91 |
return; |
|
92 |
} |
|
93 |
|
|
94 |
final String country = Bytes.toString(gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get()); // need to update the information for the country to each element in the iterator |
|
95 |
if(country.trim().length() != 2){ |
|
96 |
context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1); |
|
97 |
return; |
|
98 |
} |
|
99 |
|
|
100 |
boolean propagate = true; |
|
101 |
while(it.hasNext()) { |
|
102 |
|
|
103 |
byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get(); |
|
104 |
String resultId = Bytes.toString(targetRowKey); |
|
105 |
|
|
106 |
if (!resultId.startsWith(("50|"))) { |
|
107 |
if(!resultId.equalsIgnoreCase(country)) { |
|
108 |
context.getCounter(COUNTER_PROPAGATION, String.format("resultId expected in ordering was not found. First country %s, second country %s", country, resultId)).increment(1); |
|
109 |
//throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey); |
|
110 |
propagate = false; |
|
111 |
} |
|
112 |
|
|
113 |
}else{ |
|
114 |
if (propagate){ |
|
115 |
OafProtos.Oaf oafUpdate = getOafCountry(resultId, country,trust); |
|
116 |
emit(context, resultId, JsonFormat.printToString(oafUpdate)); |
|
117 |
|
|
118 |
context.getCounter(COUNTER_PROPAGATION, String.format(" added country %s to product from repo %s",country, StringUtils.substringBefore( resultId,"::"))).increment(1); |
|
119 |
} |
|
120 |
|
|
121 |
} |
|
122 |
|
|
123 |
} |
|
124 |
} else { |
|
125 |
context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1); |
|
126 |
} |
|
127 |
|
|
128 |
|
|
129 |
|
|
130 |
} |
|
131 |
|
|
132 |
private FieldTypeProtos.DataInfo.Builder getDataInfo(String trust) { |
|
133 |
FieldTypeProtos.DataInfo.Builder builder = FieldTypeProtos.DataInfo.newBuilder() |
|
134 |
.setInferred(true) |
|
135 |
.setProvenanceaction( |
|
136 |
FieldTypeProtos.Qualifier.newBuilder() |
|
137 |
.setClassid(CLASS_ID) |
|
138 |
.setClassname("Propagation of country information from datasources belonging to institutional repositories") |
|
139 |
.setSchemeid(SCHEMA_ID) |
|
140 |
.setSchemename(SCHEMA_NAME)) |
|
141 |
.setInferenceprovenance(DATA_INFO_TYPE) |
|
142 |
.setTrust(trust); |
|
143 |
return builder; |
|
144 |
|
|
145 |
} |
|
146 |
|
|
147 |
private OafProtos.Oaf getOafCountry(String resultId, String countryValue, String trust) { |
|
148 |
|
|
149 |
final FieldTypeProtos.Qualifier.Builder country = FieldTypeProtos.Qualifier.newBuilder() |
|
150 |
.setClassid(countryValue) |
|
151 |
.setClassname(countryValue) |
|
152 |
.setSchemeid(DNETCOUNTRYSCHEMA) |
|
153 |
.setSchemename(DNETCOUNTRYSCHEMA); |
|
154 |
country.setDataInfo(getDataInfo(trust)); |
|
155 |
|
|
156 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry(country); |
|
157 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
158 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
159 |
.setType(TypeProtos.Type.result) |
|
160 |
.setId(resultId) |
|
161 |
.setResult(result); |
|
162 |
|
|
163 |
return OafProtos.Oaf.newBuilder() |
|
164 |
.setKind(KindProtos.Kind.entity) |
|
165 |
.setEntity(entity) |
|
166 |
.build(); |
|
167 |
} |
|
168 |
|
|
169 |
|
|
170 |
} |
Also available in: Unified diff
reducer for country propagation that writes on hdfs