Project

General

Profile

« Previous | Next » 

Revision 45419

work in progress, adapting m/r jobs to the updated graph domain version

View differences:

RootExportMapper.java
1
//package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2
//
3
//import java.io.IOException;
4
//import java.util.Map;
5
//
6
//import org.apache.hadoop.hbase.client.Result;
7
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
//import org.apache.hadoop.hbase.util.Bytes;
10
//import org.apache.hadoop.io.Text;
11
//import org.apache.hadoop.mapreduce.Counter;
12
//
13
//import com.google.protobuf.InvalidProtocolBufferException;
14
//import com.googlecode.protobuf.format.JsonFormat;
15
//
16
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
17
//import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder;
18
//import eu.dnetlib.data.proto.KindProtos.Kind;
19
//import eu.dnetlib.data.proto.DNGFProtos.DNGF;
20
//import eu.dnetlib.data.proto.TypeProtos.Type;
21
//
22
//public class RootExportMapper extends TableMapper<Text, Text> {
23
//
24
//	private Text outKey;
25
//
26
//	private Text outValue;
27
//
28
//	@Override
29
//	protected void setup(final Context context) throws IOException, InterruptedException {
30
//		outKey = new Text("");
31
//		outValue = new Text();
32
//	}
33
//
34
//	@Override
35
//	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
36
//
37
//		final DNGFRowKeyDecoder keyDecoder = DNGFRowKeyDecoder.decode(keyIn.copyBytes());
38
//
39
//		final Type type = keyDecoder.getType();
40
//		if (!DedupUtils.isRoot(keyDecoder.getId())) return;
41
//
42
//		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
43
//		final byte[] bodyB = map.get(DedupUtils.BODY_B);
44
//		if (bodyB == null) {
45
//			incrementCounter(context, "missing body (map)", type.toString(), 1);
46
//			return;
47
//		}
48
//
49
//		final DNGF oaf = decodeProto(context, bodyB);
50
//
51
//		if (!isValid(oaf)) {
52
//			incrementCounter(context, "missing body (map)", type.toString(), 1);
53
//			return;
54
//		}
55
//
56
//		if (oaf.getDataInfo().getDeletedbyinference()) {
57
//			incrementCounter(context, "deleted by inference", type.toString(), 1);
58
//			return;
59
//		}
60
//
61
//		emit(new String(keyIn.copyBytes()), context, oaf);
62
//		incrementCounter(context, Kind.entity.toString(), type.toString(), 1);
63
//	}
64
//
65
//	private boolean isValid(final DNGF oaf) {
66
//		return (oaf != null) && oaf.isInitialized();
67
//	}
68
//
69
//	private DNGF decodeProto(final Context context, final byte[] body) {
70
//		try {
71
//			return DNGF.parseFrom(body);
72
//		} catch (final InvalidProtocolBufferException e) {
73
//			e.printStackTrace(System.err);
74
//			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
75
//		}
76
//		return null;
77
//	}
78
//
79
//	private void emit(final String key, final Context context, final DNGF oaf) throws IOException, InterruptedException {
80
//		// outKey.set(key);
81
//		outValue.set(new JsonFormat().printToString(oaf));
82
//
83
//		context.write(outKey, outValue);
84
//	}
85
//
86
//	private void incrementCounter(final Context context, final String k, final String t, final int n) {
87
//		getCounter(context, k, t).increment(n);
88
//	}
89
//
90
//	private Counter getCounter(final Context context, final String k, final String t) {
91
//		return context.getCounter(k, t);
92
//	}
93
//
94
//
95
//}
1
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import eu.dnetlib.data.graph.model.DNGFRowKeyDecoder;
7
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12
import org.apache.hadoop.io.Text;
13
import org.apache.hadoop.mapreduce.Counter;
14

  
15
import com.google.protobuf.InvalidProtocolBufferException;
16
import com.googlecode.protobuf.format.JsonFormat;
17

  
18

  
19
import eu.dnetlib.data.proto.KindProtos.Kind;
20
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
21
import eu.dnetlib.data.proto.TypeProtos.Type;
22

  
23
public class RootExportMapper extends TableMapper<Text, Text> {
24

  
25
	private Text outKey;
26

  
27
	private Text outValue;
28

  
29
	@Override
30
	protected void setup(final Context context) throws IOException, InterruptedException {
31
		outKey = new Text("");
32
		outValue = new Text();
33
	}
34

  
35
	@Override
36
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
37

  
38
		final DNGFRowKeyDecoder keyDecoder = DNGFRowKeyDecoder.decode(keyIn.copyBytes());
39

  
40
		final Type type = keyDecoder.getType();
41
		if (!HBaseTableDAO.isRoot(keyDecoder.getId())) return;
42

  
43
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
44
		final byte[] bodyB = map.get(HBaseTableDAO.cfMetadataByte());
45
		if (bodyB == null) {
46
			incrementCounter(context, "missing body (map)", type.toString(), 1);
47
			return;
48
		}
49

  
50
		final DNGF oaf = decodeProto(context, bodyB);
51

  
52
		if (!isValid(oaf)) {
53
			incrementCounter(context, "missing body (map)", type.toString(), 1);
54
			return;
55
		}
56

  
57
		if (oaf.getDataInfo().getDeletedbyinference()) {
58
			incrementCounter(context, "deleted by inference", type.toString(), 1);
59
			return;
60
		}
61

  
62
		emit(new String(keyIn.copyBytes()), context, oaf);
63
		incrementCounter(context, Kind.entity.toString(), type.toString(), 1);
64
	}
65

  
66
	private boolean isValid(final DNGF oaf) {
67
		return (oaf != null) && oaf.isInitialized();
68
	}
69

  
70
	private DNGF decodeProto(final Context context, final byte[] body) {
71
		try {
72
			return DNGF.parseFrom(body);
73
		} catch (final InvalidProtocolBufferException e) {
74
			e.printStackTrace(System.err);
75
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
76
		}
77
		return null;
78
	}
79

  
80
	private void emit(final String key, final Context context, final DNGF oaf) throws IOException, InterruptedException {
81
		// outKey.set(key);
82
		outValue.set(new JsonFormat().printToString(oaf));
83

  
84
		context.write(outKey, outValue);
85
	}
86

  
87
	private void incrementCounter(final Context context, final String k, final String t, final int n) {
88
		getCounter(context, k, t).increment(n);
89
	}
90

  
91
	private Counter getCounter(final Context context, final String k, final String t) {
92
		return context.getCounter(k, t);
93
	}
94

  
95

  
96
}

Also available in: Unified diff