Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.data.mapreduce.util;
2
3
import java.util.Collections;
4
import java.util.List;
5
import java.util.Map;
6 41517 claudio.at
import java.util.Map.Entry;
7 26600 sandro.lab
8 41517 claudio.at
import com.google.common.collect.Maps;
9 26600 sandro.lab
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.mapreduce.Mapper.Context;
11
12
import com.google.common.collect.Lists;
13
import com.google.protobuf.InvalidProtocolBufferException;
14
15
import eu.dnetlib.data.proto.OafProtos.Oaf;
16 35133 claudio.at
import eu.dnetlib.data.transform.OafEntityMerger;
17 26600 sandro.lab
18
public class UpdateMerger {
19
20
	private static final String UPDATE_MERGER = "UPDATE_MERGE";
21
	private static final String N_MERGES = "N_MERGES";
22
23
	@SuppressWarnings("rawtypes")
24
	public static Oaf mergeBodyUpdates(final Context context, final Map<byte[], byte[]> map) throws InvalidProtocolBufferException {
25 41517 claudio.at
26
		final Map<String, byte[]> stringMap = Maps.newHashMap();
27
		for(Entry<byte[], byte[]> e : map.entrySet()) {
28
			stringMap.put(Bytes.toString(e.getKey()), e.getValue());
29
		}
30
31
		return doMerge(context, stringMap);
32
	}
33
34
	public static Oaf mergeBodyUpdates(final Map<String, byte[]> map) throws InvalidProtocolBufferException {
35
		return doMerge(null, map);
36
	}
37
38
	private static Oaf doMerge(final Context context, final Map<String, byte[]> map)
39
			throws InvalidProtocolBufferException {
40
41
		final byte[] value = map.get(DedupUtils.BODY_S);
42 28308 claudio.at
		if (value == null) return null;
43 26600 sandro.lab
44
		Oaf.Builder builder = Oaf.newBuilder(Oaf.parseFrom(value));
45
		final List<String> keys = Lists.newArrayList();
46
47
		// we fetch all the body updates
48 41517 claudio.at
		for (final String o : map.keySet()) {
49
			if (o.startsWith("update_")) {
50
				keys.add(o);
51 26600 sandro.lab
			}
52
		}
53
		if (!keys.isEmpty()) {
54
			// we merge all the sorted updates with the body
55
			Collections.sort(keys);
56 35133 claudio.at
			for (final String k : keys) {
57 41517 claudio.at
				final Oaf update = Oaf.parseFrom(map.get(k));
58 26600 sandro.lab
				// System.out.println("\n\nBODY: \n" + body.build().toString());
59
				// System.out.println("UPDATE: \n" + update.toString());
60
				builder.mergeFrom(update);
61
				// System.out.println("UDPATED BODY: \n" + body.build().toString() + "\n\n");
62
			}
63
64
			builder = OafEntityMerger.merge(builder);
65
66 41517 claudio.at
			if (context != null) {
67
				context.getCounter(UPDATE_MERGER, N_MERGES).increment(keys.size());
68
			}
69 26600 sandro.lab
		}
70
		return builder.build();
71
	}
72
73 41517 claudio.at
74 26600 sandro.lab
}