Revision 52985

do not skip processing datasets in DedupBuildRootsMapper, improved error reporting in DedupBuildRootsReducer

View differences:

modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/cc/ConnectedComponentsReducer.java
 		final byte[] root = DedupUtils.newIdBytes(ByteBuffer.wrap(Bytes.toBytes(key.toString())), dedupConf.getWf().getDedupRun());
 
 		StreamUtils.toStream(values.iterator())
-		//		.limit(1000)  // might cause timeouts in case of large number of items 
+		//		.limit(1000)  // might cause timeouts in case of large number of items
 				.flatMap(v -> v.getEdges().stream())
 				.forEach(q -> {
 					final byte[] qb = Bytes.toBytes(q.toString());
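The change above appears to be trailing-whitespace cleanup on the commented-out .limit(1000) guard. For reference, a JDK-only sketch of what enabling such a cap would do (process at most 1000 items per connected component); the class and variable names here are illustrative and not part of the original reducer:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class LimitSketch {

	public static void main(final String[] args) {
		// Illustrative stand-in for the edges of one connected component.
		final List<String> edges = IntStream.range(0, 5000)
				.mapToObj(i -> "edge-" + i)
				.collect(Collectors.toList());

		// With .limit(1000) enabled, at most 1000 edges per group would be processed;
		// the line stays commented out in the reducer, so the full stream is consumed.
		final long processed = edges.stream()
				.limit(1000)
				.count();

		System.out.println("processed " + processed + " of " + edges.size() + " edges");
	}
}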
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsMapper.java
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.stream.Collectors;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Sets;
 import com.google.protobuf.InvalidProtocolBufferException;
 import eu.dnetlib.data.mapreduce.JobParams;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
-import eu.dnetlib.data.mapreduce.util.OafDecoder;
 import eu.dnetlib.data.mapreduce.util.UpdateMerger;
 import eu.dnetlib.data.proto.KindProtos.Kind;
 import eu.dnetlib.data.proto.OafProtos.Oaf;
-import eu.dnetlib.data.proto.OafProtos.OafEntity;
 import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.data.transform.OafUtils;
 import eu.dnetlib.pace.config.DedupConfig;
......
 	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
 		// System.out.println("Find root mapping: " + new String(rowkey.copyBytes()));
 
-		// TODO: remove this hack - here because we don't want to dedup datasets
-		if (checkDataset(value)) return;
+		if (isInvalid(value)) {
+			context.getCounter(dedupConf.getWf().getEntityType(), "not valid").increment(1);
+			return;
+		}
 
 		final Map<byte[], byte[]> mergedIn = value.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(Type.valueOf(dedupConf.getWf().getEntityType())));
 
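The removed hack skipped dataset rows entirely; the new guard only skips rows whose body is missing and tracks them in a "not valid" counter under the entity-type group. A minimal sketch of how such a counter could be read back in the job driver, assuming a handle on the org.apache.hadoop.mapreduce.Job; the class, method, and group value shown are illustrative, not part of this revision:

import org.apache.hadoop.mapreduce.Counter;
import org.apache.hadoop.mapreduce.Job;

public class CounterReport {

	// Reads the "not valid" counter incremented by DedupBuildRootsMapper; the group name
	// mirrors dedupConf.getWf().getEntityType(), e.g. "result" (assumed for illustration).
	public static void reportInvalid(final Job job, final String entityType) throws Exception {
		final Counter notValid = job.getCounters().findCounter(entityType, "not valid");
		System.out.println(String.format("%s rows counted as not valid: %d", entityType, notValid.getValue()));
	}
}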
......
 	}
 
 	private HashSet<String> getStrings(final Map<byte[], byte[]> mergedIn) {
-		return Sets.newHashSet(Iterables.transform(mergedIn.keySet(), new Function<byte[], String>() {
-
-			@Override
-			public String apply(final byte[] input) {
-				return new String(input);
-			}
-		}));
+		return mergedIn.keySet().stream()
+				.map(b -> new String(b))
+				.collect(Collectors.toCollection(HashSet::new));
 	}
 
-	private boolean checkDataset(final Result value) {
+	private boolean isInvalid(final Result value) {
 		final Map<byte[], byte[]> bodyMap = value.getFamilyMap(dedupConf.getWf().getEntityType().getBytes());
 
 		if ((bodyMap == null) || bodyMap.isEmpty()) return true;
......
 
 		if (bodyB == null) return true;
 
-		final OafEntity entity = OafDecoder.decode(bodyB).getEntity();
-
-		if (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("dataset")) return true;
-
 		return false;
 	}
 
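In getStrings, the Guava Function/Iterables.transform idiom is replaced by an equivalent Java 8 stream, which is why the com.google.common imports are dropped above. A self-contained sketch of the same conversion on plain JDK types; the class name and sample keys are illustrative:

import java.util.HashSet;
import java.util.Set;
import java.util.stream.Collectors;

public class GetStringsSketch {

	// Same shape as the new getStrings(): map each byte[] key to a String and collect into a HashSet.
	static HashSet<String> getStrings(final Set<byte[]> keys) {
		return keys.stream()
				.map(b -> new String(b))
				.collect(Collectors.toCollection(HashSet::new));
	}

	public static void main(final String[] args) {
		final Set<byte[]> keys = new HashSet<>();
		keys.add("key-1".getBytes());
		keys.add("key-2".getBytes());

		System.out.println(getStrings(keys)); // prints both keys, order not guaranteed
	}
}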
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupBuildRootsReducer.java
 import eu.dnetlib.data.transform.OafEntityMerger;
 import eu.dnetlib.pace.config.DedupConfig;
 import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableReducer;
+import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 
......
 			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
 		}
 
-		emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
+		emit(builder.getEntity().getType().toString(),
+				context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
 
 	}
 
......
 
 		// emit relation from the root to the related entities
 		OafDecoder decoder = rootToEntity(rowkey, oaf);
-		emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
+		emit("emit relation from the root to the related entities",
+				context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
 
 		// emit relation from the related entities to the root
 		decoder = entityToRoot(rowkey, oaf);
 		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
-		emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
+		emit("emit relation from the related entities to the root",
+				context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
 
 		// mark relation from the related entities to the duplicate as deleted
 		decoder = markDeleted(oaf, true);
 		revKey = Bytes.toBytes(decoder.relSourceId());
-		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
+		emit("mark relation from the related entities to the duplicate as deleted",
+				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
 
 		// mark relation from the related entities to the duplicate as deleted
 		decoder = markDeleted(oaf, false);
 		revKey = Bytes.toBytes(decoder.relSourceId());
-		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
+		emit("mark relation from the related entities to the duplicate as deleted",
+				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
 	}
 
-	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
+	private void emit(final String msg, final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
 			throws IOException, InterruptedException {
 		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
-		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
-		context.write(new ImmutableBytesWritable(rowkey), put);
+		put.setDurability(Durability.SKIP_WAL);
+
+		try {
+			context.write(new ImmutableBytesWritable(rowkey), put);
+		} catch (RetriesExhaustedWithDetailsException | NoSuchColumnFamilyException e) {
+			System.err.println(
+					String.format("%s, rowkey %s, family %s, qualifier %s",
+							msg, new String(rowkey), family, qualifier));
+			throw new RuntimeException(e);
+		}
+
 		context.getCounter(family, label).increment(1);
 	}
 
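The reworked emit(...) reports which write failed (caller message, rowkey, column family, qualifier) before rethrowing, so a failure can be traced to a specific cell rather than only a stack trace; it also replaces Put.setWriteToWAL(JobParams.WRITE_TO_WAL) with Put.setDurability(Durability.SKIP_WAL), i.e. the WAL is now always skipped for these puts. A JDK-only sketch of the same error-context pattern, with a hypothetical Writer interface standing in for the reducer context; all names here are illustrative:

public class EmitSketch {

	// Hypothetical stand-in for the reducer's context.write(...), which may throw a checked exception.
	interface Writer {
		void write(byte[] rowkey, byte[] value) throws Exception;
	}

	// Mirrors the shape of the new emit(...): on failure, print the message plus the coordinates
	// of the failed write, then rethrow as an unchecked exception so the task still fails.
	static void emit(final String msg, final Writer writer, final byte[] rowkey,
			final String family, final String qualifier, final byte[] value) {
		try {
			writer.write(rowkey, value);
		} catch (final Exception e) {
			System.err.println(String.format("%s, rowkey %s, family %s, qualifier %s",
					msg, new String(rowkey), family, qualifier));
			throw new RuntimeException(e);
		}
	}

	public static void main(final String[] args) {
		final Writer failing = (rowkey, value) -> {
			throw new Exception("simulated: no such column family");
		};
		try {
			emit("mark deleted [dup -> entity]", failing, "row-1".getBytes(), "family-1", "qualifier-1", new byte[0]);
		} catch (final RuntimeException expected) {
			System.out.println("rethrown: " + expected.getCause().getMessage());
		}
	}
}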
