Revision 57505
Added by Claudio Atzori over 4 years ago
ExportInformationSpaceMapper2DHP.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.dataexport; |
2 | 2 |
|
3 |
import com.google.common.base.Joiner; |
|
4 | 3 |
import com.google.gson.Gson; |
5 |
import com.googlecode.protobuf.format.JsonFormat; |
|
6 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
4 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
5 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
6 |
import eu.dnetlib.data.mapreduce.util.UpdateMerger; |
|
7 | 7 |
import eu.dnetlib.data.proto.OafProtos; |
8 |
import eu.dnetlib.data.proto.TypeProtos; |
|
8 | 9 |
import eu.dnetlib.dhp.schema.oaf.Oaf; |
9 |
import eu.dnetlib.dhp.schema.util.ProtoConverter; |
|
10 |
import eu.dnetlib.dhp.schema.util.ProtoUtils; |
|
11 |
import org.apache.commons.logging.Log; |
|
12 |
import org.apache.commons.logging.LogFactory; |
|
13 | 10 |
import org.apache.hadoop.hbase.client.Result; |
14 | 11 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
15 | 12 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
13 |
import org.apache.hadoop.hbase.util.Bytes; |
|
16 | 14 |
import org.apache.hadoop.io.Text; |
17 | 15 |
|
18 | 16 |
import java.io.IOException; |
... | ... | |
28 | 26 |
*/ |
29 | 27 |
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> { |
30 | 28 |
|
31 |
/** |
|
32 |
* logger. |
|
33 |
*/ |
|
34 |
private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper2DHP.class); |
|
35 |
|
|
36 |
|
|
37 | 29 |
private Text keyOut; |
38 | 30 |
|
39 | 31 |
private Text valueOut; |
... | ... | |
52 | 44 |
@Override |
53 | 45 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
54 | 46 |
try { |
55 |
Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
|
|
47 |
final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
|
|
56 | 48 |
|
49 |
final OafProtos.Oaf oaf = mergeUpdates(value, context, rkd.getType()); |
|
50 |
|
|
51 |
if (oaf == null) { |
|
52 |
return; |
|
53 |
} |
|
54 |
|
|
55 |
final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap(); |
|
56 |
|
|
57 | 57 |
for (byte[] cf : row.keySet()) { |
58 | 58 |
|
59 | 59 |
for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) { |
60 |
final OafProtos.Oaf oaf = OafDecoder.decode(q.getValue()).getOaf(); |
|
60 |
|
|
61 |
if(Bytes.toString(q.getKey()).startsWith("update")) { |
|
62 |
continue; |
|
63 |
} |
|
64 |
|
|
61 | 65 |
Oaf result = null; |
62 | 66 |
try { |
63 |
result = ProtoConverter.convert(JsonFormat.printToString(oaf));
|
|
67 |
result = ProtoConverter.convert(oaf);
|
|
64 | 68 |
} catch (Throwable e) { |
65 | 69 |
context.getCounter("Convert", "error" ).increment(1); |
66 | 70 |
} |
... | ... | |
70 | 74 |
} |
71 | 75 |
} |
72 | 76 |
} catch (final Throwable e) { |
73 |
log.error("error exporting the following record from HBase: " + value.toString(), e); |
|
74 |
context.getCounter("error", e.getClass().getName()).increment(1); |
|
77 |
context.getCounter("export", "error: " + e.getClass().getName()).increment(1); |
|
75 | 78 |
throw new RuntimeException(e); |
76 | 79 |
} |
77 | 80 |
} |
... | ... | |
79 | 82 |
keyOut.set(type); |
80 | 83 |
valueOut.set(g.toJson(result)); |
81 | 84 |
context.write(keyOut, valueOut); |
85 |
|
|
86 |
context.getCounter("export", type).increment(1); |
|
82 | 87 |
} |
88 |
|
|
89 |
private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type) |
|
90 |
throws InvalidProtocolBufferException { |
|
91 |
try { |
|
92 |
return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString()))); |
|
93 |
} catch (final InvalidProtocolBufferException e) { |
|
94 |
throw e; |
|
95 |
} |
|
96 |
} |
|
97 |
|
|
83 | 98 |
} |
Also available in: Unified diff
Included the InfoSpace mapping towards the new OAF DHP model.