package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.*;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

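/**
 * Reduce phase of the HBase dedup workflow: candidate records grouped under the
 * same clustering key are ordered, compared pairwise within a sliding window,
 * and every pair scoring at or above the configured threshold is emitted as a
 * symmetric dedupSimilarity relation into the entity's similarity column family.
 */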
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

	private static final Log log = LogFactory.getLog(DedupReducer.class);

	private static final boolean WRITE_TO_WAL = false;

	private DedupConfig dedupConf;

	private ImmutableBytesWritable ibw;

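	/**
	 * Loads the DedupConfig serialized in the job configuration; it drives all
	 * workflow parameters used below (order field, queue and group size limits,
	 * sliding window size, similarity threshold, skip list).
	 */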
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {

		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		ibw = new ImmutableBytesWritable();

		log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
	}

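	/**
	 * Builds the candidate queue for the current clustering key and dispatches to
	 * the type-specific processing: 'result' records are first collapsed by their
	 * normalized order field, while the other entity types are compared directly.
	 */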
	@Override
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

		final Queue<MapDocument> q = prepare(context, key, values);

		if (q.size() > 1) {
			log.info("reducing key: '" + key + "' records: " + q.size());

			switch (Type.valueOf(dedupConf.getWf().getEntityType())) {
			case person:
				process(q, context);
				break;
			case result:
				process(simplifyQueue(q, key.toString(), context), context);
				break;
			case organization:
				process(q, context);
				break;
			default:
				throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
			}
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
		}
	}

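	/**
	 * Deserializes the grouped values into a priority queue sorted by the
	 * configured order field, dropping duplicate identifiers and capping the
	 * queue at the configured queueMaxSize to bound the work spent on
	 * oversized ngram clusters.
	 */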
	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));

		final Set<String> seen = new HashSet<String>();
		final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
		int count = 0;
		boolean logged = false;

		for (final ImmutableBytesWritable i : values) {
			count++;

			if (queue.size() <= queueMaxSize) {
				final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
				final String id = doc.getIdentifier();

				if (!seen.contains(id)) {
					seen.add(id);
					queue.add(doc);
				}

			} else {
				if (!logged) {
					// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
					context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
					log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
					logged = true;
				}
			}
		}

		log.info(String.format("cluster key '%s' size '%s'", key, count));

		return queue;
	}

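	/**
	 * Scans the ordered queue and groups consecutive records sharing the same
	 * normalized order field value; each group is forwarded to the output queue
	 * via populateSimplifiedQueue, records missing the order field are only
	 * counted and discarded.
	 */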
	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
		final Queue<MapDocument> q = new LinkedList<MapDocument>();

		String fieldRef = "";
		final List<MapDocument> tempResults = Lists.newArrayList();

		while (!queue.isEmpty()) {
			final MapDocument result = queue.remove();

			final String orderFieldName = dedupConf.getWf().getOrderField();
			final Field orderFieldValue = result.values(orderFieldName);
			if (!orderFieldValue.isEmpty()) {
				final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
				if (field.equals(fieldRef)) {
					tempResults.add(result);
				} else {
					populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
					tempResults.clear();
					tempResults.add(result);
					fieldRef = field;
				}
			} else {
				context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
			}
		}
		populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);

		return q;
	}

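	/**
	 * Flushes the current group into the output queue when it is smaller than
	 * groupMaxSize; larger groups are dropped entirely and tracked via counters.
	 */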
	private void populateSimplifiedQueue(final Queue<MapDocument> q,
			final List<MapDocument> tempResults,
			final Context context,
			final String fieldRef,
			final String ngram) {
		if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) {
			q.addAll(tempResults);
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(),
					"Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
					.increment(tempResults.size());
			log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
		}
	}

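	/**
	 * Sliding window pass: repeatedly takes the head of the ordered queue as the
	 * pivot and compares it against a window of subsequent records (bounded by
	 * slidingWindowSize), writing a similarity relation whenever the pace
	 * distance reaches the configured threshold.
	 */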
	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {

		final PaceDocumentDistance algo = new PaceDocumentDistance();

		while (!queue.isEmpty()) {

			final MapDocument pivot = queue.remove();
			final String idPivot = pivot.getIdentifier();

			final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
			final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();

			if (fieldPivot != null) {
				// System.out.println(idPivot + " --> " + fieldPivot);

				int i = 0;
				for (final MapDocument curr : queue) {
					final String idCurr = curr.getIdentifier();

					if (mustSkip(idCurr)) {
						context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
						break;
					}

					if (i > dedupConf.getWf().getSlidingWindowSize()) {
						break;
					}

					final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
					final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();

					if (!idCurr.equals(idPivot) && (fieldCurr != null)) {

						final double d = similarity(algo, pivot, curr);

						if (d >= dedupConf.getWf().getThreshold()) {
							writeSimilarity(context, idPivot, idCurr);
							context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
						} else {
							context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
						}
						i++;
					}
				}
			}
		}
	}

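	/**
	 * Wraps the pace distance computation so that any failure is logged together
	 * with both documents before being rethrown.
	 */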
	private double similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
		try {
			return algo.between(a, b, dedupConf);
		} catch (Throwable e) {
			log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
			throw new IllegalArgumentException(e);
		}
	}

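	/**
	 * Checks the record's namespace prefix (the token between '|' and '::' in the
	 * identifier) against the workflow skip list.
	 */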
	private boolean mustSkip(final String idPivot) {
		return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
	}

	private String getNsPrefix(final String id) {
		return StringUtils.substringBetween(id, "|", "::");
	}

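	/**
	 * Emits the similarity in both directions, so each matching pair produces two
	 * HBase puts (hence the "(x2)" counter above).
	 */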
	private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
		final byte[] rowKey = Bytes.toBytes(idPivot);
		final byte[] target = Bytes.toBytes(id);

		// log.info("writing similarity: " + idPivot + " <-> " + id);

		emitRel(context, rowKey, target);
		emitRel(context, target, rowKey);
	}

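	/**
	 * Writes a single Put under the similarity column family of the configured
	 * entity type; WAL writes are disabled (WRITE_TO_WAL = false), trading
	 * durability for write throughput.
	 */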
	private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
		final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getWf().getEntityType())), to, Bytes.toBytes(""));
		put.setWriteToWAL(WRITE_TO_WAL);
		ibw.set(from);
		context.write(ibw, put);
	}
}