Project

General

Profile

1
package eu.dnetlib.data.mapreduce.util;
2

    
3
import java.util.Collections;
4
import java.util.List;
5
import java.util.Map;
6

    
7
import org.apache.hadoop.hbase.util.Bytes;
8
import org.apache.hadoop.mapreduce.Mapper.Context;
9

    
10
import com.google.common.collect.Lists;
11
import com.google.protobuf.InvalidProtocolBufferException;
12

    
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14

    
15
public class UpdateMerger {
16

    
17
	private static final String UPDATE_MERGER = "UPDATE_MERGE";
18
	private static final String N_MERGES = "N_MERGES";
19

    
20
	@SuppressWarnings("rawtypes")
21
	public static Oaf mergeBodyUpdates(final Context context, final Map<byte[], byte[]> map) throws InvalidProtocolBufferException {
22
		byte[] value = map.get(DedupUtils.BODY_B);
23
		if (value == null) return null;
24

    
25
		Oaf.Builder builder = Oaf.newBuilder(Oaf.parseFrom(value));
26
		final List<String> keys = Lists.newArrayList();
27

    
28
		// we fetch all the body updates
29
		for (byte[] o : map.keySet()) {
30
			final String sKey = Bytes.toString(o);
31
			if (sKey.startsWith("update_")) {
32
				keys.add(sKey);
33
			}
34
		}
35
		if (!keys.isEmpty()) {
36
			// we merge all the sorted updates with the body
37
			Collections.sort(keys);
38
			for (String k : keys) {
39
				Oaf update = Oaf.parseFrom(map.get(Bytes.toBytes(k)));
40
				// System.out.println("\n\nBODY: \n" + body.build().toString());
41
				// System.out.println("UPDATE: \n" + update.toString());
42
				builder.mergeFrom(update);
43
				// System.out.println("UDPATED BODY: \n" + body.build().toString() + "\n\n");
44
			}
45

    
46
			builder = OafEntityMerger.merge(builder);
47

    
48
			context.getCounter(UPDATE_MERGER, N_MERGES).increment(keys.size());
49
		}
50
		return builder.build();
51
	}
52

    
53
}
(9-9/10)