Project

General

Profile

1
package eu.dnetlib.data.mapreduce.util;
2

    
3
import java.util.Collections;
4
import java.util.List;
5
import java.util.Map;
6

    
7
import org.apache.hadoop.hbase.util.Bytes;
8
import org.apache.hadoop.mapreduce.Mapper.Context;
9

    
10
import com.google.common.collect.Lists;
11
import com.google.protobuf.InvalidProtocolBufferException;
12

    
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.transform.OafEntityMerger;
15

    
16
public class UpdateMerger {
17

    
18
	private static final String UPDATE_MERGER = "UPDATE_MERGE";
19
	private static final String N_MERGES = "N_MERGES";
20

    
21
	@SuppressWarnings("rawtypes")
22
	public static Oaf mergeBodyUpdates(final Context context, final Map<byte[], byte[]> map) throws InvalidProtocolBufferException {
23
		final byte[] value = map.get(DedupUtils.BODY_B);
24
		if (value == null) return null;
25

    
26
		Oaf.Builder builder = Oaf.newBuilder(Oaf.parseFrom(value));
27
		final List<String> keys = Lists.newArrayList();
28

    
29
		// we fetch all the body updates
30
		for (final byte[] o : map.keySet()) {
31
			final String sKey = Bytes.toString(o);
32
			if (sKey.startsWith("update_")) {
33
				keys.add(sKey);
34
			}
35
		}
36
		if (!keys.isEmpty()) {
37
			// we merge all the sorted updates with the body
38
			Collections.sort(keys);
39
			for (final String k : keys) {
40
				final Oaf update = Oaf.parseFrom(map.get(Bytes.toBytes(k)));
41
				// System.out.println("\n\nBODY: \n" + body.build().toString());
42
				// System.out.println("UPDATE: \n" + update.toString());
43
				builder.mergeFrom(update);
44
				// System.out.println("UDPATED BODY: \n" + body.build().toString() + "\n\n");
45
			}
46

    
47
			builder = OafEntityMerger.merge(builder);
48

    
49
			context.getCounter(UPDATE_MERGER, N_MERGES).increment(keys.size());
50
		}
51
		return builder.build();
52
	}
53

    
54
}
(7-7/8)