1
|
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation;
|
2
|
|
3
|
import com.google.gson.Gson;
|
4
|
import eu.dnetlib.data.mapreduce.JobParams;
|
5
|
import eu.dnetlib.data.mapreduce.util.OafDecoder;
|
6
|
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
|
7
|
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
|
8
|
import eu.dnetlib.data.proto.FieldTypeProtos;
|
9
|
import eu.dnetlib.data.proto.OafProtos;
|
10
|
import eu.dnetlib.utils.ontologies.Ontologies;
|
11
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
12
|
import org.apache.hadoop.hbase.mapreduce.TableReducer;
|
13
|
import org.apache.hadoop.hbase.util.Bytes;
|
14
|
|
15
|
import java.io.IOException;
|
16
|
import java.util.Iterator;
|
17
|
|
18
|
import static eu.dnetlib.data.mapreduce.util.DedupUtils.isRoot;
|
19
|
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.asPut;
|
20
|
|
21
|
/**
|
22
|
* Created by sandro on 2/24/17.
|
23
|
*/
|
24
|
public class DedupFixRelationReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable> {
|
25
|
|
26
|
public static final String COUNTER_GROUP = "Fix relations";
|
27
|
|
28
|
private Ontologies ontologies;
|
29
|
|
30
|
private boolean simulation;
|
31
|
|
32
|
@Override
|
33
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
34
|
super.setup(context);
|
35
|
|
36
|
simulation = context.getConfiguration().getBoolean("fixrel.simulation", false);
|
37
|
ontologies = new Gson().fromJson(context.getConfiguration().get(JobParams.ONTOLOGIES), Ontologies.class);
|
38
|
|
39
|
System.out.println("ontologies: " + ontologies.toJson(true));
|
40
|
}
|
41
|
|
42
|
@Override
|
43
|
protected void reduce(Key key, Iterable<ImmutableBytesWritable> values, Context context) {
|
44
|
if (isRoot(key.toString())) {
|
45
|
System.err.println("aborting DedupFixRelationReducer, found root key: " + key);
|
46
|
context.getCounter(COUNTER_GROUP, "aborted").increment(1);
|
47
|
return;
|
48
|
}
|
49
|
|
50
|
final Iterator<ImmutableBytesWritable> it = values.iterator();
|
51
|
final OafProtos.Oaf first = OafDecoder.decode(it.next().copyBytes()).getOaf();
|
52
|
|
53
|
if (!first.getRel().getRelClass().equals("merges")) {
|
54
|
context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1);
|
55
|
return;
|
56
|
}
|
57
|
context.getCounter(COUNTER_GROUP, "Item to fix").increment(1);
|
58
|
|
59
|
final String dedupRoot = first.getRel().getSource();
|
60
|
it.forEachRemaining(b -> {
|
61
|
try {
|
62
|
handleRels(context, OafDecoder.decode(b.copyBytes()).getOaf(), dedupRoot);
|
63
|
} catch (Exception e) {
|
64
|
throw new RuntimeException(e);
|
65
|
}
|
66
|
});
|
67
|
}
|
68
|
|
69
|
private void handleRels(final Context context, final OafProtos.Oaf oaf, final String dedupRoot) throws IOException, InterruptedException {
|
70
|
|
71
|
final String relType = oaf.getRel().getRelClass();
|
72
|
if (relType.contains("merges")) {
|
73
|
return;
|
74
|
}
|
75
|
|
76
|
// Set relation deleted by inference from Root to entity that has been merged to another one
|
77
|
final FieldTypeProtos.DataInfo.Builder dataInfoBuilder = FieldTypeProtos.DataInfo.newBuilder(oaf.getDataInfo());
|
78
|
dataInfoBuilder.setDeletedbyinference(true);
|
79
|
OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
|
80
|
builder.setDataInfo(dataInfoBuilder.build());
|
81
|
final String sourceKey = oaf.getRel().getSource();
|
82
|
write(context, sourceKey, builder);
|
83
|
|
84
|
String relGroup = oaf.getRel().getRelType().toString() + "_" + oaf.getRel().getSubRelType().toString();
|
85
|
context.getCounter(COUNTER_GROUP, String.format("%s - Relation deleted", relGroup)).increment(1);
|
86
|
|
87
|
// Create Relation from Root Entity to its deduplicated Entity
|
88
|
builder = OafProtos.Oaf.newBuilder(oaf);
|
89
|
OafProtos.OafRel.Builder relBuilder = OafProtos.OafRel.newBuilder(oaf.getRel());
|
90
|
relBuilder.setTarget(dedupRoot);
|
91
|
builder.setRel(relBuilder.build());
|
92
|
write(context, sourceKey, builder);
|
93
|
|
94
|
// Create Relation from deduplicated Entity to Root Entity
|
95
|
relBuilder = OafProtos.OafRel.newBuilder(oaf.getRel());
|
96
|
relBuilder.setTarget(relBuilder.getSource());
|
97
|
relBuilder.setSource(dedupRoot);
|
98
|
|
99
|
RelDescriptor rd = OafRelDecoder.decode(oaf.getRel()).getRelDescriptor();
|
100
|
|
101
|
final String inverseRelation = ontologies.get(rd.getRelType().toString()).inverseOf(rd.getRelClass());
|
102
|
relBuilder.setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType()).setRelClass(inverseRelation);
|
103
|
|
104
|
builder.setRel(relBuilder.build());
|
105
|
write(context, dedupRoot, builder);
|
106
|
|
107
|
relGroup = oaf.getRel().getRelType().toString() + "_" + oaf.getRel().getSubRelType().toString();
|
108
|
context.getCounter(COUNTER_GROUP, String.format("%s - Relation fixed", relGroup)).increment(2);
|
109
|
}
|
110
|
|
111
|
private void write(Context context, String dedupRoot, OafProtos.Oaf.Builder builder) throws IOException, InterruptedException {
|
112
|
if (!simulation) {
|
113
|
context.write(new ImmutableBytesWritable(Bytes.toBytes(dedupRoot)), asPut(builder.build()));
|
114
|
}
|
115
|
}
|
116
|
|
117
|
}
|