Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.data.mapreduce.hbase.dedup;
2
3
import java.io.IOException;
4
import java.util.Queue;
5
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import com.google.protobuf.InvalidProtocolBufferException;
9 29702 claudio.at
import eu.dnetlib.data.mapreduce.JobParams;
10 28094 claudio.at
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11 26600 sandro.lab
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12 35128 claudio.at
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13 26600 sandro.lab
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
14 28094 claudio.at
import eu.dnetlib.data.proto.DedupProtos.Dedup;
15 26600 sandro.lab
import eu.dnetlib.data.proto.KindProtos.Kind;
16
import eu.dnetlib.data.proto.OafProtos.Oaf;
17
import eu.dnetlib.data.proto.OafProtos.OafRel;
18 28094 claudio.at
import eu.dnetlib.data.proto.TypeProtos.Type;
19 40205 claudio.at
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
20 36670 claudio.at
import eu.dnetlib.pace.config.DedupConfig;
21 40205 claudio.at
import org.apache.commons.lang.StringUtils;
22
import org.apache.hadoop.hbase.client.Put;
23
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
24
import org.apache.hadoop.hbase.mapreduce.TableReducer;
25
import org.apache.hadoop.hbase.util.Bytes;
26
import org.apache.hadoop.io.Text;
27 26600 sandro.lab
28 38059 claudio.at
public class SimpleDedupPersonReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
29 26600 sandro.lab
30
	private static final int MAX_Q_SIZE = 3000;
31
32
	private DedupConfig dedupConf;
33
34 40205 claudio.at
	public static String findMin(final Iterable<String> keys) {
35
		String min = Iterables.getFirst(keys, null);
36
		for (final String iq : keys) {
37
			if (min.compareTo(iq) > 0) {
38
				min = iq;
39
			}
40
		}
41
		return min;
42
	}
43
44 26600 sandro.lab
	@Override
45
	protected void setup(final Context context) throws IOException, InterruptedException {
46 36670 claudio.at
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
47 26600 sandro.lab
	}
48
49
	@Override
50
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
51
		try {
52 35128 claudio.at
			final Queue<OafDecoder> q = prepare(key, values, context);
53 26600 sandro.lab
			if (q.size() > 1) {
54 29657 claudio.at
55 29702 claudio.at
				if (q.size() < JobParams.MAX_COUNTERS) {
56 36670 claudio.at
					context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(q.size())).increment(1);
57 29657 claudio.at
				} else {
58 36670 claudio.at
					context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
59 29657 claudio.at
				}
60 35128 claudio.at
				final String min = findMin(Iterables.transform(q, eu.dnetlib.data.transform.OafUtils.idDecoder()));
61 26600 sandro.lab
				if (min == null) {
62 36670 claudio.at
					context.getCounter(dedupConf.getWf().getEntityType(), "unable to find min").increment(1);
63 26600 sandro.lab
					return;
64
				}
65 36670 claudio.at
				final String rootId = DedupUtils.newId(min, dedupConf.getWf().getDedupRun());
66 26600 sandro.lab
67
				while (!q.isEmpty()) {
68
					markDuplicate(context, rootId, q.remove());
69
				}
70
			} else {
71 36670 claudio.at
				context.getCounter(dedupConf.getWf().getEntityType(), "1").increment(1);
72 26600 sandro.lab
			}
73 35128 claudio.at
		} catch (final Throwable e) {
74 26600 sandro.lab
			System.out.println("GOT EX " + e);
75
			e.printStackTrace(System.err);
76 36670 claudio.at
			context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
77 26600 sandro.lab
		}
78
	}
79
80
	private Queue<OafDecoder> prepare(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
81 35128 claudio.at
		final Queue<OafDecoder> q = Lists.newLinkedList();
82
		for (final OafDecoder decoder : Iterables.transform(values, OafHbaseUtils.decoder())) {
83 26600 sandro.lab
			q.add(decoder);
84
			if (q.size() >= MAX_Q_SIZE) {
85 43534 alessia.ba
				//COMMENTED OUT TO AVOID THE JOB FAILS BECAUSE OF MAX NUMBER OF COUNTERS REACHED
86
				//context.getCounter("[" + key.toString() + "]", "size > " + MAX_Q_SIZE).increment(1);
87 26600 sandro.lab
				break;
88
			}
89
		}
90
		return q;
91
	}
92
93
	private void markDuplicate(final Context context, final String rootId, final OafDecoder decoder) throws InvalidProtocolBufferException, IOException,
94
			InterruptedException {
95
96 35128 claudio.at
		final Oaf.Builder builder = Oaf.newBuilder(decoder.getOaf());
97 37824 claudio.at
		builder.getDataInfoBuilder().setDeletedbyinference(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
98 26600 sandro.lab
99 35128 claudio.at
		final Oaf oaf = builder.build();
100
		final byte[] oafId = Bytes.toBytes(oaf.getEntity().getId());
101 26600 sandro.lab
102
		// writes the body, marked as deleted
103 36670 claudio.at
		final String entityName = dedupConf.getWf().getEntityType();
104 28308 claudio.at
		emit(context, oafId, entityName, DedupUtils.BODY_B, oaf.toByteArray());
105 28094 claudio.at
		context.getCounter(entityName, "marked as deleted").increment(1);
106 26600 sandro.lab
107
		// writes the dedupRels in both directions
108 35128 claudio.at
		final Type entityType = Type.valueOf(entityName);
109
		final byte[] rowkey = Bytes.toBytes(rootId);
110 28094 claudio.at
111 35128 claudio.at
		final String merges = DedupUtils.getDedupCF_merges(entityType);
112 28308 claudio.at
		emit(context, rowkey, merges, oafId, buildRel(rowkey, oafId, Dedup.RelName.merges));
113 28094 claudio.at
114 35128 claudio.at
		final String mergedIn = DedupUtils.getDedupCF_mergedIn(entityType);
115 28308 claudio.at
		emit(context, oafId, mergedIn, rowkey, buildRel(oafId, rowkey, Dedup.RelName.isMergedIn));
116 28094 claudio.at
117 28411 claudio.at
		context.getCounter(entityName, merges).increment(1);
118
		context.getCounter(entityName, mergedIn).increment(1);
119 26600 sandro.lab
	}
120
121
	private void emit(final Context context, final byte[] rowkey, final String family, final byte[] qualifier, final byte[] value) throws IOException,
122
			InterruptedException {
123
124 35128 claudio.at
		final Put put = new Put(OafRowKeyDecoder.decode(rowkey).getKey().getBytes());
125 42590 claudio.at
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
126 26600 sandro.lab
		put.add(Bytes.toBytes(family), qualifier, value);
127
128
		context.write(new ImmutableBytesWritable(rowkey), put);
129
	}
130
131 28308 claudio.at
	private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
132 35128 claudio.at
		final OafRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
133
		final Oaf oaf =
134 37824 claudio.at
				Oaf.newBuilder()
135
				.setKind(Kind.relation)
136 40314 claudio.at
				.setLastupdatetimestamp(System.currentTimeMillis())
137 37824 claudio.at
						.setDataInfo(
138 40205 claudio.at
								AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
139 37824 claudio.at
								dedupConf.getWf().getConfigurationId())).setRel(oafRel)
140
								.build();
141 26600 sandro.lab
		return oaf.toByteArray();
142
	}
143
144
	private String lpad(final int s) {
145
		return StringUtils.leftPad(String.valueOf(s), String.valueOf(MAX_Q_SIZE).length());
146
	}
147
148
}