package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

import com.google.common.collect.Lists;

import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.DynConf;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.model.FieldList;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import eu.dnetlib.pace.util.DedupConfig;
import eu.dnetlib.pace.util.DedupConfigLoader;

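/**
 * Reducer of the deduplication workflow: documents grouped under the same clustering key
 * are compared with a sliding-window strategy and, for each pair whose distance score
 * reaches the configured threshold, a symmetric similarity relation is written to HBase.
 */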
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

	private static final boolean WRITE_TO_WAL = false;
	// private static final int LIMIT = 2000;
	// private static final int FIELD_LIMIT = 10;
	// private static final int WINDOW_SIZE = 200;

	private Config paceConf;
	private DedupConfig dedupConf;

	private ImmutableBytesWritable ibw;

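	/**
	 * Loads the PACE field configuration and the dedup workflow configuration from the job
	 * configuration.
	 */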
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
		ibw = new ImmutableBytesWritable();

		System.out.println("dedup reduce phase \npace conf: " + paceConf.fields() + "\nwf conf: " + dedupConf.toString());
	}

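	/**
	 * Dispatches the candidate queue according to the configured entity type: result records
	 * are first simplified by grouping them on the normalized ordering field, while person and
	 * organization records are processed directly.
	 */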
	@Override
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
		System.out.println("\nReducing key: '" + key + "'");

		final Queue<MapDocument> q = prepare(context, key, values);
		switch (Type.valueOf(dedupConf.getEntityType())) {
		case person:
			process(q, context);
			break;
		case result:
			process(simplifyQueue(q, key.toString(), context), context);
			break;
		case organization:
			process(q, context);
			break;
		default:
			throw new IllegalArgumentException("dedup not implemented for type: " + dedupConf.getEntityType());
		}
	}

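	/**
	 * Deserializes the grouped values into MapDocuments, skipping duplicated identifiers and
	 * keeping them ordered by the configured field; collection stops once the queue exceeds
	 * the configured maximum size.
	 */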
	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getOrderField()));

		final Set<String> seen = new HashSet<String>();

		for (ImmutableBytesWritable i : values) {
			MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
			String id = doc.getIdentifier();

			if (!seen.contains(id)) {
				seen.add(id);
				queue.add(doc);
			}

			if (queue.size() > dedupConf.getQueueMaxSize()) {
				// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
				context.getCounter("ngram size > " + dedupConf.getQueueMaxSize(), "N").increment(1);
				System.out.println("breaking out after limit (" + dedupConf.getQueueMaxSize() + ") for ngram '" + key + "'");
				break;
			}
		}

		return queue;
	}

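	/**
	 * Rebuilds the queue by grouping consecutive documents that share the same normalized value
	 * of the ordering field; oversized groups are dropped by populateSimplifiedQueue.
	 */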
	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
		final Queue<MapDocument> q = new LinkedList<MapDocument>();

		String fieldRef = "";
		List<MapDocument> tempResults = Lists.newArrayList();

		while (!queue.isEmpty()) {
			MapDocument result = queue.remove();

			if (!result.values(dedupConf.getOrderField()).isEmpty()) {
				String field = NGramUtils.cleanupForOrdering(result.values(dedupConf.getOrderField()).stringValue());
				if (field.equals(fieldRef)) {
					tempResults.add(result);
				} else {
					populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
					tempResults.clear();
					tempResults.add(result);
					fieldRef = field;
				}
			} else {
				context.getCounter(dedupConf.getEntityType(), "missing " + dedupConf.getOrderField()).increment(1);
			}
		}
		populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);

		return q;
	}

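	/**
	 * Adds the current group to the simplified queue only when it is smaller than the configured
	 * maximum group size; otherwise the whole group is counted as skipped.
	 */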
	private void populateSimplifiedQueue(final Queue<MapDocument> q,
			final List<MapDocument> tempResults,
			final Context context,
			final String fieldRef,
			final String ngram) {
		if (tempResults.size() < dedupConf.getGroupMaxSize()) {
			q.addAll(tempResults);
		} else {
			context.getCounter(dedupConf.getEntityType(), "Skipped records for count(" + dedupConf.getOrderField() + ") >= " + dedupConf.getGroupMaxSize())
					.increment(tempResults.size());
			System.out.println("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
		}
	}

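	/**
	 * Compares each pivot document against the documents that follow it in the queue, limited to
	 * the configured sliding window, and emits a similarity relation for every pair scoring at or
	 * above the threshold.
	 */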
	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {

		final PaceDocumentDistance algo = new PaceDocumentDistance();

		while (!queue.isEmpty()) {

			final MapDocument pivot = queue.remove();
			final String idPivot = pivot.getIdentifier();

			final FieldList fieldsPivot = pivot.values(dedupConf.getOrderField());
			final String fieldPivot = fieldsPivot == null || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();

			if (fieldPivot != null) {
				// System.out.println(idPivot + " --> " + fieldPivot);

				int i = 0;
				for (MapDocument curr : queue) {
					final String idCurr = curr.getIdentifier();

					if (mustSkip(idCurr)) {
						context.getCounter(dedupConf.getEntityType(), "skip list").increment(1);
						break;
					}

					if (i > dedupConf.getSlidingWindowSize()) {
						break;
					}

					final FieldList fieldsCurr = curr.values(dedupConf.getOrderField());
					final String fieldCurr = fieldsCurr == null || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();

					if (!idCurr.equals(idPivot) && fieldCurr != null) {

						double d = algo.between(pivot, curr, paceConf);

						if (d >= dedupConf.getThreshold()) {
							writeSimilarity(context, idPivot, idCurr);
							context.getCounter(dedupConf.getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
						} else {
							context.getCounter(dedupConf.getEntityType(), "d < " + dedupConf.getThreshold()).increment(1);
						}
						i++;
					}
				}
			}
		}
	}

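	/**
	 * Returns true when the namespace prefix of the identifier (the token between '|' and '::')
	 * is listed in the configured skip list.
	 */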
	private boolean mustSkip(final String idPivot) {
		return dedupConf.getSkipList().contains(getNsPrefix(idPivot));
	}

	private String getNsPrefix(final String id) {
		return StringUtils.substringBetween(id, "|", "::");
	}

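	/**
	 * Emits the similarity relation in both directions: pivot -> id and id -> pivot.
	 */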
	private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
		byte[] rowKey = Bytes.toBytes(idPivot);
		byte[] target = Bytes.toBytes(id);

		emitRel(context, rowKey, target);
		emitRel(context, target, rowKey);
	}

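	/**
	 * Writes a Put on the similarity column family of the source row pointing to the target row;
	 * WAL writes are disabled (WRITE_TO_WAL = false).
	 */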
	private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
		Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getEntityType())), to, Bytes.toBytes(""));
		put.setWriteToWAL(WRITE_TO_WAL);
		ibw.set(from);
		context.write(ibw, put);
	}
}