Revision 54507
Added by Claudio Atzori almost 6 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationReducer.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation; |
|
2 |
|
|
3 |
import com.google.common.collect.Iterables; |
|
4 |
import com.google.gson.Gson; |
|
5 |
import eu.dnetlib.data.mapreduce.JobParams; |
|
6 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
7 |
import eu.dnetlib.data.mapreduce.util.OafRelDecoder; |
|
8 |
import eu.dnetlib.data.mapreduce.util.RelDescriptor; |
|
9 |
import eu.dnetlib.data.proto.FieldTypeProtos; |
|
10 |
import eu.dnetlib.data.proto.OafProtos; |
|
11 |
import eu.dnetlib.utils.ontologies.Ontologies; |
|
12 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
13 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
|
14 |
import org.apache.hadoop.hbase.util.Bytes; |
|
15 |
import org.apache.hadoop.io.Writable; |
|
16 |
import org.apache.hadoop.mapreduce.Reducer; |
|
17 |
|
|
18 |
import java.io.IOException; |
|
19 |
import java.util.Iterator; |
|
20 |
|
|
21 |
import static eu.dnetlib.data.mapreduce.util.DedupUtils.isRoot; |
|
22 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.asPut; |
|
23 |
|
|
24 |
/** |
|
25 |
* Created by sandro on 2/24/17. |
|
26 |
*/ |
|
27 |
public class DedupFixRelationReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable> { |
|
28 |
|
|
29 |
public static final String COUNTER_GROUP = "Fix relations"; |
|
30 |
|
|
31 |
private Ontologies ontologies; |
|
32 |
|
|
33 |
private boolean simulation; |
|
34 |
|
|
35 |
@Override |
|
36 |
protected void setup(final Context context) throws IOException, InterruptedException { |
|
37 |
super.setup(context); |
|
38 |
|
|
39 |
simulation = context.getConfiguration().getBoolean("fixrel.simulation", false); |
|
40 |
ontologies = new Gson().fromJson(context.getConfiguration().get(JobParams.ONTOLOGIES), Ontologies.class); |
|
41 |
|
|
42 |
System.out.println("ontologies: " + ontologies.toJson(true)); |
|
43 |
} |
|
44 |
|
|
45 |
@Override |
|
46 |
protected void reduce(Key key, Iterable<ImmutableBytesWritable> values, Context context) { |
|
47 |
if (isRoot(key.toString())) { |
|
48 |
System.err.println("aborting DedupFixRelationReducer, found root key: " + key); |
|
49 |
context.getCounter(COUNTER_GROUP, "aborted").increment(1); |
|
50 |
return; |
|
51 |
} |
|
52 |
|
|
53 |
final Iterator<ImmutableBytesWritable> it = values.iterator(); |
|
54 |
final OafProtos.Oaf first = OafDecoder.decode(it.next().copyBytes()).getOaf(); |
|
55 |
|
|
56 |
if (!first.getRel().getRelClass().equals("merges")) { |
|
57 |
context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1); |
|
58 |
return; |
|
59 |
} |
|
60 |
context.getCounter(COUNTER_GROUP, "Item to fix").increment(1); |
|
61 |
|
|
62 |
final String dedupRoot = first.getRel().getSource(); |
|
63 |
it.forEachRemaining(b -> { |
|
64 |
try { |
|
65 |
handleRels(context, OafDecoder.decode(b.copyBytes()).getOaf(), dedupRoot); |
|
66 |
} catch (Exception e) { |
|
67 |
throw new RuntimeException(e); |
|
68 |
} |
|
69 |
}); |
|
70 |
} |
|
71 |
|
|
72 |
private void handleRels(final Context context, final OafProtos.Oaf oaf, final String dedupRoot) throws IOException, InterruptedException { |
|
73 |
|
|
74 |
final String relType = oaf.getRel().getRelClass(); |
|
75 |
if (relType.contains("merges")) { |
|
76 |
return; |
|
77 |
} |
|
78 |
|
|
79 |
// Set relation deleted by inference from Root to entity that has been merged to another one |
|
80 |
final FieldTypeProtos.DataInfo.Builder dataInfoBuilder = FieldTypeProtos.DataInfo.newBuilder(oaf.getDataInfo()); |
|
81 |
dataInfoBuilder.setDeletedbyinference(true); |
|
82 |
OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf); |
|
83 |
builder.setDataInfo(dataInfoBuilder.build()); |
|
84 |
final String sourceKey = oaf.getRel().getSource(); |
|
85 |
write(context, sourceKey, builder); |
|
86 |
|
|
87 |
String relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString(); |
|
88 |
context.getCounter(COUNTER_GROUP, String.format("%s - Relation set deleted", relGroup)).increment(1); |
|
89 |
|
|
90 |
// Create Relation from Root Entity to its deduplicated Entity |
|
91 |
builder = OafProtos.Oaf.newBuilder(oaf); |
|
92 |
OafProtos.OafRel.Builder relBuilder = OafProtos.OafRel.newBuilder(oaf.getRel()); |
|
93 |
relBuilder.setTarget(dedupRoot); |
|
94 |
builder.setRel(relBuilder.build()); |
|
95 |
write(context, sourceKey, builder); |
|
96 |
|
|
97 |
// Create Relation from deduplicated Entity to Root Entity |
|
98 |
relBuilder = OafProtos.OafRel.newBuilder(oaf.getRel()); |
|
99 |
relBuilder.setTarget(relBuilder.getSource()); |
|
100 |
relBuilder.setSource(dedupRoot); |
|
101 |
|
|
102 |
RelDescriptor rd = OafRelDecoder.decode(oaf.getRel()).getRelDescriptor(); |
|
103 |
|
|
104 |
final String inverseRelation = ontologies.get(rd.getRelType().toString()).inverseOf(rd.getRelClass()); |
|
105 |
relBuilder.setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType()).setRelClass(inverseRelation); |
|
106 |
|
|
107 |
builder.setRel(relBuilder.build()); |
|
108 |
write(context, dedupRoot, builder); |
|
109 |
|
|
110 |
relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString(); |
|
111 |
context.getCounter(COUNTER_GROUP, String.format("%s - Relation fixed", relGroup)).increment(2); |
|
112 |
} |
|
113 |
|
|
114 |
private void write(Context context, String dedupRoot, OafProtos.Oaf.Builder builder) throws IOException, InterruptedException { |
|
115 |
if (!simulation) { |
|
116 |
context.write(new ImmutableBytesWritable(Bytes.toBytes(dedupRoot)), asPut(builder.build())); |
|
117 |
} |
|
118 |
} |
|
119 |
|
|
120 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/Key.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation; |
|
2 |
|
|
3 |
import org.apache.hadoop.io.IntWritable; |
|
4 |
import org.apache.hadoop.io.Text; |
|
5 |
import org.apache.hadoop.io.Writable; |
|
6 |
import org.apache.hadoop.io.WritableComparable; |
|
7 |
|
|
8 |
import java.io.DataInput; |
|
9 |
import java.io.DataOutput; |
|
10 |
import java.io.IOException; |
|
11 |
|
|
12 |
public class Key implements Writable, WritableComparable<Key> { |
|
13 |
|
|
14 |
public final static int MERGES_REL = 0; |
|
15 |
public final static int OTHER_REL = 1; |
|
16 |
|
|
17 |
private IntWritable keyType; |
|
18 |
|
|
19 |
private Text id; |
|
20 |
|
|
21 |
public Key() {} |
|
22 |
|
|
23 |
public Key(final int keyType, final String id) { |
|
24 |
this.id = new Text(id); |
|
25 |
this.keyType = new IntWritable(keyType); |
|
26 |
} |
|
27 |
|
|
28 |
public static Key create(final int keyType, final String id) { |
|
29 |
return new Key(keyType, id); |
|
30 |
} |
|
31 |
|
|
32 |
public static Key mergesRel(final String id) { |
|
33 |
return new Key(MERGES_REL, id); |
|
34 |
} |
|
35 |
|
|
36 |
public static Key otherRel(final String id) { |
|
37 |
return new Key(OTHER_REL, id); |
|
38 |
} |
|
39 |
|
|
40 |
public Text getId() { |
|
41 |
return id; |
|
42 |
} |
|
43 |
|
|
44 |
public void setId(final Text id) { |
|
45 |
this.id = id; |
|
46 |
} |
|
47 |
|
|
48 |
public IntWritable getKeyType() { |
|
49 |
return keyType; |
|
50 |
} |
|
51 |
|
|
52 |
public void setKeyType(final IntWritable keyType) { |
|
53 |
this.keyType = keyType; |
|
54 |
} |
|
55 |
|
|
56 |
@Override |
|
57 |
public int compareTo(final Key o) { |
|
58 |
int compareValue = this.id.toString().compareTo(o.getId().toString()); |
|
59 |
if (compareValue == 0) { |
|
60 |
compareValue = this.keyType.compareTo(o.getKeyType()); |
|
61 |
} |
|
62 |
return compareValue; // sort ascending |
|
63 |
} |
|
64 |
|
|
65 |
@Override |
|
66 |
public void write(final DataOutput out) throws IOException { |
|
67 |
keyType.write(out); |
|
68 |
id.write(out); |
|
69 |
} |
|
70 |
|
|
71 |
@Override |
|
72 |
public void readFields(final DataInput in) throws IOException { |
|
73 |
keyType = new IntWritable(); |
|
74 |
keyType.readFields(in); |
|
75 |
id = new Text(); |
|
76 |
id.readFields(in); |
|
77 |
} |
|
78 |
|
|
79 |
@Override |
|
80 |
public String toString() { |
|
81 |
return (new StringBuilder()) |
|
82 |
.append('{') |
|
83 |
.append(getKeyType().get()) |
|
84 |
.append(',') |
|
85 |
.append(getId()) |
|
86 |
.append('}') |
|
87 |
.toString(); |
|
88 |
} |
|
89 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/KeyPartitioner.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation; |
|
2 |
|
|
3 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
4 |
import org.apache.hadoop.mapreduce.Partitioner; |
|
5 |
|
|
6 |
public class KeyPartitioner extends Partitioner<Key, ImmutableBytesWritable> { |
|
7 |
|
|
8 |
@Override |
|
9 |
public int getPartition(Key key, ImmutableBytesWritable val, int numPartitions) { |
|
10 |
final int res = Math.abs(key.getId().hashCode() % numPartitions); |
|
11 |
return res; |
|
12 |
} |
|
13 |
|
|
14 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation; |
|
2 |
|
|
3 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
|
4 |
import eu.dnetlib.data.proto.OafProtos; |
|
5 |
import eu.dnetlib.data.proto.TypeProtos; |
|
6 |
import org.apache.hadoop.hbase.client.Result; |
|
7 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
|
8 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
|
9 |
|
|
10 |
import java.io.IOException; |
|
11 |
|
|
12 |
import static eu.dnetlib.data.mapreduce.util.DedupUtils.isRoot; |
|
13 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getBody; |
|
14 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.rel; |
|
15 |
|
|
16 |
/** |
|
17 |
* Created by sandro on 2/24/17. |
|
18 |
*/ |
|
19 |
public class DedupFixRelationMapper extends TableMapper<Key, ImmutableBytesWritable> { |
|
20 |
|
|
21 |
public static final String COUNTER_GROUP = "Fix relations"; |
|
22 |
|
|
23 |
private ImmutableBytesWritable ibw; |
|
24 |
|
|
25 |
private TypeProtos.Type expectedType; |
|
26 |
|
|
27 |
@Override |
|
28 |
protected void setup(final Context context) { |
|
29 |
|
|
30 |
expectedType = TypeProtos.Type.valueOf(context.getConfiguration().get("type")); |
|
31 |
|
|
32 |
ibw = new ImmutableBytesWritable(); |
|
33 |
} |
|
34 |
@Override |
|
35 |
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { |
|
36 |
final String rowKey = new String(key.copyBytes()); |
|
37 |
final OafRowKeyDecoder rowKeyDecoder = OafRowKeyDecoder.decode(rowKey); |
|
38 |
final TypeProtos.Type type = rowKeyDecoder.getType(); |
|
39 |
|
|
40 |
if (!type.equals(expectedType) || !isRoot(rowKey)) { |
|
41 |
return; |
|
42 |
} |
|
43 |
|
|
44 |
final OafProtos.Oaf oaf = getBody(value, type); |
|
45 |
if (oaf == null) { |
|
46 |
context.getCounter(COUNTER_GROUP, String.format("%s - missing body", type.toString())).increment(1); |
|
47 |
return; |
|
48 |
} |
|
49 |
|
|
50 |
rel(value).forEach(rel -> { |
|
51 |
final String targetId = rel.getRel().getTarget(); |
|
52 |
if (!isRoot(targetId)) { |
|
53 |
if (rel.getRel().getRelClass().equals("merges")) { |
|
54 |
emit(context, Key.mergesRel(targetId), rel.toByteArray()); |
|
55 |
} else { |
|
56 |
emit(context, Key.otherRel(targetId), rel.toByteArray()); |
|
57 |
} |
|
58 |
} |
|
59 |
}); |
|
60 |
} |
|
61 |
|
|
62 |
private void emit(final Context context, final Key key, final byte[] value) { |
|
63 |
ibw.set(value); |
|
64 |
switch (key.getKeyType().get()) { |
|
65 |
case Key.MERGES_REL: |
|
66 |
context.getCounter(COUNTER_GROUP, String.format("%s - Merge Relationship", expectedType)).increment(1); |
|
67 |
break; |
|
68 |
case Key.OTHER_REL: |
|
69 |
context.getCounter(COUNTER_GROUP, String.format("%s - Other Relationship", expectedType)).increment(1); |
|
70 |
break; |
|
71 |
} |
|
72 |
try { |
|
73 |
context.write(key, ibw); |
|
74 |
} catch (Exception e) { |
|
75 |
throw new RuntimeException(e); |
|
76 |
} |
|
77 |
} |
|
78 |
|
|
79 |
} |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/KeyGroupingComparator.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation; |
|
2 |
|
|
3 |
import org.apache.hadoop.io.WritableComparable; |
|
4 |
import org.apache.hadoop.io.WritableComparator; |
|
5 |
|
|
6 |
public class KeyGroupingComparator extends WritableComparator { |
|
7 |
|
|
8 |
protected KeyGroupingComparator() { |
|
9 |
super(Key.class, true); |
|
10 |
} |
|
11 |
|
|
12 |
@Override |
|
13 |
public int compare(WritableComparable w1, WritableComparable w2) { |
|
14 |
final Key k1 = (Key) w1; |
|
15 |
final Key k2 = (Key) w2; |
|
16 |
return k1.getId().compareTo(k2.getId()); |
|
17 |
} |
|
18 |
} |
Also available in: Unified diff
migrated classes for the FixRelation job from MASTER branch