Revision 57508
Added by Claudio Atzori almost 5 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper2DHP.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import com.google.gson.Gson; |
4 | 4 |
import com.google.protobuf.InvalidProtocolBufferException; |
5 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
5 | 6 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
6 | 7 |
import eu.dnetlib.data.mapreduce.util.UpdateMerger; |
7 | 8 |
import eu.dnetlib.data.proto.OafProtos; |
... | ... | |
50 | 51 |
|
51 | 52 |
if (oaf == null) { |
52 | 53 |
return; |
54 |
} else { |
|
55 |
emit(context, oaf); |
|
53 | 56 |
} |
54 | 57 |
|
55 | 58 |
final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap(); |
... | ... | |
57 | 60 |
for (byte[] cf : row.keySet()) { |
58 | 61 |
|
59 | 62 |
for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) { |
60 |
|
|
61 |
if(Bytes.toString(q.getKey()).startsWith("update")) { |
|
63 |
if(Bytes.toString(q.getKey()).startsWith("update") || q.getKey().equals(DedupUtils.BODY_S)) { |
|
62 | 64 |
continue; |
63 | 65 |
} |
64 |
|
|
65 |
Oaf result = null; |
|
66 |
try { |
|
67 |
result = ProtoConverter.convert(oaf); |
|
68 |
} catch (Throwable e) { |
|
69 |
context.getCounter("Convert", "error" ).increment(1); |
|
66 |
if (new String(q.getValue()).equals("")) { |
|
67 |
context.getCounter("export", "skipped relation " + new String(cf)).increment(1); |
|
68 |
} else { |
|
69 |
emit(context, OafProtos.Oaf.parseFrom(q.getValue())); |
|
70 | 70 |
} |
71 |
if (result != null) { |
|
72 |
emit(result.getClass().getName(), result, context); |
|
73 |
} |
|
74 | 71 |
} |
75 | 72 |
} |
76 | 73 |
} catch (final Throwable e) { |
... | ... | |
78 | 75 |
throw new RuntimeException(e); |
79 | 76 |
} |
80 | 77 |
} |
81 |
private void emit(final String type, final Oaf result, final Context context) throws IOException, InterruptedException { |
|
82 |
keyOut.set(type); |
|
83 |
valueOut.set(g.toJson(result)); |
|
84 |
context.write(keyOut, valueOut); |
|
85 | 78 |
|
86 |
context.getCounter("export", type).increment(1); |
|
79 |
private void emit(Context context, OafProtos.Oaf oaf) throws IOException, InterruptedException { |
|
80 |
Oaf result = null; |
|
81 |
try { |
|
82 |
result = ProtoConverter.convert(oaf); |
|
83 |
} catch (Throwable e) { |
|
84 |
context.getCounter("export", "error:" + e.getClass().getName()).increment(1); |
|
85 |
} |
|
86 |
if (result != null) { |
|
87 |
keyOut.set(result.getClass().getName()); |
|
88 |
valueOut.set(g.toJson(result)); |
|
89 |
context.write(keyOut, valueOut); |
|
90 |
|
|
91 |
context.getCounter("export", result.getClass().getName()).increment(1); |
|
92 |
} |
|
87 | 93 |
} |
88 | 94 |
|
89 | 95 |
private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type) |
Also available in: Unified diff
Fixed export procedure: also include relationships