Revision 53386
Added by Claudio Atzori over 5 years ago
PropagationCountryFromDsOrgResultReducer.java | ||
---|---|---|
15 | 15 |
|
16 | 16 |
import org.apache.commons.logging.Log; |
17 | 17 |
import org.apache.commons.logging.LogFactory; |
18 |
import org.apache.hadoop.io.Text; |
|
18 | 19 |
|
19 | 20 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
20 | 21 |
|
21 |
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, ImmutableBytesWritable, ImmutableBytesWritable>{
|
|
22 |
public class PropagationCountryFromDsOrgResultReducer extends TableReducer<InstOrgKey, Text, ImmutableBytesWritable>{
|
|
22 | 23 |
|
23 | 24 |
private static final Log log = LogFactory.getLog(PropagationCountryFromDsOrgResultReducer.class); |
24 | 25 |
|
... | ... | |
27 | 28 |
private final static String SCHEMA_NAME = "dnet:provenanceActions"; |
28 | 29 |
private final static String CLASS_ID = "propagation::country::instrepos"; |
29 | 30 |
private final static String SCHEMA_ID = "dnet:provenanceActions"; |
30 |
// private final static String TRUST = "0.9"; |
|
31 | 31 |
|
32 | 32 |
private ImmutableBytesWritable keyOut; |
33 |
//private Text keyOut; |
|
34 | 33 |
|
35 | 34 |
/**
 * Reducer setup hook: delegates to {@code super.setup(context)} and then
 * assigns a fresh {@code ImmutableBytesWritable} to the {@code keyOut} field.
 *
 * NOTE(review): this span is a rendered diff, not plain source. Rows carrying
 * a single line number belong to only one side of the change, so the
 * {@code keyOut} assignment is listed twice (old row 38, new row 37) and the
 * commented-out {@code Text} alternative (old row 39) appears only on the old
 * side. Presumably the two assignment rows differ only in whitespace — confirm
 * against the unified diff.
 *
 * @param context context object passed through to {@code super.setup}
 * @throws IOException declared by the signature
 * @throws InterruptedException declared by the signature
 */
@Override |
36 | 35 |
protected void setup(final Context context) throws IOException, InterruptedException { |
37 | 36 |
super.setup(context); |
38 |
keyOut = new ImmutableBytesWritable(); |
|
39 |
//keyOut = new Text(); |
|
37 |
keyOut = new ImmutableBytesWritable(); |
|
40 | 38 |
} |
41 | 39 |
|
42 | 40 |
@Override |
43 |
protected void reduce(final InstOrgKey dsId, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
|
41 |
protected void reduce(final InstOrgKey key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
|
|
44 | 42 |
|
45 |
final Iterator<ImmutableBytesWritable> it = values.iterator();
|
|
43 |
final Iterator<Text> it = values.iterator();
|
|
46 | 44 |
if(!it.hasNext()){ |
47 | 45 |
context.getCounter(COUNTER_PROPAGATION,"empty information for key").increment(1); |
48 | 46 |
return; |
49 | 47 |
} |
50 |
Gson gson = new Gson(); |
|
51 |
Value v = gson.fromJson(Bytes.toString(it.next().get()),Value.class); |
|
52 |
final String first = Bytes.toString(v.getValue().get()); |
|
53 |
final String trust = v.getTrust(); |
|
54 |
if (!(first.equals(ZERO) || first.equals(ONE))) { |
|
55 |
if (dsId.getKeyType().get() == TypeProtos.Type.organization.getNumber()) |
|
56 |
throw new RuntimeException(String.format("First Element in reducer is not type of datasource, key: '%s', value: '%s' but the organization exists", dsId.toString(), first)); |
|
48 |
final Value first = Value.fromJson(it.next().toString()); |
|
49 |
|
|
50 |
if (!(first.getValue().equals(ZERO) || first.getValue().equals(ONE))) { |
|
51 |
if (key.getKeyType().get() == TypeProtos.Type.organization.getNumber()) |
|
52 |
throw new RuntimeException(String.format("First Element in reducer is not type of datasource, key: '%s', value: '%s' but the organization exists", key.toString(), first.getValue())); |
|
57 | 53 |
else { |
58 |
context.getCounter(COUNTER_PROPAGATION,String.format("WARNINGS: First element value is %s " , first));
|
|
54 |
context.getCounter(COUNTER_PROPAGATION, "WARNING: unexpected first element");
|
|
59 | 55 |
while (it.hasNext()) { |
60 | 56 |
|
61 |
byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()), Value.class).getValue().get(); |
|
62 |
String resultId = Bytes.toString(targetRowKey); |
|
63 |
if (!resultId.startsWith("50|")) |
|
57 |
String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
58 |
if (!resultId.startsWith("50|")) { |
|
64 | 59 |
throw new RuntimeException("Wrong ordering. CHECK!!!"); |
60 |
} |
|
65 | 61 |
} |
66 | 62 |
} |
67 | 63 |
} |
68 | 64 |
|
69 | 65 |
//ensure we are dealing with an institutional repository |
70 |
if (first.equals(ONE)) { |
|
66 |
if (first.getValue().equals(ONE)) {
|
|
71 | 67 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
72 |
if(!it.hasNext()){ |
|
68 |
if(!it.hasNext()) {
|
|
73 | 69 |
context.getCounter(COUNTER_PROPAGATION,"no information apart of type of datasource").increment(1); |
74 | 70 |
return; |
75 | 71 |
} |
76 | 72 |
|
77 |
final String country = Bytes.toString(gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get()); // need to update the information for the country to each element in the iterator |
|
78 |
if(country.trim().length() != 2){ |
|
79 |
try{ |
|
73 |
|
|
74 |
final String country = Value.fromJson(it.next().toString()).getValue(); // need to update the information for the country to each element in the iterator |
|
75 |
if(country.trim().length() != 2) { |
|
76 |
try { |
|
80 | 77 |
Integer.parseInt(country.trim()); |
81 |
}catch(Exception e){ |
|
82 |
throw new RuntimeException("Second Element in reducer is not country " + country); |
|
83 |
} |
|
78 |
} catch(Exception e) { } |
|
84 | 79 |
throw new RuntimeException("Second Element in reducer is not country " + country); |
85 | 80 |
} |
86 | 81 |
|
87 | 82 |
boolean propagate = true; |
88 | 83 |
while(it.hasNext()) { |
89 | 84 |
|
90 |
byte[] targetRowKey = gson.fromJson(Bytes.toString(it.next().get()),Value.class).getValue().get(); |
|
91 |
String resultId = Bytes.toString(targetRowKey); |
|
85 |
final String resultId = Value.fromJson(it.next().toString()).getValue(); |
|
92 | 86 |
|
93 | 87 |
if (!resultId.startsWith(("50|"))) { |
94 | 88 |
if(!resultId.equalsIgnoreCase(country)) { |
95 |
context.getCounter(COUNTER_PROPAGATION, String.format("resultId expected in ordering was not found. First country %s, second country %s", country, resultId)).increment(1);
|
|
89 |
context.getCounter(COUNTER_PROPAGATION, "resultId expected in ordering was not found").increment(1);
|
|
96 | 90 |
//throw new RuntimeException("Problem in country values of institutional repositories" + targetRowKey); |
97 | 91 |
propagate = false; |
98 | 92 |
} |
99 | 93 |
|
100 |
}else{ |
|
101 |
if (propagate){ |
|
102 |
byte[] oafUpdate = getOafCountry(resultId, country,trust); |
|
103 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_"+System.nanoTime()), oafUpdate); |
|
94 |
} else { |
|
95 |
if (propagate) { |
|
104 | 96 |
|
97 |
byte[] oafUpdate = getOafCountry(resultId, country, first.getTrust()); |
|
98 |
byte[] targetRowKey = Bytes.toBytes(resultId); |
|
99 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate); |
|
105 | 100 |
keyOut.set(targetRowKey); |
106 | 101 |
context.write(keyOut, put); |
107 |
context.getCounter(COUNTER_PROPAGATION, String.format(" added country %s to product from repo %s",country, StringUtils.substringBefore( resultId,"::"))).increment(1);
|
|
102 |
context.getCounter(COUNTER_PROPAGATION, "added country to product").increment(1);
|
|
108 | 103 |
} |
109 |
|
|
110 | 104 |
} |
111 |
|
|
112 | 105 |
} |
113 | 106 |
} else { |
114 | 107 |
context.getCounter(COUNTER_PROPAGATION,"not allowed dsType institutional datasource").increment(1); |
Also available in: Unified diff
fixing and testing propagation implementation