package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.*;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
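
/**
 * Reduce phase of the HBase deduplication workflow: records grouped under the
 * same clustering key are compared pairwise within a sliding window and, when
 * their similarity score reaches the configured threshold, a symmetric
 * dedupSimilarity relation is written to the HBase table.
 */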
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

	private static final Log log = LogFactory.getLog(DedupReducer.class);

	private DedupConfig dedupConf;

	private ImmutableBytesWritable ibw;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {

		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		ibw = new ImmutableBytesWritable();

		log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
	}
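
	/**
	 * Builds the candidate queue for the current key and applies the
	 * type-specific comparison strategy: result records are first grouped by
	 * their cleaned order field, organizations are matched directly.
	 */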
	@Override
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

		final Queue<MapDocument> q = prepare(context, key, values);

		if (q.size() > 1) {
			log.info("reducing key: '" + key + "' records: " + q.size());

			switch (Type.valueOf(dedupConf.getWf().getEntityType())) {
			case result:
				process(simplifyQueue(q, key.toString(), context), context);
				break;
			case organization:
				process(q, context);
				break;
			default:
				throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
			}
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
		}
	}
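
	/**
	 * Deserializes the grouped values into a priority queue ordered by the
	 * configured order field, dropping duplicate identifiers and ignoring
	 * further values once the queue has grown past queueMaxSize (a counter
	 * tracks the overflow).
	 */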
	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));

		final Set<String> seen = new HashSet<String>();
		final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();

		boolean logged = false;

		for (final ImmutableBytesWritable i : values) {
			if (queue.size() <= queueMaxSize) {
				final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
				final String id = doc.getIdentifier();

				if (!seen.contains(id)) {
					seen.add(id);
					queue.add(doc);
				}
			} else {
				if (!logged) {
					context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
					log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
					logged = true;
				}
			}
		}

		return queue;
	}
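
	/**
	 * Collapses runs of records sharing the same cleaned order-field value:
	 * each run is kept only while it stays below groupMaxSize, which prevents
	 * pathologically large groups (e.g. very common titles) from exploding
	 * the number of pairwise comparisons.
	 */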
	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
		final Queue<MapDocument> q = new LinkedList<MapDocument>();

		String fieldRef = "";
		final List<MapDocument> tempResults = Lists.newArrayList();

		while (!queue.isEmpty()) {
			final MapDocument result = queue.remove();

			final String orderFieldName = dedupConf.getWf().getOrderField();
			final Field orderFieldValue = result.values(orderFieldName);
			if (!orderFieldValue.isEmpty()) {
				final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
				if (field.equals(fieldRef)) {
					tempResults.add(result);
				} else {
					populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
					tempResults.clear();
					tempResults.add(result);
					fieldRef = field;
				}
			} else {
				context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
			}
		}
		populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);

		return q;
	}
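
	/**
	 * Moves a group of equally-keyed records into the simplified queue, or
	 * skips (and counts) the whole group when its size reaches groupMaxSize.
	 */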
	private void populateSimplifiedQueue(final Queue<MapDocument> q,
			final List<MapDocument> tempResults,
			final Context context,
			final String fieldRef,
			final String ngram) {
		if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) {
			q.addAll(tempResults);
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(),
					"Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
					.increment(tempResults.size());
			log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
		}
	}
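
	/**
	 * Sliding-window comparison: each record is popped from the ordered queue
	 * and compared against the records that follow it, bounded by
	 * slidingWindowSize; qualifying pairs are scored and passed to
	 * emitOutput. A record whose namespace prefix is in the skip list stops
	 * the inner scan.
	 */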
	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {

		final PaceDocumentDistance algo = new PaceDocumentDistance();

		while (!queue.isEmpty()) {

			final MapDocument pivot = queue.remove();
			final String idPivot = pivot.getIdentifier();

			final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
			final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();

			if (fieldPivot != null) {
				int i = 0;
				for (final MapDocument curr : queue) {
					final String idCurr = curr.getIdentifier();

					if (mustSkip(idCurr)) {
						context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
						break;
					}

					if (i > dedupConf.getWf().getSlidingWindowSize()) {
						break;
					}

					final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
					final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();

					if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
						final ScoreResult sr = similarity(algo, pivot, curr);
						emitOutput(sr, idPivot, idCurr, context);
						i++;
					}
				}
			}
		}
	}
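
	/**
	 * Writes the similarity pair when the score reaches the configured
	 * threshold; otherwise only a counter is incremented.
	 */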
	private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
		final double d = sr.getScore();

		if (d >= dedupConf.getWf().getThreshold()) {
			writeSimilarity(context, idPivot, idCurr, d);
			context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
		}
	}
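
	/**
	 * Wraps the pairwise distance computation, logging both documents before
	 * rethrowing any failure.
	 */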
	private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
		try {
			return algo.between(a, b, dedupConf);
		} catch (Throwable e) {
			log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
			throw new IllegalArgumentException(e);
		}
	}
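
	/**
	 * A record is skipped when the namespace prefix of its identifier (the
	 * token between '|' and '::') appears in the configured skip list.
	 */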
	private boolean mustSkip(final String idPivot) {
		return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
	}

	private String getNsPrefix(final String id) {
		return StringUtils.substringBetween(id, "|", "::");
	}
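
	/**
	 * Emits the similarity in both directions, so that each of the two rows
	 * carries the relation to the other.
	 */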
	private void writeSimilarity(final Context context, final String idPivot, final String id, final double d) throws IOException, InterruptedException {
		final byte[] rowKey = Bytes.toBytes(idPivot);
		final byte[] target = Bytes.toBytes(id);

		emitRel(context, rowKey, target, d);
		emitRel(context, target, rowKey, d);
	}
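
	/**
	 * Writes a single directed relation as a Put on the similarity column
	 * family. The cell value is currently left empty; the commented-out lines
	 * show how a full Oaf payload could be serialized instead.
	 */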
	private void emitRel(final Context context, final byte[] from, final byte[] to, final double d) throws IOException, InterruptedException {
		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());

		//final OafRel.Builder rel = DedupUtils.getDedupSimilarity(dedupConf, new String(from), new String(to));
		//final Oaf.Builder oaf = DedupUtils.buildRel(dedupConf, rel, d);
		//final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, oaf.build().toByteArray());

		final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes(""));
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
		ibw.set(from);
		context.write(ibw, put);
	}
236
}