Revision 45419
Added by Claudio Atzori over 7 years ago
RootExportMapper.java | ||
---|---|---|
1 |
//package eu.dnetlib.data.mapreduce.hbase.dedup.gt; |
|
2 |
// |
|
3 |
//import java.io.IOException; |
|
4 |
//import java.util.Map; |
|
5 |
// |
|
6 |
//import org.apache.hadoop.hbase.client.Result; |
|
7 |
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
8 |
//import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
9 |
//import org.apache.hadoop.hbase.util.Bytes; |
|
10 |
//import org.apache.hadoop.io.Text; |
|
11 |
//import org.apache.hadoop.mapreduce.Counter; |
|
12 |
// |
|
13 |
//import com.google.protobuf.InvalidProtocolBufferException; |
|
14 |
//import com.googlecode.protobuf.format.JsonFormat; |
|
15 |
// |
|
16 |
//import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
17 |
//import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder; |
|
18 |
//import eu.dnetlib.data.proto.KindProtos.Kind; |
|
19 |
//import eu.dnetlib.data.proto.DNGFProtos.DNGF; |
|
20 |
//import eu.dnetlib.data.proto.TypeProtos.Type; |
|
21 |
// |
|
22 |
//public class RootExportMapper extends TableMapper<Text, Text> { |
|
23 |
// |
|
24 |
// private Text outKey; |
|
25 |
// |
|
26 |
// private Text outValue; |
|
27 |
// |
|
28 |
// @Override |
|
29 |
// protected void setup(final Context context) throws IOException, InterruptedException { |
|
30 |
// outKey = new Text(""); |
|
31 |
// outValue = new Text(); |
|
32 |
// } |
|
33 |
// |
|
34 |
// @Override |
|
35 |
// protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
36 |
// |
|
37 |
// final DNGFRowKeyDecoder keyDecoder = DNGFRowKeyDecoder.decode(keyIn.copyBytes()); |
|
38 |
// |
|
39 |
// final Type type = keyDecoder.getType(); |
|
40 |
// if (!DedupUtils.isRoot(keyDecoder.getId())) return; |
|
41 |
// |
|
42 |
// final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString())); |
|
43 |
// final byte[] bodyB = map.get(DedupUtils.BODY_B); |
|
44 |
// if (bodyB == null) { |
|
45 |
// incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
46 |
// return; |
|
47 |
// } |
|
48 |
// |
|
49 |
// final DNGF oaf = decodeProto(context, bodyB); |
|
50 |
// |
|
51 |
// if (!isValid(oaf)) { |
|
52 |
// incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
53 |
// return; |
|
54 |
// } |
|
55 |
// |
|
56 |
// if (oaf.getDataInfo().getDeletedbyinference()) { |
|
57 |
// incrementCounter(context, "deleted by inference", type.toString(), 1); |
|
58 |
// return; |
|
59 |
// } |
|
60 |
// |
|
61 |
// emit(new String(keyIn.copyBytes()), context, oaf); |
|
62 |
// incrementCounter(context, Kind.entity.toString(), type.toString(), 1); |
|
63 |
// } |
|
64 |
// |
|
65 |
// private boolean isValid(final DNGF oaf) { |
|
66 |
// return (oaf != null) && oaf.isInitialized(); |
|
67 |
// } |
|
68 |
// |
|
69 |
// private DNGF decodeProto(final Context context, final byte[] body) { |
|
70 |
// try { |
|
71 |
// return DNGF.parseFrom(body); |
|
72 |
// } catch (final InvalidProtocolBufferException e) { |
|
73 |
// e.printStackTrace(System.err); |
|
74 |
// context.getCounter("decodeProto", e.getClass().getName()).increment(1); |
|
75 |
// } |
|
76 |
// return null; |
|
77 |
// } |
|
78 |
// |
|
79 |
// private void emit(final String key, final Context context, final DNGF oaf) throws IOException, InterruptedException { |
|
80 |
// // outKey.set(key); |
|
81 |
// outValue.set(new JsonFormat().printToString(oaf)); |
|
82 |
// |
|
83 |
// context.write(outKey, outValue); |
|
84 |
// } |
|
85 |
// |
|
86 |
// private void incrementCounter(final Context context, final String k, final String t, final int n) { |
|
87 |
// getCounter(context, k, t).increment(n); |
|
88 |
// } |
|
89 |
// |
|
90 |
// private Counter getCounter(final Context context, final String k, final String t) { |
|
91 |
// return context.getCounter(k, t); |
|
92 |
// } |
|
93 |
// |
|
94 |
// |
|
95 |
//} |
|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.gt; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.Map; |
|
5 |
|
|
6 |
import eu.dnetlib.data.graph.model.DNGFRowKeyDecoder; |
|
7 |
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO; |
|
8 |
import org.apache.hadoop.hbase.client.Result; |
|
9 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
10 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
11 |
import org.apache.hadoop.hbase.util.Bytes; |
|
12 |
import org.apache.hadoop.io.Text; |
|
13 |
import org.apache.hadoop.mapreduce.Counter; |
|
14 |
|
|
15 |
import com.google.protobuf.InvalidProtocolBufferException; |
|
16 |
import com.googlecode.protobuf.format.JsonFormat; |
|
17 |
|
|
18 |
|
|
19 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
|
20 |
import eu.dnetlib.data.proto.DNGFProtos.DNGF; |
|
21 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
|
22 |
|
|
23 |
public class RootExportMapper extends TableMapper<Text, Text> { |
|
24 |
|
|
25 |
private Text outKey; |
|
26 |
|
|
27 |
private Text outValue; |
|
28 |
|
|
29 |
@Override |
|
30 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
31 |
outKey = new Text(""); |
|
32 |
outValue = new Text(); |
|
33 |
} |
|
34 |
|
|
35 |
@Override |
|
36 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
|
37 |
|
|
38 |
final DNGFRowKeyDecoder keyDecoder = DNGFRowKeyDecoder.decode(keyIn.copyBytes()); |
|
39 |
|
|
40 |
final Type type = keyDecoder.getType(); |
|
41 |
if (!HBaseTableDAO.isRoot(keyDecoder.getId())) return; |
|
42 |
|
|
43 |
final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString())); |
|
44 |
final byte[] bodyB = map.get(HBaseTableDAO.cfMetadataByte()); |
|
45 |
if (bodyB == null) { |
|
46 |
incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
47 |
return; |
|
48 |
} |
|
49 |
|
|
50 |
final DNGF oaf = decodeProto(context, bodyB); |
|
51 |
|
|
52 |
if (!isValid(oaf)) { |
|
53 |
incrementCounter(context, "missing body (map)", type.toString(), 1); |
|
54 |
return; |
|
55 |
} |
|
56 |
|
|
57 |
if (oaf.getDataInfo().getDeletedbyinference()) { |
|
58 |
incrementCounter(context, "deleted by inference", type.toString(), 1); |
|
59 |
return; |
|
60 |
} |
|
61 |
|
|
62 |
emit(new String(keyIn.copyBytes()), context, oaf); |
|
63 |
incrementCounter(context, Kind.entity.toString(), type.toString(), 1); |
|
64 |
} |
|
65 |
|
|
66 |
private boolean isValid(final DNGF oaf) { |
|
67 |
return (oaf != null) && oaf.isInitialized(); |
|
68 |
} |
|
69 |
|
|
70 |
private DNGF decodeProto(final Context context, final byte[] body) { |
|
71 |
try { |
|
72 |
return DNGF.parseFrom(body); |
|
73 |
} catch (final InvalidProtocolBufferException e) { |
|
74 |
e.printStackTrace(System.err); |
|
75 |
context.getCounter("decodeProto", e.getClass().getName()).increment(1); |
|
76 |
} |
|
77 |
return null; |
|
78 |
} |
|
79 |
|
|
80 |
private void emit(final String key, final Context context, final DNGF oaf) throws IOException, InterruptedException { |
|
81 |
// outKey.set(key); |
|
82 |
outValue.set(new JsonFormat().printToString(oaf)); |
|
83 |
|
|
84 |
context.write(outKey, outValue); |
|
85 |
} |
|
86 |
|
|
87 |
private void incrementCounter(final Context context, final String k, final String t, final int n) { |
|
88 |
getCounter(context, k, t).increment(n); |
|
89 |
} |
|
90 |
|
|
91 |
private Counter getCounter(final Context context, final String k, final String t) { |
|
92 |
return context.getCounter(k, t); |
|
93 |
} |
|
94 |
|
|
95 |
|
|
96 |
} |
Also available in: Unified diff
work in progress, adapting m/r jobs to the updated graph domain version