Project

General

Profile

1 57510 michele.ar
package eu.dnetlib.data.mapreduce.hbase.openorgs;
2
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.Set;
6
7
import org.apache.commons.lang3.StringUtils;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12
13
import com.google.common.collect.Iterables;
14
15
import eu.dnetlib.data.mapreduce.util.DedupUtils;
16
import eu.dnetlib.data.proto.OafProtos.Oaf;
17
18
public abstract class AbstractOpenOrgsMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
19
20
	private ImmutableBytesWritable keyOut;
21
	private ImmutableBytesWritable valueOut;
22
23
	@Override
24
	protected void setup(final Context context) throws IOException, InterruptedException {
25
		super.setup(context);
26
		keyOut = new ImmutableBytesWritable();
27
		valueOut = new ImmutableBytesWritable();
28
	}
29
30
	@Override
31
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
32
		try {
33
			if (DedupUtils.isRoot(keyIn)) {
34
				context.getCounter("organization", "dedupwf_____ row skipped").increment(1);
35
				return;
36
			}
37
38
			if (OpenOrgsCommon.isOpenOrgsMesh(keyIn)) {
39
				context.getCounter("organization", "openorgsmesh row skipped").increment(1);
40
				return;
41
			}
42
43
			final byte[] bytes = value.getFamilyMap(Bytes.toBytes("organization")).get(Bytes.toBytes("body"));
44
			if (bytes == null) {
45
				context.getCounter("organization", "empty body").increment(1);
46
				return;
47
			}
48
49
			final Set<byte[]> roots = value.getFamilyMap(Bytes.toBytes("organizationOrganization_dedup_isMergedIn")).keySet();
50
51
			if (roots.isEmpty()) {
52
				context.getCounter("organization", "missing mergedIn relationship").increment(1);
53
			}
54
			if (roots.size() > 1) {
55
				context.getCounter("organization", "too many mergedIn relationship").increment(1);
56
			}
57
58
			final String root = roots.isEmpty() ? null : new String(Iterables.getOnlyElement(roots), Charset.forName("UTF-8"));
59
60
			final String keyOutString = calculateKeyOut(Oaf.parseFrom(bytes), root, context);
61
62
			if (StringUtils.isNotBlank(keyOutString)) {
63
				emit(keyOutString, bytes, context);
64
			}
65
66
		} catch (final Exception e) {
67
			throw new RuntimeException(e);
68
		}
69
	}
70
71
	protected abstract String calculateKeyOut(Oaf oaf, String rootId, Context context);
72
73
	private void emit(final String key, final byte[] value, final Context context) {
74
		keyOut.set(key.getBytes());
75
		valueOut.set(value);
76
		try {
77
			context.write(keyOut, valueOut);
78
		} catch (IOException | InterruptedException e) {
79
			throw new RuntimeException(e);
80
		}
81
	}
82
83
}