Revision 52985
Added by Claudio Atzori over 5 years ago
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java | ||
---|---|---|
50 | 50 |
final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun()); |
51 | 51 |
|
52 | 52 |
StreamUtils.toStream(values.iterator()) |
53 |
// .limit(1000) // might cause timeouts in case of large number of items
|
|
53 |
// .limit(1000) // might cause timeouts in case of large number of items |
|
54 | 54 |
.flatMap(v -> v.getEdges().stream()) |
55 | 55 |
.forEach(q -> { |
56 | 56 |
final byte[] qb = Bytes.toBytes(q.toString()); |
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsMapper.java | ||
---|---|---|
5 | 5 |
import java.util.HashSet; |
6 | 6 |
import java.util.Map; |
7 | 7 |
import java.util.Set; |
8 |
import java.util.stream.Collectors; |
|
8 | 9 |
|
9 |
import com.google.common.base.Function; |
|
10 |
import com.google.common.collect.Iterables; |
|
11 |
import com.google.common.collect.Sets; |
|
12 | 10 |
import com.google.protobuf.InvalidProtocolBufferException; |
13 | 11 |
import eu.dnetlib.data.mapreduce.JobParams; |
14 | 12 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
15 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
16 | 13 |
import eu.dnetlib.data.mapreduce.util.UpdateMerger; |
17 | 14 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
18 | 15 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
19 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
20 | 16 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
21 | 17 |
import eu.dnetlib.data.transform.OafUtils; |
22 | 18 |
import eu.dnetlib.pace.config.DedupConfig; |
... | ... | |
51 | 47 |
protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException { |
52 | 48 |
// System.out.println("Find root mapping: " + new String(rowkey.copyBytes())); |
53 | 49 |
|
54 |
// TODO: remove this hack - here because we don't want to dedup datasets |
|
55 |
if (checkDataset(value)) return; |
|
50 |
if (isInvalid(value)) { |
|
51 |
context.getCounter(dedupConf.getWf().getEntityType(), "not valid").increment(1); |
|
52 |
return; |
|
53 |
} |
|
56 | 54 |
|
57 | 55 |
final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getWf().getEntityType()))); |
58 | 56 |
|
... | ... | |
116 | 114 |
} |
117 | 115 |
|
118 | 116 |
private HashSet<String> getStrings(final Map<byte[], byte[]> mergedIn) { |
119 |
return Sets.newHashSet(Iterables.transform(mergedIn.keySet(), new Function<byte[], String>() { |
|
120 |
|
|
121 |
@Override |
|
122 |
public String apply(final byte[] input) { |
|
123 |
return new String(input); |
|
124 |
} |
|
125 |
})); |
|
117 |
return mergedIn.keySet().stream() |
|
118 |
.map(b -> new String(b)) |
|
119 |
.collect(Collectors.toCollection(HashSet::new)); |
|
126 | 120 |
} |
127 | 121 |
|
128 |
private boolean checkDataset(final Result value) {
|
|
122 |
private boolean isInvalid(final Result value) {
|
|
129 | 123 |
final Map<byte[], byte[]> bodyMap = value.getFamilyMap(dedupConf.getWf().getEntityType().getBytes()); |
130 | 124 |
|
131 | 125 |
if ((bodyMap == null) || bodyMap.isEmpty()) return true; |
... | ... | |
134 | 128 |
|
135 | 129 |
if (bodyB == null) return true; |
136 | 130 |
|
137 |
final OafEntity entity = OafDecoder.decode(bodyB).getEntity(); |
|
138 |
|
|
139 |
if (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("dataset")) return true; |
|
140 |
|
|
141 | 131 |
return false; |
142 | 132 |
} |
143 | 133 |
|
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java | ||
---|---|---|
15 | 15 |
import eu.dnetlib.data.transform.OafEntityMerger; |
16 | 16 |
import eu.dnetlib.pace.config.DedupConfig; |
17 | 17 |
import org.apache.commons.lang.StringUtils; |
18 |
import org.apache.hadoop.hbase.client.Durability; |
|
18 | 19 |
import org.apache.hadoop.hbase.client.Put; |
20 |
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; |
|
19 | 21 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
20 | 22 |
import org.apache.hadoop.hbase.mapreduce.TableReducer; |
23 |
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException; |
|
21 | 24 |
import org.apache.hadoop.hbase.util.Bytes; |
22 | 25 |
import org.apache.hadoop.io.Text; |
23 | 26 |
|
... | ... | |
77 | 80 |
context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1); |
78 | 81 |
} |
79 | 82 |
|
80 |
emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root"); |
|
83 |
emit(builder.getEntity().getType().toString(), |
|
84 |
context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root"); |
|
81 | 85 |
|
82 | 86 |
} |
83 | 87 |
|
... | ... | |
89 | 93 |
|
90 | 94 |
// emit relation from the root to the related entities |
91 | 95 |
OafDecoder decoder = rootToEntity(rowkey, oaf); |
92 |
emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]"); |
|
96 |
emit("emit relation from the root to the related entities", |
|
97 |
context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]"); |
|
93 | 98 |
|
94 | 99 |
// emit relation from the related entities to the root |
95 | 100 |
decoder = entityToRoot(rowkey, oaf); |
96 | 101 |
byte[] revKey = Bytes.toBytes(decoder.relSourceId()); |
97 |
emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]"); |
|
102 |
emit("emit relation from the related entities to the root", |
|
103 |
context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]"); |
|
98 | 104 |
|
99 | 105 |
// mark relation from the related entities to the duplicate as deleted |
100 | 106 |
decoder = markDeleted(oaf, true); |
101 | 107 |
revKey = Bytes.toBytes(decoder.relSourceId()); |
102 |
emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]"); |
|
108 |
emit("mark relation from the related entities to the duplicate as deleted", |
|
109 |
context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]"); |
|
103 | 110 |
|
104 | 111 |
// mark relation from the related entities to the duplicate as deleted |
105 | 112 |
decoder = markDeleted(oaf, false); |
106 | 113 |
revKey = Bytes.toBytes(decoder.relSourceId()); |
107 |
emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]"); |
|
114 |
emit("mark relation from the related entities to the duplicate as deleted", |
|
115 |
context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]"); |
|
108 | 116 |
} |
109 | 117 |
|
110 |
private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label) |
|
118 |
private void emit(final String msg, final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
|
|
111 | 119 |
throws IOException, InterruptedException { |
112 | 120 |
final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value); |
113 |
put.setWriteToWAL(JobParams.WRITE_TO_WAL); |
|
114 |
context.write(new ImmutableBytesWritable(rowkey), put); |
|
121 |
put.setDurability(Durability.SKIP_WAL); |
|
122 |
|
|
123 |
try { |
|
124 |
context.write(new ImmutableBytesWritable(rowkey), put); |
|
125 |
} catch (RetriesExhaustedWithDetailsException | NoSuchColumnFamilyException e) { |
|
126 |
System.err.println( |
|
127 |
String.format("%s, rowkey %s, family %s, qualifier %s", |
|
128 |
msg, new String(rowkey), family, qualifier)); |
|
129 |
throw new RuntimeException(e); |
|
130 |
} |
|
131 |
|
|
115 | 132 |
context.getCounter(family, label).increment(1); |
116 | 133 |
} |
117 | 134 |
|
Also available in: Unified diff
Do not skip processing datasets in DedupBuildRootsMapper; improve error reporting in DedupBuildRootsReducer.