Revision 57513
Added by Claudio Atzori over 4 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper2DHP.java | ||
---|---|---|
8 | 8 |
import eu.dnetlib.data.proto.OafProtos; |
9 | 9 |
import eu.dnetlib.data.proto.TypeProtos; |
10 | 10 |
import eu.dnetlib.dhp.schema.oaf.Oaf; |
11 |
import eu.dnetlib.dhp.schema.oaf.Relation; |
|
11 | 12 |
import org.apache.hadoop.hbase.client.Result; |
12 | 13 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
13 | 14 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
14 | 15 |
import org.apache.hadoop.hbase.util.Bytes; |
15 | 16 |
import org.apache.hadoop.io.Text; |
17 |
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; |
|
16 | 18 |
|
17 | 19 |
import java.io.IOException; |
18 | 20 |
import java.util.Map; |
... | ... | |
31 | 33 |
|
32 | 34 |
private Text valueOut; |
33 | 35 |
|
36 |
private MultipleOutputs multipleOutputs; |
|
37 |
|
|
34 | 38 |
private Gson g; |
35 | 39 |
|
36 | 40 |
@Override |
... | ... | |
39 | 43 |
|
40 | 44 |
keyOut = new Text(); |
41 | 45 |
valueOut = new Text(); |
46 |
multipleOutputs = new MultipleOutputs(context); |
|
42 | 47 |
g = new Gson(); |
43 | 48 |
} |
44 | 49 |
|
45 | 50 |
@Override |
51 |
protected void cleanup(Context context) throws IOException, InterruptedException { |
|
52 |
multipleOutputs.close(); |
|
53 |
} |
|
54 |
|
|
55 |
@Override |
|
46 | 56 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
47 | 57 |
try { |
48 | 58 |
final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get()); |
49 | 59 |
|
50 |
final OafProtos.Oaf oaf = mergeUpdates(value, context, rkd.getType());
|
|
60 |
final OafProtos.Oaf oaf = UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(rkd.getType().toString())));
|
|
51 | 61 |
|
52 | 62 |
if (oaf == null) { |
53 | 63 |
return; |
... | ... | |
60 | 70 |
for (byte[] cf : row.keySet()) { |
61 | 71 |
|
62 | 72 |
for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) { |
63 |
if(Bytes.toString(q.getKey()).startsWith("update") || q.getKey().equals(DedupUtils.BODY_S)) { |
|
73 |
final String key = new String(q.getKey()); |
|
74 |
if("update".startsWith(key) || DedupUtils.BODY_S.equals(key)) { |
|
64 | 75 |
continue; |
65 | 76 |
} |
66 | 77 |
if (new String(q.getValue()).equals("")) { |
... | ... | |
86 | 97 |
if (result != null) { |
87 | 98 |
keyOut.set(result.getClass().getName()); |
88 | 99 |
valueOut.set(g.toJson(result)); |
89 |
context.write(keyOut, valueOut); |
|
90 | 100 |
|
101 |
final String namedOutput = result.getClass().getSimpleName().toLowerCase(); |
|
102 |
multipleOutputs.write(namedOutput, keyOut, valueOut, namedOutput + "/" + namedOutput); |
|
103 |
|
|
91 | 104 |
context.getCounter("export", result.getClass().getName()).increment(1); |
92 | 105 |
} |
93 | 106 |
} |
94 | 107 |
|
95 |
private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type) |
|
96 |
throws InvalidProtocolBufferException { |
|
97 |
try { |
|
98 |
return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString()))); |
|
99 |
} catch (final InvalidProtocolBufferException e) { |
|
100 |
throw e; |
|
101 |
} |
|
102 |
} |
|
103 |
|
|
104 | 108 |
} |
Also available in: Unified diff
Fixed the infospace export procedure to avoid emitting the same result more than once.