1 |
26600
|
sandro.lab
|
package eu.dnetlib.data.mapreduce.hbase.dedup;
|
2 |
|
|
|
3 |
|
|
import java.io.IOException;
|
4 |
|
|
import java.util.Queue;
|
5 |
|
|
|
6 |
|
|
import com.google.common.collect.Iterables;
|
7 |
|
|
import com.google.common.collect.Lists;
|
8 |
|
|
import com.google.protobuf.InvalidProtocolBufferException;
|
9 |
29702
|
claudio.at
|
import eu.dnetlib.data.mapreduce.JobParams;
|
10 |
28094
|
claudio.at
|
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
11 |
26600
|
sandro.lab
|
import eu.dnetlib.data.mapreduce.util.OafDecoder;
|
12 |
35128
|
claudio.at
|
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
|
13 |
26600
|
sandro.lab
|
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
14 |
28094
|
claudio.at
|
import eu.dnetlib.data.proto.DedupProtos.Dedup;
|
15 |
26600
|
sandro.lab
|
import eu.dnetlib.data.proto.KindProtos.Kind;
|
16 |
|
|
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
17 |
|
|
import eu.dnetlib.data.proto.OafProtos.OafRel;
|
18 |
28094
|
claudio.at
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
19 |
40205
|
claudio.at
|
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
|
20 |
36670
|
claudio.at
|
import eu.dnetlib.pace.config.DedupConfig;
|
21 |
40205
|
claudio.at
|
import org.apache.commons.lang.StringUtils;
|
22 |
|
|
import org.apache.hadoop.hbase.client.Put;
|
23 |
|
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
24 |
|
|
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
25 |
|
|
import org.apache.hadoop.hbase.util.Bytes;
|
26 |
|
|
import org.apache.hadoop.io.Text;
|
27 |
26600
|
sandro.lab
|
|
28 |
38059
|
claudio.at
|
public class SimpleDedupPersonReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
|
29 |
26600
|
sandro.lab
|
|
30 |
|
|
private static final int MAX_Q_SIZE = 3000;
|
31 |
|
|
|
32 |
|
|
private DedupConfig dedupConf;
|
33 |
|
|
|
34 |
40205
|
claudio.at
|
public static String findMin(final Iterable<String> keys) {
|
35 |
|
|
String min = Iterables.getFirst(keys, null);
|
36 |
|
|
for (final String iq : keys) {
|
37 |
|
|
if (min.compareTo(iq) > 0) {
|
38 |
|
|
min = iq;
|
39 |
|
|
}
|
40 |
|
|
}
|
41 |
|
|
return min;
|
42 |
|
|
}
|
43 |
|
|
|
44 |
26600
|
sandro.lab
|
@Override
|
45 |
|
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
46 |
36670
|
claudio.at
|
dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
|
47 |
26600
|
sandro.lab
|
}
|
48 |
|
|
|
49 |
|
|
@Override
|
50 |
|
|
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
51 |
|
|
try {
|
52 |
35128
|
claudio.at
|
final Queue<OafDecoder> q = prepare(key, values, context);
|
53 |
26600
|
sandro.lab
|
if (q.size() > 1) {
|
54 |
29657
|
claudio.at
|
|
55 |
29702
|
claudio.at
|
if (q.size() < JobParams.MAX_COUNTERS) {
|
56 |
36670
|
claudio.at
|
context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(q.size())).increment(1);
|
57 |
29657
|
claudio.at
|
} else {
|
58 |
36670
|
claudio.at
|
context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
|
59 |
29657
|
claudio.at
|
}
|
60 |
35128
|
claudio.at
|
final String min = findMin(Iterables.transform(q, eu.dnetlib.data.transform.OafUtils.idDecoder()));
|
61 |
26600
|
sandro.lab
|
if (min == null) {
|
62 |
36670
|
claudio.at
|
context.getCounter(dedupConf.getWf().getEntityType(), "unable to find min").increment(1);
|
63 |
26600
|
sandro.lab
|
return;
|
64 |
|
|
}
|
65 |
36670
|
claudio.at
|
final String rootId = DedupUtils.newId(min, dedupConf.getWf().getDedupRun());
|
66 |
26600
|
sandro.lab
|
|
67 |
|
|
while (!q.isEmpty()) {
|
68 |
|
|
markDuplicate(context, rootId, q.remove());
|
69 |
|
|
}
|
70 |
|
|
} else {
|
71 |
36670
|
claudio.at
|
context.getCounter(dedupConf.getWf().getEntityType(), "1").increment(1);
|
72 |
26600
|
sandro.lab
|
}
|
73 |
35128
|
claudio.at
|
} catch (final Throwable e) {
|
74 |
26600
|
sandro.lab
|
System.out.println("GOT EX " + e);
|
75 |
|
|
e.printStackTrace(System.err);
|
76 |
36670
|
claudio.at
|
context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
|
77 |
26600
|
sandro.lab
|
}
|
78 |
|
|
}
|
79 |
|
|
|
80 |
|
|
private Queue<OafDecoder> prepare(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
|
81 |
35128
|
claudio.at
|
final Queue<OafDecoder> q = Lists.newLinkedList();
|
82 |
|
|
for (final OafDecoder decoder : Iterables.transform(values, OafHbaseUtils.decoder())) {
|
83 |
26600
|
sandro.lab
|
q.add(decoder);
|
84 |
|
|
if (q.size() >= MAX_Q_SIZE) {
|
85 |
43534
|
alessia.ba
|
//COMMENTED OUT TO AVOID THE JOB FAILS BECAUSE OF MAX NUMBER OF COUNTERS REACHED
|
86 |
|
|
//context.getCounter("[" + key.toString() + "]", "size > " + MAX_Q_SIZE).increment(1);
|
87 |
26600
|
sandro.lab
|
break;
|
88 |
|
|
}
|
89 |
|
|
}
|
90 |
|
|
return q;
|
91 |
|
|
}
|
92 |
|
|
|
93 |
|
|
private void markDuplicate(final Context context, final String rootId, final OafDecoder decoder) throws InvalidProtocolBufferException, IOException,
|
94 |
|
|
InterruptedException {
|
95 |
|
|
|
96 |
35128
|
claudio.at
|
final Oaf.Builder builder = Oaf.newBuilder(decoder.getOaf());
|
97 |
37824
|
claudio.at
|
builder.getDataInfoBuilder().setDeletedbyinference(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
|
98 |
26600
|
sandro.lab
|
|
99 |
35128
|
claudio.at
|
final Oaf oaf = builder.build();
|
100 |
|
|
final byte[] oafId = Bytes.toBytes(oaf.getEntity().getId());
|
101 |
26600
|
sandro.lab
|
|
102 |
|
|
// writes the body, marked as deleted
|
103 |
36670
|
claudio.at
|
final String entityName = dedupConf.getWf().getEntityType();
|
104 |
28308
|
claudio.at
|
emit(context, oafId, entityName, DedupUtils.BODY_B, oaf.toByteArray());
|
105 |
28094
|
claudio.at
|
context.getCounter(entityName, "marked as deleted").increment(1);
|
106 |
26600
|
sandro.lab
|
|
107 |
|
|
// writes the dedupRels in both directions
|
108 |
35128
|
claudio.at
|
final Type entityType = Type.valueOf(entityName);
|
109 |
|
|
final byte[] rowkey = Bytes.toBytes(rootId);
|
110 |
28094
|
claudio.at
|
|
111 |
35128
|
claudio.at
|
final String merges = DedupUtils.getDedupCF_merges(entityType);
|
112 |
28308
|
claudio.at
|
emit(context, rowkey, merges, oafId, buildRel(rowkey, oafId, Dedup.RelName.merges));
|
113 |
28094
|
claudio.at
|
|
114 |
35128
|
claudio.at
|
final String mergedIn = DedupUtils.getDedupCF_mergedIn(entityType);
|
115 |
28308
|
claudio.at
|
emit(context, oafId, mergedIn, rowkey, buildRel(oafId, rowkey, Dedup.RelName.isMergedIn));
|
116 |
28094
|
claudio.at
|
|
117 |
28411
|
claudio.at
|
context.getCounter(entityName, merges).increment(1);
|
118 |
|
|
context.getCounter(entityName, mergedIn).increment(1);
|
119 |
26600
|
sandro.lab
|
}
|
120 |
|
|
|
121 |
|
|
private void emit(final Context context, final byte[] rowkey, final String family, final byte[] qualifier, final byte[] value) throws IOException,
|
122 |
|
|
InterruptedException {
|
123 |
|
|
|
124 |
35128
|
claudio.at
|
final Put put = new Put(OafRowKeyDecoder.decode(rowkey).getKey().getBytes());
|
125 |
42590
|
claudio.at
|
put.setWriteToWAL(JobParams.WRITE_TO_WAL);
|
126 |
26600
|
sandro.lab
|
put.add(Bytes.toBytes(family), qualifier, value);
|
127 |
|
|
|
128 |
|
|
context.write(new ImmutableBytesWritable(rowkey), put);
|
129 |
|
|
}
|
130 |
|
|
|
131 |
28308
|
claudio.at
|
private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
|
132 |
35128
|
claudio.at
|
final OafRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
|
133 |
|
|
final Oaf oaf =
|
134 |
37824
|
claudio.at
|
Oaf.newBuilder()
|
135 |
|
|
.setKind(Kind.relation)
|
136 |
40314
|
claudio.at
|
.setLastupdatetimestamp(System.currentTimeMillis())
|
137 |
37824
|
claudio.at
|
.setDataInfo(
|
138 |
40205
|
claudio.at
|
AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
|
139 |
37824
|
claudio.at
|
dedupConf.getWf().getConfigurationId())).setRel(oafRel)
|
140 |
|
|
.build();
|
141 |
26600
|
sandro.lab
|
return oaf.toByteArray();
|
142 |
|
|
}
|
143 |
|
|
|
144 |
|
|
private String lpad(final int s) {
|
145 |
|
|
return StringUtils.leftPad(String.valueOf(s), String.valueOf(MAX_Q_SIZE).length());
|
146 |
|
|
}
|
147 |
|
|
|
148 |
|
|
}
|