Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.HashSet;
5
import java.util.LinkedList;
6
import java.util.Map;
7
import java.util.Queue;
8
import java.util.Set;
9

    
10
import eu.dnetlib.data.mapreduce.JobParams;
11
import org.apache.hadoop.hbase.client.Put;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableReducer;
14
import org.apache.hadoop.hbase.util.Bytes;
15
import org.apache.hadoop.io.Text;
16

    
17
import com.google.common.collect.Maps;
18
import com.google.common.collect.Sets;
19

    
20
import eu.dnetlib.data.mapreduce.hbase.VolatileColumnFamily;
21
import eu.dnetlib.pace.model.PersonComparatorUtils;
22

    
23
public class FindDedupCandidatePersonsReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {
24

    
25
	private static final int LIMIT = 5000;
26

    
27
	@Override
28
	protected void setup(final Context context) throws IOException, InterruptedException {
29

    
30
	}
31

    
32
	@Override
33
	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
34
		System.out.println("\nReducing key: " + key);
35

    
36
		final Set<String> candidates = Sets.newHashSet();
37
		final Map<String, Set<String>> resultIds = Maps.newHashMap();
38

    
39
		final Queue<DedupPersonBean> queue = prepare(context, key, values);
40
		while (!queue.isEmpty()) {
41

    
42
			final DedupPersonBean pivot = queue.remove();
43

    
44
			for (final DedupPersonBean curr : queue) {
45

    
46
				if (PersonComparatorUtils.areSimilar(pivot.getName(), curr.getName())) {
47
					System.out.println("- Similar persons: [" + pivot.getName() + "] - [" + curr.getName() + "]");
48
					candidates.add(pivot.getId());
49
					candidates.add(curr.getId());
50

    
51
					collectResultIds(resultIds, pivot);
52
					collectResultIds(resultIds, curr);
53
				}
54
			}
55
		}
56

    
57
		emitCandidates(context, candidates);
58
		emitResultCandidates(context, resultIds);
59
	}
60

    
61
	private void collectResultIds(final Map<String, Set<String>> resultIds, final DedupPersonBean person) {
62
		if (!resultIds.containsKey(person.getId())) {
63
			resultIds.put(person.getId(), new HashSet<String>());
64
		}
65
		resultIds.get(person.getId()).addAll(person.getResults());
66
	}
67

    
68
	private Queue<DedupPersonBean> prepare(final Context context, final Text key, final Iterable<Text> values) {
69
		final Queue<DedupPersonBean> queue = new LinkedList<DedupPersonBean>();
70

    
71
		for (final Text i : values) {
72
			queue.add(DedupPersonBean.fromText(i));
73

    
74
			if (queue.size() > LIMIT) {
75
				context.getCounter("Comparison list > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
76
				System.out.println("breaking out after limit (" + LIMIT + ") for key '" + key);
77
				break;
78
			}
79
		}
80

    
81
		return queue;
82
	}
83

    
84
	private void emitCandidates(final Context context, final Set<String> candidates) throws IOException, InterruptedException {
85
		final byte[] cf = Bytes.toBytes(VolatileColumnFamily.dedup.toString());
86
		final byte[] col = Bytes.toBytes("isCandidate");
87
		final byte[] val = Bytes.toBytes("");
88

    
89
		for (final String s : candidates) {
90
			final byte[] id = Bytes.toBytes(s);
91
			final Put put = new Put(id).add(cf, col, val);
92
			put.setWriteToWAL(JobParams.WRITE_TO_WAL);
93
			context.write(new ImmutableBytesWritable(id), put);
94
		}
95
		context.getCounter(getClass().getSimpleName(), "N. Put. (persons)").increment(candidates.size());
96
	}
97

    
98
	private void emitResultCandidates(final Context context, final Map<String, Set<String>> resultIds) throws IOException, InterruptedException {
99
		final byte[] cf = Bytes.toBytes(VolatileColumnFamily.dedupPerson.toString());
100
		final byte[] val = Bytes.toBytes("");
101

    
102
		for (final String personId : resultIds.keySet()) {
103
			final byte[] col = Bytes.toBytes(personId);
104

    
105
			for (final String s : resultIds.get(personId)) {
106
				final byte[] id = Bytes.toBytes(s);
107
				final Put put = new Put(id).add(cf, col, val);
108
				put.setWriteToWAL(JobParams.WRITE_TO_WAL);
109
				context.write(new ImmutableBytesWritable(id), put);
110
			}
111
			context.getCounter(getClass().getSimpleName(), "N. Put. (results)").increment(resultIds.get(personId).size());
112
		}
113
	}
114

    
115
}
(17-17/22)