Project

General

Profile

1
//package eu.dnetlib.data.mapreduce.hbase.broker;
2
//
3
//import java.io.IOException;
4
//
5
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
6
//import eu.dnetlib.data.mapreduce.util.DNGFRowKeyDecoder;
7
//import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
8
//import eu.dnetlib.data.proto.DNGFProtos.DNGF;
9
//import eu.dnetlib.data.proto.PersonProtos.Person;
10
//import eu.dnetlib.data.proto.PublicationProtos.Publication;
11
//import org.apache.commons.lang3.StringUtils;
12
//import org.apache.hadoop.hbase.client.Result;
13
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
//import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
//import org.apache.hadoop.hbase.util.Bytes;
16
//import org.apache.hadoop.io.Text;
17
//
18
//public class CalculatePersonDistributionStep1Mapper extends TableMapper<Text, Text> {
19
//
20
//	@Override
21
//	protected void setup(final Context context) throws IOException, InterruptedException {
22
//		super.setup(context);
23
//	}
24
//
25
//	@Override
26
//	protected void map(final ImmutableBytesWritable key, final Result result, final Context context) throws IOException,
27
//			InterruptedException {
28
//
29
//		if (DedupUtils.isRoot(key)) { return; }
30
//
31
//		final DNGFRowKeyDecoder kd = DNGFRowKeyDecoder.decode(key.copyBytes());
32
//		context.getCounter("MAPPER: processed rows", kd.getType().name()).increment(1);
33
//
34
//		switch (kd.getType()) {
35
//		case publication:
36
//			processResult(result, context);
37
//			break;
38
//		case datasource:
39
//			processDatasource(result, context);
40
//			break;
41
//		}
42
//	}
43
//
44
//	private void processResult(final Result result,
45
//			final Context context) throws IOException, InterruptedException {
46
//		final byte[] body = result.getValue(Bytes.toBytes("result"), Bytes.toBytes("body"));
47
//		if (body == null) {
48
//			context.getCounter("MAPPER: errors", "result body is null").increment(1);
49
//			return;
50
//		}
51
//
52
//		final DNGF oaf = DNGF.parseFrom(body);
53
//		final Publication r = oaf.getEntity().getPublication();
54
//		if (r != null) {
55
//			for (Person p : r.getAuthorList()) {
56
//				for (KeyValue kv : oaf.getEntity().getCollectedfromList()) {
57
//					final String dsId = kv.getKey();
58
//					final String pValue = CalculatePersonDistributionUtils.createPersonValue(p);
59
//					if (StringUtils.isNotBlank(dsId) && StringUtils.isNotBlank(pValue)) {
60
//						context.write(new Text(dsId), new Text(pValue));
61
//						context.getCounter("MAPPER: emitted entities", "person").increment(1);
62
//					} else if (StringUtils.isNotBlank(dsId)) {
63
//						context.getCounter("MAPPER: errors", "person without firstname/secondnames").increment(1);
64
//					} else {
65
//						context.getCounter("MAPPER: errors", "collectedFrom is blank").increment(1);
66
//					}
67
//				}
68
//			}
69
//		}
70
//	}
71
//
72
//	private void processDatasource(final Result result,
73
//			final Context context) throws IOException, InterruptedException {
74
//		final byte[] body = result.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
75
//		if (body == null) {
76
//			context.getCounter("MAPPER: errors", "datasource body is null").increment(1);
77
//			return;
78
//		}
79
//
80
//		final DNGF oaf = DNGF.parseFrom(body);
81
//		final eu.dnetlib.data.proto.DatasourceProtos.Datasource ds = oaf.getEntity().getDatasource();
82
//		if (ds != null) {
83
//			context.write(new Text(oaf.getEntity().getId()), new Text(CalculatePersonDistributionUtils.createDsTypeValue(ds)));
84
//			context.getCounter("MAPPER: emitted entities", "datasource").increment(1);
85
//		}
86
//	}
87
//}
(1-1/5)