Revision 54037
Added by Miriam Baglioni over 5 years ago
PropagationCountryFromDsOrgResultReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories; |
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 |
import java.util.Iterator; |
|
5 | 4 |
|
6 | 5 |
|
7 |
import com.google.gson.Gson; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
|
8 | 8 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
9 |
import eu.dnetlib.data.proto.*; |
|
10 |
import org.apache.commons.lang3.StringUtils; |
|
11 | 9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
12 | 10 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
13 | 11 |
import org.apache.hadoop.hbase.util.Bytes; |
... | ... | |
18 | 16 |
import org.apache.hadoop.io.Text; |
19 | 17 |
|
20 | 18 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
21 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.*; |
|
22 | 19 |
|
23 | 20 |
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, Text, ImmutableBytesWritable>{ |
24 | 21 |
|
25 | 22 |
private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class); |
26 | 23 |
|
27 |
final static String DNETSCHEMA = "dnet:countries"; |
|
28 | 24 |
|
29 |
|
|
30 |
private final static String CLASS_ID = "propagation::country::instrepos"; |
|
31 |
|
|
32 |
|
|
33 | 25 |
private ImmutableBytesWritable keyOut; |
34 | 26 |
|
35 | 27 |
@Override |
... | ... | |
41 | 33 |
@Override |
42 | 34 |
protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException { |
43 | 35 |
|
44 |
final Iterator<Text> it = values.iterator(); |
|
45 |
if(!it.hasNext()){ |
|
46 |
context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1); |
|
36 |
|
|
37 |
ResultIterator rh = null; |
|
38 |
try { |
|
39 |
rh = new ResultCountryIterator(values,key.getKeyType().get()); |
|
40 |
} catch (NotValidResultSequenceException e) { |
|
41 |
context.getCounter(COUNTER_PROPAGATION,e.getMessage()).increment(1); |
|
47 | 42 |
return; |
48 | 43 |
} |
49 |
final Value first = Value.fromJson(it.next().toString()); |
|
50 | 44 |
|
51 |
if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) { |
|
52 |
if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber()) |
|
53 |
context.getCounter(COUNTER_PROPAGATION,"First Element in reducer is not type of datasource, but the organization exists").increment(1); |
|
54 |
else { |
|
55 |
context.getCounter(COUNTER_PROPAGATION, "WARNING: unexpected first element").increment(1); |
|
56 |
while (it.hasNext()) { |
|
45 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
57 | 46 |
|
58 |
String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
59 |
if (!resultId.startsWith("50|")) { |
|
60 |
context.getCounter(COUNTER_PROPAGATION,"ERROR ORDERING CHECK").increment(1); |
|
61 |
} |
|
62 |
} |
|
63 |
} |
|
64 |
} |
|
47 |
while(rh.hasNext()){ |
|
48 |
byte[] oafUpdate = rh.next().toByteArray(); |
|
65 | 49 |
|
66 |
//ensure we are dealing with an institutional repository |
|
67 |
if (first.getValue().equals(ONE)) { |
|
68 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
|
69 |
if(!it.hasNext()) { |
|
70 |
context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1); |
|
71 |
return; |
|
72 |
} |
|
50 |
byte[] targetRowKey = Bytes.toBytes(rh.getResultId()); |
|
51 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate); |
|
52 |
keyOut.set(targetRowKey); |
|
53 |
context.write(keyOut, put); |
|
54 |
context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1); |
|
73 | 55 |
|
56 |
} |
|
74 | 57 |
|
75 |
final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator |
|
76 |
if(country.trim().length() != 2) { |
|
77 |
try { |
|
78 |
Integer.parseInt(country.trim()); |
|
79 |
} catch(Exception e) { } |
|
80 |
context.getCounter(COUNTER_PROPAGATION,"second element in reducer is not country").increment(1); |
|
81 |
return; |
|
82 |
} |
|
58 |
if (!rh.getPropagate()){ |
|
59 |
context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found" ).increment(1); |
|
60 |
} |
|
83 | 61 |
|
84 |
boolean propagate = true; |
|
85 |
while(it.hasNext()) { |
|
86 | 62 |
|
87 |
final String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
88 | 63 |
|
89 |
if (!resultId.startsWith(("50|"))) { |
|
90 |
if(!resultId.equalsIgnoreCase(country)) { |
|
91 |
context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1); |
|
92 |
//throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey); |
|
93 |
propagate = false; |
|
94 |
} |
|
95 |
|
|
96 |
} else { |
|
97 |
if (propagate) { |
|
98 |
|
|
99 |
byte[] oafUpdate = getOafCountry(resultId, country, first.getTrust()); |
|
100 |
byte[] targetRowKey = Bytes.toBytes(resultId); |
|
101 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate); |
|
102 |
keyOut.set(targetRowKey); |
|
103 |
context.write(keyOut, put); |
|
104 |
context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1); |
|
105 |
} |
|
106 |
} |
|
107 |
} |
|
108 |
} else { |
|
109 |
context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1); |
|
110 |
} |
|
111 |
|
|
112 | 64 |
} |
113 | 65 |
|
114 |
private byte[] getOafCountry(String resultId, String countryValue, String trust) { |
|
115 | 66 |
|
116 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry( |
|
117 |
getCountry(countryValue,trust,DNETSCHEMA,CLASS_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,"Propagation of country information from datasources belonging to institutional repositories")); |
|
118 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
|
119 |
final OafProtos.OafEntity.Builder entity = OafProtos.OafEntity.newBuilder() |
|
120 |
.setType(TypeProtos.Type.result) |
|
121 |
.setId(resultId) |
|
122 |
.setResult(result); |
|
123 |
|
|
124 |
return OafProtos.Oaf.newBuilder() |
|
125 |
.setKind(KindProtos.Kind.entity) |
|
126 |
.setEntity(entity) |
|
127 |
.build() |
|
128 |
.toByteArray(); |
|
129 |
} |
|
130 |
|
|
131 | 67 |
} |
Also available in: Unified diff
Updated the reducer implementation to use a ResultIterator (ResultCountryIterator) instead of manually walking the raw values iterator.