Revision 52985
Added by Claudio Atzori over 5 years ago
DedupBuildRootsMapper.java | ||
---|---|---|
5 | 5 |
import java.util.HashSet; |
6 | 6 |
import java.util.Map; |
7 | 7 |
import java.util.Set; |
8 |
import java.util.stream.Collectors; |
|
8 | 9 |
|
9 |
import com.google.common.base.Function; |
|
10 |
import com.google.common.collect.Iterables; |
|
11 |
import com.google.common.collect.Sets; |
|
12 | 10 |
import com.google.protobuf.InvalidProtocolBufferException; |
13 | 11 |
import eu.dnetlib.data.mapreduce.JobParams; |
14 | 12 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
15 |
import eu.dnetlib.data.mapreduce.util.OafDecoder; |
|
16 | 13 |
import eu.dnetlib.data.mapreduce.util.UpdateMerger; |
17 | 14 |
import eu.dnetlib.data.proto.KindProtos.Kind; |
18 | 15 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
19 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
|
20 | 16 |
import eu.dnetlib.data.proto.TypeProtos.Type; |
21 | 17 |
import eu.dnetlib.data.transform.OafUtils; |
22 | 18 |
import eu.dnetlib.pace.config.DedupConfig; |
... | ... | |
51 | 47 |
protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException { |
52 | 48 |
// System.out.println("Find root mapping: " + new String(rowkey.copyBytes())); |
53 | 49 |
|
54 |
// TODO: remove this hack - here because we don't want to dedup datasets |
|
55 |
if (checkDataset(value)) return; |
|
50 |
if (isInvalid(value)) { |
|
51 |
context.getCounter(dedupConf.getWf().getEntityType(), "not valid").increment(1); |
|
52 |
return; |
|
53 |
} |
|
56 | 54 |
|
57 | 55 |
final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getWf().getEntityType()))); |
58 | 56 |
|
... | ... | |
116 | 114 |
} |
117 | 115 |
|
118 | 116 |
private HashSet<String> getStrings(final Map<byte[], byte[]> mergedIn) { |
119 |
return Sets.newHashSet(Iterables.transform(mergedIn.keySet(), new Function<byte[], String>() { |
|
120 |
|
|
121 |
@Override |
|
122 |
public String apply(final byte[] input) { |
|
123 |
return new String(input); |
|
124 |
} |
|
125 |
})); |
|
117 |
return mergedIn.keySet().stream() |
|
118 |
.map(b -> new String(b)) |
|
119 |
.collect(Collectors.toCollection(HashSet::new)); |
|
126 | 120 |
} |
127 | 121 |
|
128 |
private boolean checkDataset(final Result value) {
|
|
122 |
private boolean isInvalid(final Result value) {
|
|
129 | 123 |
final Map<byte[], byte[]> bodyMap = value.getFamilyMap(dedupConf.getWf().getEntityType().getBytes()); |
130 | 124 |
|
131 | 125 |
if ((bodyMap == null) || bodyMap.isEmpty()) return true; |
... | ... | |
134 | 128 |
|
135 | 129 |
if (bodyB == null) return true; |
136 | 130 |
|
137 |
final OafEntity entity = OafDecoder.decode(bodyB).getEntity(); |
|
138 |
|
|
139 |
if (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("dataset")) return true; |
|
140 |
|
|
141 | 131 |
return false; |
142 | 132 |
} |
143 | 133 |
|
Also available in: Unified diff
do not skip processing datasets in DedupBuildRootsMapper, improved error reporting in DedupBuildRootsReducer