Project

General

Profile

« Previous | Next » 

Revision 52985

Do not skip processing datasets in DedupBuildRootsMapper; improve error reporting in DedupBuildRootsReducer.

View differences:

DedupBuildRootsMapper.java
5 5
import java.util.HashSet;
6 6
import java.util.Map;
7 7
import java.util.Set;
8
import java.util.stream.Collectors;
8 9

  
9
import com.google.common.base.Function;
10
import com.google.common.collect.Iterables;
11
import com.google.common.collect.Sets;
12 10
import com.google.protobuf.InvalidProtocolBufferException;
13 11
import eu.dnetlib.data.mapreduce.JobParams;
14 12
import eu.dnetlib.data.mapreduce.util.DedupUtils;
15
import eu.dnetlib.data.mapreduce.util.OafDecoder;
16 13
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
17 14
import eu.dnetlib.data.proto.KindProtos.Kind;
18 15
import eu.dnetlib.data.proto.OafProtos.Oaf;
19
import eu.dnetlib.data.proto.OafProtos.OafEntity;
20 16
import eu.dnetlib.data.proto.TypeProtos.Type;
21 17
import eu.dnetlib.data.transform.OafUtils;
22 18
import eu.dnetlib.pace.config.DedupConfig;
......
51 47
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
52 48
		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
53 49

  
54
		// TODO: remove this hack - here because we don't want to dedup datasets
55
		if (checkDataset(value)) return;
50
		if (isInvalid(value)) {
51
			context.getCounter(dedupConf.getWf().getEntityType(), "not valid").increment(1);
52
			return;
53
		}
56 54

  
57 55
		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getWf().getEntityType())));
58 56

  
......
116 114
	}
117 115

  
118 116
	private HashSet<String> getStrings(final Map<byte[], byte[]> mergedIn) {
119
		return Sets.newHashSet(Iterables.transform(mergedIn.keySet(), new Function<byte[], String>() {
120

  
121
			@Override
122
			public String apply(final byte[] input) {
123
				return new String(input);
124
			}
125
		}));
117
		return mergedIn.keySet().stream()
118
                   .map(b -> new String(b))
119
                   .collect(Collectors.toCollection(HashSet::new));
126 120
	}
127 121

  
128
	private boolean checkDataset(final Result value) {
122
	private boolean isInvalid(final Result value) {
129 123
		final Map<byte[], byte[]> bodyMap = value.getFamilyMap(dedupConf.getWf().getEntityType().getBytes());
130 124

  
131 125
		if ((bodyMap == null) || bodyMap.isEmpty()) return true;
......
134 128

  
135 129
		if (bodyB == null) return true;
136 130

  
137
		final OafEntity entity = OafDecoder.decode(bodyB).getEntity();
138

  
139
		if (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("dataset")) return true;
140

  
141 131
		return false;
142 132
	}
143 133

  

Also available in: Unified diff