Revision 54064
Added by Miriam Baglioni over 5 years ago
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultFileReducer.java | ||
---|---|---|
10 | 10 |
import org.apache.hadoop.mapreduce.Reducer; |
11 | 11 |
|
12 | 12 |
import java.io.IOException; |
13 |
import java.util.List; |
|
13 | 14 |
|
14 | 15 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.COUNTER_PROPAGATION; |
15 | 16 |
|
... | ... | |
33 | 34 |
|
34 | 35 |
outValue.set(data.getBytes()); |
35 | 36 |
try { |
37 |
keyOut.set(key); |
|
36 | 38 |
context.write(keyOut, outValue); |
37 | 39 |
} catch (Exception e) { |
38 | 40 |
e.printStackTrace(); |
... | ... | |
53 | 55 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
54 | 56 |
|
55 | 57 |
while(rh.hasNext()){ |
56 |
OafProtos.Oaf oafUpdate = rh.next(); |
|
57 |
emit(context, rh.getResultId(), JsonFormat.printToString(oafUpdate));
|
|
58 |
OafProtos.Oaf oafUpdate = rh.next().get(0);
|
|
59 |
emit(context, oafUpdate.getEntity().getId(), JsonFormat.printToString(oafUpdate));
|
|
58 | 60 |
context.getCounter(COUNTER_PROPAGATION, " added country to product ").increment(1); |
59 | 61 |
} |
60 | 62 |
|
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/PropagationCountryFromDsOrgResultReducer.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.propagation.country.institutionalrepositories; |
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 |
import java.util.List; |
|
4 | 5 |
|
5 | 6 |
|
6 | 7 |
import eu.dnetlib.data.mapreduce.hbase.propagation.NotValidResultSequenceException; |
7 | 8 |
import eu.dnetlib.data.mapreduce.hbase.propagation.ResultIterator; |
8 | 9 |
import eu.dnetlib.data.mapreduce.hbase.propagation.Value; |
10 |
import eu.dnetlib.data.proto.OafProtos; |
|
9 | 11 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
10 | 12 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
11 | 13 |
import org.apache.hadoop.hbase.util.Bytes; |
... | ... | |
45 | 47 |
context.getCounter(COUNTER_PROPAGATION,"institutional datasource").increment(1); |
46 | 48 |
|
47 | 49 |
while(rh.hasNext()){ |
48 |
byte[] oafUpdate = rh.next().toByteArray(); |
|
50 |
List<OafProtos.Oaf> oap = rh.next(); |
|
51 |
byte[] oafUpdate = oap.get(0).toByteArray(); |
|
49 | 52 |
|
50 |
byte[] targetRowKey = Bytes.toBytes(rh.getResultId());
|
|
53 |
byte[] targetRowKey = Bytes.toBytes(oap.get(0).getEntity().getId());
|
|
51 | 54 |
final Put put = new Put(targetRowKey).add(Bytes.toBytes("result"), Bytes.toBytes("update_" + System.nanoTime()), oafUpdate); |
52 | 55 |
keyOut.set(targetRowKey); |
53 | 56 |
context.write(keyOut, put); |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/propagation/country/institutionalrepositories/ResultCountryIterator.java | ||
---|---|---|
9 | 9 |
import eu.dnetlib.data.proto.TypeProtos; |
10 | 10 |
import org.apache.hadoop.io.Text; |
11 | 11 |
|
12 |
import java.util.ArrayList; |
|
13 |
import java.util.Arrays; |
|
14 |
import java.util.List; |
|
15 |
|
|
12 | 16 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.PropagationConstants.*; |
13 | 17 |
import static eu.dnetlib.data.mapreduce.hbase.propagation.Utils.getCountry; |
14 | 18 |
|
15 | 19 |
public class ResultCountryIterator extends ResultIterator { |
16 | 20 |
private String country; |
17 |
private String trust; |
|
18 | 21 |
|
22 |
|
|
19 | 23 |
public ResultCountryIterator(final Iterable<Text> values, int key) throws NotValidResultSequenceException { |
20 | 24 |
super(values,key); |
21 | 25 |
} |
... | ... | |
79 | 83 |
} |
80 | 84 |
|
81 | 85 |
@Override |
82 |
public OafProtos.Oaf next() {
|
|
86 |
public List<OafProtos.Oaf> next() {
|
|
83 | 87 |
final ResultProtos.Result.Metadata.Builder metadata = ResultProtos.Result.Metadata.newBuilder().addCountry( |
84 | 88 |
getCountry(country,trust,DNET_COUNTRY_SCHEMA,CLASS_COUNTRY_ID,SCHEMA_ID,SCHEMA_NAME,DATA_INFO_TYPE,"Propagation of country information from datasources belonging to institutional repositories")); |
85 | 89 |
final ResultProtos.Result.Builder result = ResultProtos.Result.newBuilder().setMetadata(metadata); |
... | ... | |
92 | 96 |
else |
93 | 97 |
resultId = TERMINATOR; |
94 | 98 |
|
95 |
return OafProtos.Oaf.newBuilder() |
|
99 |
return new ArrayList<OafProtos.Oaf>(Arrays.asList(OafProtos.Oaf.newBuilder()
|
|
96 | 100 |
.setKind(KindProtos.Kind.entity) |
97 | 101 |
.setEntity(entity) |
98 |
.build(); |
|
102 |
.build()));
|
|
99 | 103 |
} |
100 | 104 |
|
101 | 105 |
|
Also available in: Unified diff
fixed issue