Revision 54182
Added by Claudio Atzori over 5 years ago
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationReducer.java | ||
---|---|---|
36 | 36 |
protected void setup(final Context context) throws IOException, InterruptedException { |
37 | 37 |
super.setup(context); |
38 | 38 |
|
39 |
simulation = context.getConfiguration().getBoolean("simulation", false); |
|
39 |
simulation = context.getConfiguration().getBoolean("fixrel.simulation", false);
|
|
40 | 40 |
ontologies = new Gson().fromJson(context.getConfiguration().get(JobParams.ONTOLOGIES), Ontologies.class); |
41 | 41 |
|
42 | 42 |
System.out.println("ontologies: " + ontologies.toJson(true)); |
... | ... | |
85 | 85 |
write(context, sourceKey, builder); |
86 | 86 |
|
87 | 87 |
String relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString(); |
88 |
context.getCounter(COUNTER_GROUP, String.format("Relation '%s' set deleted", relGroup)).increment(1);
|
|
88 |
context.getCounter(COUNTER_GROUP, String.format("%s - Relation set deleted", relGroup)).increment(1);
|
|
89 | 89 |
|
90 | 90 |
// Create Relation from Root Entity to its deduplicated Entity |
91 | 91 |
builder = OafProtos.Oaf.newBuilder(oaf); |
... | ... | |
108 | 108 |
write(context, dedupRoot, builder); |
109 | 109 |
|
110 | 110 |
relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString(); |
111 |
context.getCounter(COUNTER_GROUP, String.format("Relation '%s' fixed", relGroup)).increment(2);
|
|
111 |
context.getCounter(COUNTER_GROUP, String.format("%s - Relation fixed", relGroup)).increment(2);
|
|
112 | 112 |
} |
113 | 113 |
|
114 | 114 |
private void write(Context context, String dedupRoot, OafProtos.Oaf.Builder builder) throws IOException, InterruptedException { |
... | ... | |
117 | 117 |
} |
118 | 118 |
} |
119 | 119 |
|
120 |
private Iterable<OafProtos.Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) { |
|
121 |
return Iterables.transform(values, ibw -> OafDecoder.decode(ibw.copyBytes()).getOaf()); |
|
122 |
} |
|
123 |
|
|
124 | 120 |
} |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationMapper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation; |
2 | 2 |
|
3 |
import eu.dnetlib.data.mapreduce.JobParams; |
|
4 | 3 |
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder; |
5 | 4 |
import eu.dnetlib.data.proto.OafProtos; |
6 | 5 |
import eu.dnetlib.data.proto.TypeProtos; |
7 |
import eu.dnetlib.pace.config.DedupConfig; |
|
8 |
import org.apache.commons.lang3.StringUtils; |
|
9 | 6 |
import org.apache.hadoop.hbase.client.Result; |
10 | 7 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
11 | 8 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
12 | 9 |
|
13 | 10 |
import java.io.IOException; |
14 |
import java.util.List; |
|
15 | 11 |
|
16 | 12 |
import static eu.dnetlib.data.mapreduce.util.DedupUtils.isRoot; |
17 | 13 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getBody; |
... | ... | |
26 | 22 |
|
27 | 23 |
private ImmutableBytesWritable ibw; |
28 | 24 |
|
29 |
private String subEntityType; |
|
30 |
|
|
31 | 25 |
private TypeProtos.Type expectedType; |
32 | 26 |
|
33 | 27 |
@Override |
34 | 28 |
protected void setup(final Context context) { |
35 |
subEntityType = context.getConfiguration().get(JobParams.SUBENTITYTYPE); |
|
36 |
if (StringUtils.isBlank(subEntityType)) { |
|
37 |
throw new RuntimeException(String.format("workflow must provide parameter '%s', got '%s'", JobParams.SUBENTITYTYPE, subEntityType)); |
|
38 |
} |
|
39 | 29 |
|
40 |
switch (subEntityType) { |
|
41 |
case "organization" : |
|
42 |
expectedType = TypeProtos.Type.organization; |
|
43 |
break; |
|
44 |
case "publication" : |
|
45 |
case "dataset" : |
|
46 |
case "software" : |
|
47 |
case "other" : |
|
48 |
expectedType = TypeProtos.Type.result; |
|
49 |
break; |
|
50 |
default: |
|
51 |
throw new RuntimeException("wrong subType: " + subEntityType); |
|
52 |
} |
|
30 |
expectedType = TypeProtos.Type.valueOf(context.getConfiguration().get("type")); |
|
53 | 31 |
|
54 | 32 |
ibw = new ImmutableBytesWritable(); |
55 | 33 |
} |
56 |
|
|
57 | 34 |
@Override |
58 | 35 |
protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException { |
59 | 36 |
final String rowKey = new String(key.copyBytes()); |
... | ... | |
65 | 42 |
} |
66 | 43 |
|
67 | 44 |
final OafProtos.Oaf oaf = getBody(value, type); |
68 |
|
|
69 |
// we need to check the result subtypes |
|
70 |
if (type.equals(TypeProtos.Type.result) && !subEntityType.equalsIgnoreCase(oaf.getEntity().getResult().getMetadata().getResulttype().getClassid())) { |
|
45 |
if (oaf == null) { |
|
46 |
context.getCounter(COUNTER_GROUP, String.format("%s - missing body", type.toString())).increment(1); |
|
71 | 47 |
return; |
72 | 48 |
} |
73 | 49 |
|
74 |
rel(value).values().forEach(rel -> {
|
|
50 |
rel(value).forEach(rel -> { |
|
75 | 51 |
final String targetId = rel.getRel().getTarget(); |
76 | 52 |
if (!isRoot(targetId)) { |
77 | 53 |
if (rel.getRel().getRelClass().equals("merges")) { |
... | ... | |
87 | 63 |
ibw.set(value); |
88 | 64 |
switch (key.getKeyType().get()) { |
89 | 65 |
case Key.MERGES_REL: |
90 |
context.getCounter(COUNTER_GROUP, "Merges Relationship").increment(1);
|
|
66 |
context.getCounter(COUNTER_GROUP, String.format("%s - Merge Relationship", expectedType)).increment(1);
|
|
91 | 67 |
break; |
92 | 68 |
case Key.OTHER_REL: |
93 |
context.getCounter(COUNTER_GROUP, "Other Relationship").increment(1);
|
|
69 |
context.getCounter(COUNTER_GROUP, String.format("%s - Other Relationship", expectedType)).increment(1);
|
|
94 | 70 |
break; |
95 | 71 |
} |
96 | 72 |
try { |
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/util/OafHbaseUtils.java | ||
---|---|---|
121 | 121 |
return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(s)); |
122 | 122 |
} |
123 | 123 |
|
124 |
public static Map<String, Oaf> rel(final Result value) {
|
|
124 |
public static List<Oaf> rel(final Result value) {
|
|
125 | 125 |
return value.list().stream() |
126 | 126 |
.filter(kv -> { |
127 | 127 |
final String q = new String(kv.getQualifier()); |
128 | 128 |
return q.matches(OafRowKeyDecoder.ID_REGEX); |
129 |
}).collect(Collectors.toMap(
|
|
130 |
kv -> new String(kv.getQualifier()),
|
|
131 |
kv -> parseProto(kv.getValue())
|
|
132 |
)); |
|
129 |
}) |
|
130 |
.filter(kv -> kv.getValue() != null && kv.getValue().length > 0)
|
|
131 |
.map(kv -> parseProto(kv.getValue()))
|
|
132 |
.collect(Collectors.toList());
|
|
133 | 133 |
} |
134 | 134 |
|
135 | 135 |
public static Oaf parseProto(final byte[] value) { |
Also available in: Unified diff
fixRelations must work on main entities in a single scan pass