Revision 46317
Added by Claudio Atzori over 7 years ago

sorting values on reduce side, cleanup
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/kv/DNGFKey.java (added)

```java
package eu.dnetlib.data.mapreduce.hbase.dedup.kv;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import com.google.common.collect.ComparisonChain;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Created by claudio on 13/03/2017.
 */
public class DNGFKey implements WritableComparable<DNGFKey> {

    public final static int MERGES_REL = 0;
    public final static int OTHER_REL = 1;

    private IntWritable keyType;

    private Text id;

    public DNGFKey() {}

    public static DNGFKey create(final int keyType, final String id) {
        return new DNGFKey(keyType, id);
    }

    public static DNGFKey mergesRel(final String id) {
        return new DNGFKey(MERGES_REL, id);
    }

    public static DNGFKey otherRel(final String id) {
        return new DNGFKey(OTHER_REL, id);
    }

    public DNGFKey(final int keyType, final String id) {
        this.id = new Text(id);
        this.keyType = new IntWritable(keyType);
    }

    public void setKeyType(final IntWritable keyType) {
        this.keyType = keyType;
    }

    public void setId(final Text id) {
        this.id = id;
    }

    public Text getId() {
        return id;
    }

    public IntWritable getKeyType() {
        return keyType;
    }

    @Override
    public int compareTo(final DNGFKey o) {
        return ComparisonChain.start()
                .compare(getId(), o.getId())
                .compare(getKeyType(), o.getKeyType())
                .result();
    }

    @Override
    public void write(final DataOutput out) throws IOException {
        keyType.write(out);
        id.write(out);
    }

    @Override
    public void readFields(final DataInput in) throws IOException {
        keyType = new IntWritable();
        keyType.readFields(in);
        id = new Text();
        id.readFields(in);
    }

    @Override
    public String toString() {
        return (new StringBuilder())
                .append('{')
                .append(getKeyType().get())
                .append(',')
                .append(getId())
                .append('}')
                .toString();
    }
}
```
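The key's ordering is what the rest of this commit relies on: compareTo() sorts first by id and then by key type, so for a given target id a MERGES_REL (0) key always precedes an OTHER_REL (1) key. A minimal illustration (the ids are invented):

```java
// Sketch, not part of the commit: ordering of composite keys.
DNGFKey merges = DNGFKey.mergesRel("50|dedup_wf_001::abc");
DNGFKey other  = DNGFKey.otherRel("50|dedup_wf_001::abc");
assert merges.compareTo(other) < 0;                                    // same id: MERGES_REL first
assert other.compareTo(DNGFKey.mergesRel("50|dedup_wf_001::abd")) < 0; // different ids: id wins
```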
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/kv/DNGFKeyPartitioner.java (added)

```java
package eu.dnetlib.data.mapreduce.hbase.dedup.kv;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Created by claudio on 13/03/2017.
 */
public class DNGFKeyPartitioner extends Partitioner<DNGFKey, ImmutableBytesWritable> {

    @Override
    public int getPartition(DNGFKey key, ImmutableBytesWritable val, int numPartitions) {
        return Math.abs(key.getId().hashCode() % numPartitions);
    }

}
```
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/kv/DNGFKeyGroupingComparator.java (added)

```java
package eu.dnetlib.data.mapreduce.hbase.dedup.kv;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by claudio on 13/03/2017.
 */
public class DNGFKeyGroupingComparator extends WritableComparator {

    protected DNGFKeyGroupingComparator() {
        super(DNGFKey.class, true);
    }

    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        final DNGFKey k1 = (DNGFKey) w1;
        final DNGFKey k2 = (DNGFKey) w2;

        return k1.getId().compareTo(k2.getId());
    }
}
```
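Taken together, the three kv classes implement Hadoop's secondary-sort pattern: the partitioner routes every key with the same id to the same reducer, the grouping comparator folds them into a single reduce() call, and the key's natural ordering delivers the merges relation first within each group. A sketch of the driver wiring this implies (the job name and driver class are assumptions, not part of this changeset):

```java
// Hypothetical driver wiring for the secondary sort; only the three kv classes are in this commit.
package eu.dnetlib.data.mapreduce.hbase.dedup.kv;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Job;

public class DedupFixRelationJobSketch {

    public static Job configure(final Configuration conf) throws Exception {
        final Job job = Job.getInstance(conf, "dedupFixRelation");
        job.setMapOutputKeyClass(DNGFKey.class);
        job.setMapOutputValueClass(ImmutableBytesWritable.class);
        job.setPartitionerClass(DNGFKeyPartitioner.class);               // same id -> same reducer
        job.setGroupingComparatorClass(DNGFKeyGroupingComparator.class); // same id -> one reduce() call
        // No explicit sort comparator is needed: DNGFKey.compareTo() orders by (id, keyType),
        // so within each group the MERGES_REL value is iterated first.
        return job;
    }
}
```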
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFixRelationMapper.java

```diff
@@ ... @@
 
 import eu.dnetlib.data.graph.model.DNGFRowKeyDecoder;
 import eu.dnetlib.data.mapreduce.JobParams;
+import eu.dnetlib.data.mapreduce.hbase.dedup.kv.DNGFKey;
 import eu.dnetlib.data.proto.TypeProtos;
 import eu.dnetlib.pace.config.DedupConfig;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.io.Text;
 
 import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.isRoot;
 import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.rel;
@@ ... @@
 /**
  * Created by sandro on 2/24/17.
  */
-public class DedupFixRelationMapper extends TableMapper<Text, ImmutableBytesWritable> {
+public class DedupFixRelationMapper extends TableMapper<DNGFKey, ImmutableBytesWritable> {
 
     private DedupConfig dedupConf;
 
@@ ... @@
         final TypeProtos.Type type = TypeProtos.Type.valueOf(dedupConf.getWf().getEntityType());
         final String rowKey = new String(key.copyBytes());
         final DNGFRowKeyDecoder rowKeyDecoder = DNGFRowKeyDecoder.decode(rowKey);
-        if ((rowKeyDecoder.getType() != type) || !isRoot(rowKey)) {
+        if ((!rowKeyDecoder.getType().equals(type)) || !isRoot(rowKey)) {
             return;
         }
         rel(value).values().forEach(dngf -> {
             final String targetId = dngf.getRel().getTarget();
             if (!isRoot(targetId)) {
-                final Text rootId = new Text(targetId);
-                emit(context, rootId, dngf.toByteArray());
+                if (dngf.getRel().getRelType().getClassid().equals("merges")) {
+                    emit(context, DNGFKey.mergesRel(targetId), dngf.toByteArray());
+                } else {
+                    emit(context, DNGFKey.otherRel(targetId), dngf.toByteArray());
+                }
             }
         });
     }
 
-    private void emit(final Context context, final Text rootId, final byte[] value) {
+    private void emit(final Context context, final DNGFKey key, final byte[] value) {
         ibw.set(value);
         try {
-            context.write(rootId, ibw);
+            context.write(key, ibw);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
```
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFixRelationReducer.java

```diff
@@ ... @@
 package eu.dnetlib.data.mapreduce.hbase.dedup;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Iterator;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import eu.dnetlib.data.graph.model.DNGFDecoder;
 import eu.dnetlib.data.graph.model.DNGFRelDecoder;
 import eu.dnetlib.data.mapreduce.JobParams;
@@ ... @@
 import eu.dnetlib.data.proto.DNGFProtos;
 import eu.dnetlib.data.proto.DNGFProtos.DNGF;
 import eu.dnetlib.data.proto.FieldTypeProtos;
-import eu.dnetlib.data.proto.KindProtos;
 import eu.dnetlib.data.transform.Ontologies;
 import eu.dnetlib.data.transform.OntologyLoader;
 import eu.dnetlib.pace.config.DedupConfig;
@@ ... @@
             context.getCounter(COUNTER_GROUP, "aborted").increment(1);
             return;
         }
-        int relCount = 0;
-        String dedupRoot = null;
 
-        final List<DNGF> dngfs = Lists.newArrayList(toDNGF(values));
+        final Iterator<ImmutableBytesWritable> it = values.iterator();
+        final DNGF first = DNGFDecoder.decode(it.next().copyBytes()).getDNGF();
 
-        // The reducer should fix the group if it finds a merges relation together with other relations
-        for (final DNGFProtos.DNGF dngf : dngfs) {
-            if (dngf.getKind() == KindProtos.Kind.entity) {
-                throw new RuntimeException("aborting, unexpected entities on this reducer");
-            }
-            final String relType = dngf.getRel().getRelType().getClassid();
-            if (relType.contains("merges")) {
-                dedupRoot = dngf.getRel().getSource();
-            }
-            relCount++;
+        if (!first.getRel().getRelType().getClassid().equals("merges")) {
+            context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1);
+            return;
         }
+        context.getCounter(COUNTER_GROUP, "Item to fix").increment(1);
 
-        if (dedupRoot != null && relCount > 1) {
-            context.getCounter(COUNTER_GROUP, "Item to fix").increment(1);
-            for (final DNGFProtos.DNGF dngf : dngfs) {
-                handleRels(context, dngf, dedupRoot);
+        final String dedupRoot = first.getRel().getSource();
+        it.forEachRemaining(b -> {
+            try {
+                handleRels(context, DNGFDecoder.decode(b.copyBytes()).getDNGF(), dedupRoot);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-        } else {
-            context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1);
-        }
+        });
     }
 
     private void handleRels(final Context context, final DNGFProtos.DNGF dngf, final String dedupRoot) throws IOException, InterruptedException {
```
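With the sort guaranteed by the kv classes, the reducer no longer needs to buffer the whole group (the previous Lists.newArrayList(toDNGF(values)) materialised every relation in memory): it reads the dedup root from the first value and streams the rest. Schematically (ids invented, not part of the commit):

```java
// Sketch of the reduce-side view after the secondary sort:
//
//   key group: id = "50|od_______123::xyz"
//     value 1 (MERGES_REL): rel  root --merges--> id       -> dedupRoot = root
//     value 2 (OTHER_REL) : rel  id --<other>--> target1   -> handleRels(..., dedupRoot)
//     value 3 (OTHER_REL) : rel  id --<other>--> target2   -> handleRels(..., dedupRoot)
//
// If the first value is not a "merges" relation, no root exists for the group
// and the reducer skips it ("Relation skipped").
```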
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dli/PrepareScholixDataMapper.java

```diff
@@ ... @@
  */
 public class PrepareScholixDataMapper extends TableMapper<DliKey, ImmutableBytesWritable> {
 
+    private final static String SCHOLIX = "Scholix";
+    private final static String DNGF = "DNGF";
+
     @Override
     protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
 
@@ ... @@
 
             final Scholix.Builder source = DngfToScholixConverter.withSource(entity.getEntity());
             emit(DliKey.ENTITY, entity.getEntity().getId(), context, source.build().toByteArray());
-            context.getCounter("scholix", "source " + source.getSource().getObjectType()).increment(1);
+            context.getCounter(SCHOLIX, "Source: " + source.getSource().getObjectType()).increment(1);
 
             rel(value, "isMergedIn", "merges", "isSimilarTo").values().forEach(r -> {
                 if (!deletedByInference(r)) {
                     final Scholix.Builder target = DngfToScholixConverter.withTarget(entity.getEntity(), r.getRel());
                     emit(DliKey.REL, r.getRel().getTarget(), context, target.build().toByteArray());
-                    context.getCounter("scholix", "rel " + target.getTarget().getObjectType()).increment(1);
+                    context.getCounter(SCHOLIX, "Rel type: " + target.getTarget().getObjectType()).increment(1);
                 } else {
-                    context.getCounter("DNGF", type.name() + " rel deletedbyinference").increment(1);
+                    context.getCounter(DNGF, "Rel deletedbyinference: " + type.name()).increment(1);
                 }
             });
         } else {
-            context.getCounter("DNGF", type.name() + " deletedbyinference").increment(1);
+            context.getCounter(DNGF, "Deletedbyinference: " + type.name()).increment(1);
         }
     } else {
-        context.getCounter("DNGF", type.name() + " invalid").increment(1);
+        context.getCounter(DNGF, "Invalid: " + type.name()).increment(1);
     }
 }
 
@@ ... @@
 
     private void emit(final int keyType, final String id, final Context context, final byte[] data) {
         try {
-            context.getCounter("scholix", keyType == DliKey.ENTITY ? "ENTITY" : "REL").increment(1);
+            context.getCounter(SCHOLIX, keyType == DliKey.ENTITY ? "ENTITY" : "REL").increment(1);
             context.write(DliKey.create(keyType, id), new ImmutableBytesWritable(data));
         } catch (Exception e) {
             e.printStackTrace();
```
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dli/PrepareSummaryDataMapper.java

```diff
@@ ... @@
 
 public class PrepareSummaryDataMapper extends TableMapper<Text, Text> {
 
+    private final static String SCHOLIX = "Scholix";
+
     private Ontologies ontologies = null;
 
     private Text outKey;
@@ ... @@
                 emit(new String(keyIn.copyBytes()), context, converter.convertAsJson());
             }
         } else {
-            incrementCounter(context, "deleted by inference", type.toString(), 1);
+            incrementCounter(context, SCHOLIX, "Deletedbyinference: " + type.toString(), 1);
         }
     } else {
-        incrementCounter(context, "missing body (map)", type.toString(), 1);
+        incrementCounter(context, SCHOLIX, "Missing body: " + type.toString(), 1);
     }
 }
 
```
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dli/PrepareScholixDataReducer.java

```diff
@@ ... @@
 package eu.dnetlib.data.mapreduce.hbase.dli;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.googlecode.protobuf.format.JsonFormat;
 import eu.dnetlib.data.mapreduce.hbase.dli.kv.DliKey;
@@ ... @@
             }
         });
 
-        context.getCounter("scholix", "group size " + groupSize.get()).increment(1);
+        groupSizeCounter(context, groupSize.get(),
+                "0,1",
+                "1,10",
+                "10,20",
+                "20,100",
+                "100,200",
+                "200,500",
+                "500,1000",
+                "1000,2000",
+                "2000,5000",
+                "5000,10000",
+                "10000,20000",
+                "20000,*");
     }
 
+    private void groupSizeCounter(final Context context, final int groupSize, final String... groups) {
+        Arrays.asList(groups).forEach(g -> {
+            final LinkedList<String> i = Lists.newLinkedList(Splitter.on(",").split(g));
+            int min = Integer.parseInt(i.getFirst());
+            int max = i.getLast().equals("*") ? Integer.MAX_VALUE : Integer.parseInt(i.getLast());
+            groupSizeCounter(context, groupSize, min, max);
+        });
+    }
+
+    private void groupSizeCounter(final Context context, final int groupSize, Integer min, Integer max) {
+        if (groupSize > min && groupSize <= max) {
+            context.getCounter("scholix groups", String.format("group size (%s,%s)", min, max)).increment(1);
+        }
+    }
+
     private Scholix.Builder parse(DliKey key, final ImmutableBytesWritable value) {
         try {
             return Scholix.newBuilder(Scholix.parseFrom(value.copyBytes()));
```
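The removed counter created one Hadoop counter per distinct group size, which risks hitting the job counter limit on large graphs; the new groupSizeCounter collapses sizes into a fixed histogram of half-open buckets (min, max]. A quick illustration of the bucket semantics (values invented, not part of the commit):

```java
// Sketch: each "min,max" spec matches groupSize values in the interval (min, max].
groupSizeCounter(context, 150, "100,200");   // increments "group size (100,200)"
groupSizeCounter(context, 100, "100,200");   // no increment: 100 is not > 100
groupSizeCounter(context, 25000, "20000,*"); // "*" is parsed as Integer.MAX_VALUE
```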