1
|
package eu.dnetlib.data.mapreduce.hbase.dedup;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.Queue;
|
5
|
|
6
|
import com.google.common.collect.Iterables;
|
7
|
import com.google.common.collect.Lists;
|
8
|
import com.google.protobuf.InvalidProtocolBufferException;
|
9
|
import eu.dnetlib.data.mapreduce.JobParams;
|
10
|
import eu.dnetlib.data.mapreduce.util.DedupUtils;
|
11
|
import eu.dnetlib.data.mapreduce.util.OafDecoder;
|
12
|
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
|
13
|
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
|
14
|
import eu.dnetlib.data.proto.DedupProtos.Dedup;
|
15
|
import eu.dnetlib.data.proto.KindProtos.Kind;
|
16
|
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
17
|
import eu.dnetlib.data.proto.OafProtos.OafRel;
|
18
|
import eu.dnetlib.data.proto.TypeProtos.Type;
|
19
|
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
|
20
|
import eu.dnetlib.pace.config.DedupConfig;
|
21
|
import org.apache.commons.lang.StringUtils;
|
22
|
import org.apache.hadoop.hbase.client.Durability;
|
23
|
import org.apache.hadoop.hbase.client.Put;
|
24
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
25
|
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
26
|
import org.apache.hadoop.hbase.util.Bytes;
|
27
|
import org.apache.hadoop.io.Text;
|
28
|
|
29
|
public class SimpleDedupPersonReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
|
30
|
|
31
|
private static final int MAX_Q_SIZE = 3000;
|
32
|
|
33
|
private DedupConfig dedupConf;
|
34
|
|
35
|
public static String findMin(final Iterable<String> keys) {
|
36
|
String min = Iterables.getFirst(keys, null);
|
37
|
for (final String iq : keys) {
|
38
|
if (min.compareTo(iq) > 0) {
|
39
|
min = iq;
|
40
|
}
|
41
|
}
|
42
|
return min;
|
43
|
}
|
44
|
|
45
|
@Override
|
46
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
47
|
dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
|
48
|
}
|
49
|
|
50
|
@Override
|
51
|
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
|
52
|
try {
|
53
|
final Queue<OafDecoder> q = prepare(key, values, context);
|
54
|
if (q.size() > 1) {
|
55
|
|
56
|
if (q.size() < JobParams.MAX_COUNTERS) {
|
57
|
context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(q.size())).increment(1);
|
58
|
} else {
|
59
|
context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
|
60
|
}
|
61
|
final String min = findMin(Iterables.transform(q, eu.dnetlib.data.transform.OafUtils.idDecoder()));
|
62
|
if (min == null) {
|
63
|
context.getCounter(dedupConf.getWf().getEntityType(), "unable to find min").increment(1);
|
64
|
return;
|
65
|
}
|
66
|
final String rootId = DedupUtils.newId(min, dedupConf.getWf().getDedupRun());
|
67
|
|
68
|
while (!q.isEmpty()) {
|
69
|
markDuplicate(context, rootId, q.remove());
|
70
|
}
|
71
|
} else {
|
72
|
context.getCounter(dedupConf.getWf().getEntityType(), "1").increment(1);
|
73
|
}
|
74
|
} catch (final Throwable e) {
|
75
|
System.out.println("GOT EX " + e);
|
76
|
e.printStackTrace(System.err);
|
77
|
context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
|
78
|
}
|
79
|
}
|
80
|
|
81
|
private Queue<OafDecoder> prepare(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
|
82
|
final Queue<OafDecoder> q = Lists.newLinkedList();
|
83
|
for (final OafDecoder decoder : Iterables.transform(values, OafHbaseUtils.decoder())) {
|
84
|
q.add(decoder);
|
85
|
if (q.size() >= MAX_Q_SIZE) {
|
86
|
//COMMENTED OUT TO AVOID THE JOB FAILS BECAUSE OF MAX NUMBER OF COUNTERS REACHED
|
87
|
//context.getCounter("[" + key.toString() + "]", "size > " + MAX_Q_SIZE).increment(1);
|
88
|
break;
|
89
|
}
|
90
|
}
|
91
|
return q;
|
92
|
}
|
93
|
|
94
|
private void markDuplicate(final Context context, final String rootId, final OafDecoder decoder) throws InvalidProtocolBufferException, IOException,
|
95
|
InterruptedException {
|
96
|
|
97
|
final Oaf.Builder builder = Oaf.newBuilder(decoder.getOaf());
|
98
|
builder.getDataInfoBuilder().setDeletedbyinference(true).setInferenceprovenance(dedupConf.getWf().getConfigurationId());
|
99
|
|
100
|
final Oaf oaf = builder.build();
|
101
|
final byte[] oafId = Bytes.toBytes(oaf.getEntity().getId());
|
102
|
|
103
|
// writes the body, marked as deleted
|
104
|
final String entityName = dedupConf.getWf().getEntityType();
|
105
|
emit(context, oafId, entityName, DedupUtils.BODY_B, oaf.toByteArray());
|
106
|
context.getCounter(entityName, "marked as deleted").increment(1);
|
107
|
|
108
|
// writes the dedupRels in both directions
|
109
|
final Type entityType = Type.valueOf(entityName);
|
110
|
final byte[] rowkey = Bytes.toBytes(rootId);
|
111
|
|
112
|
final String merges = DedupUtils.getDedupCF_merges(entityType);
|
113
|
emit(context, rowkey, merges, oafId, buildRel(rowkey, oafId, Dedup.RelName.merges));
|
114
|
|
115
|
final String mergedIn = DedupUtils.getDedupCF_mergedIn(entityType);
|
116
|
emit(context, oafId, mergedIn, rowkey, buildRel(oafId, rowkey, Dedup.RelName.isMergedIn));
|
117
|
|
118
|
context.getCounter(entityName, merges).increment(1);
|
119
|
context.getCounter(entityName, mergedIn).increment(1);
|
120
|
}
|
121
|
|
122
|
private void emit(final Context context, final byte[] rowkey, final String family, final byte[] qualifier, final byte[] value) throws IOException,
|
123
|
InterruptedException {
|
124
|
|
125
|
final Put put = new Put(OafRowKeyDecoder.decode(rowkey).getKey().getBytes());
|
126
|
put.setDurability(Durability.USE_DEFAULT);
|
127
|
put.add(Bytes.toBytes(family), qualifier, value);
|
128
|
|
129
|
context.write(new ImmutableBytesWritable(rowkey), put);
|
130
|
}
|
131
|
|
132
|
private byte[] buildRel(final byte[] from, final byte[] to, final Dedup.RelName relClass) {
|
133
|
final OafRel.Builder oafRel = DedupUtils.getDedup(dedupConf, new String(from), new String(to), relClass);
|
134
|
final Oaf oaf =
|
135
|
Oaf.newBuilder()
|
136
|
.setKind(Kind.relation)
|
137
|
.setLastupdatetimestamp(System.currentTimeMillis())
|
138
|
.setDataInfo(
|
139
|
AbstractDNetXsltFunctions.getDataInfo(null, "", "0.8", false, true).setInferenceprovenance(
|
140
|
dedupConf.getWf().getConfigurationId())).setRel(oafRel)
|
141
|
.build();
|
142
|
return oaf.toByteArray();
|
143
|
}
|
144
|
|
145
|
private String lpad(final int s) {
|
146
|
return StringUtils.leftPad(String.valueOf(s), String.valueOf(MAX_Q_SIZE).length());
|
147
|
}
|
148
|
|
149
|
}
|