Revision 45419

work in progress, adapting m/r jobs to the updated graph domain version
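
Besides re-enabling the previously commented-out class, the substantive change in DedupReducer.java is in emitRel(): the similarity Put is no longer built with DedupUtils.getSimilarityCFBytes(...), but with HBaseTableDAO (new import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO, replacing the DedupUtils and SubRelType imports), which supplies the relations column family and a per-target qualifier; the emitOutput() counter label likewise drops SubRelType.dedupSimilarity in favour of the plain string "dedup similarity (x2)". Excerpt of the changed lines, quoted from the diff below:

	// before
	final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getWf().getEntityType())), to, Bytes.toBytes(""));

	// after
	final byte[] qualifier = HBaseTableDAO.getSimilarityQualifierBytes(Type.valueOf(dedupConf.getWf().getEntityType()), new String(to));
	final Put put = new Put(from).add(HBaseTableDAO.cfRelsByte(), qualifier, Bytes.toBytes(""));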

DedupReducer.java
//package eu.dnetlib.data.mapreduce.hbase.dedup;
//
//import java.io.IOException;
//import java.util.*;
//
//import com.google.common.collect.Lists;
//import eu.dnetlib.data.mapreduce.JobParams;
//import eu.dnetlib.data.mapreduce.util.DedupUtils;
//import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
//import eu.dnetlib.data.proto.TypeProtos.Type;
//import eu.dnetlib.pace.clustering.NGramUtils;
//import eu.dnetlib.pace.config.DedupConfig;
//import eu.dnetlib.pace.distance.PaceDocumentDistance;
//import eu.dnetlib.pace.distance.eval.ScoreResult;
//import eu.dnetlib.pace.model.Field;
//import eu.dnetlib.pace.model.MapDocument;
//import eu.dnetlib.pace.model.MapDocumentComparator;
//import eu.dnetlib.pace.model.MapDocumentSerializer;
//import org.apache.commons.lang3.StringUtils;
//import org.apache.commons.logging.Log;
//import org.apache.commons.logging.LogFactory;
//import org.apache.hadoop.hbase.client.Put;
//import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
//import org.apache.hadoop.hbase.mapreduce.TableReducer;
//import org.apache.hadoop.hbase.util.Bytes;
//import org.apache.hadoop.io.Text;
//
//public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
//
//	private static final Log log = LogFactory.getLog(DedupReducer.class);
//
//	private DedupConfig dedupConf;
//
//	private ImmutableBytesWritable ibw;
//
//	@Override
//	protected void setup(final Context context) throws IOException, InterruptedException {
//
//		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
//		ibw = new ImmutableBytesWritable();
//
//		log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
//	}
//
//	@Override
//	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
//
//		final Queue<MapDocument> q = prepare(context, key, values);
//
//		if (q.size() > 1) {
//			log.info("reducing key: '" + key + "' records: " + q.size());
//
//			switch (Type.valueOf(dedupConf.getWf().getEntityType())) {
//			case person:
//				process(q, context);
//				break;
//			case publication:
//			case dataset:
//				process(simplifyQueue(q, key.toString(), context), context);
//				break;
//			case organization:
//				process(q, context);
//				break;
//			default:
//				throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
//			}
//		} else {
//			context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
//		}
//	}
//
//	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
//		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
//
//		final Set<String> seen = new HashSet<String>();
//		final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
//		int count = 0;
//		boolean logged = false;
//
//		for (final ImmutableBytesWritable i : values) {
//			count++;
//
//			if (queue.size() <= queueMaxSize) {
//				final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
//				final String id = doc.getIdentifier();
//
//				if (!seen.contains(id)) {
//					seen.add(id);
//					queue.add(doc);
//				}
//
//			} else {
//				if (!logged) {
//					// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
//					context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
//					log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
//					logged = true;
//				}
//			}
//		}
//
//		log.info(String.format("cluster key '%s' size '%s'", key, count));
//
//		return queue;
//	}
//
//	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
//		final Queue<MapDocument> q = new LinkedList<MapDocument>();
//
//		String fieldRef = "";
//		final List<MapDocument> tempResults = Lists.newArrayList();
//
//		while (!queue.isEmpty()) {
//			final MapDocument result = queue.remove();
//
//			final String orderFieldName = dedupConf.getWf().getOrderField();
//			final Field orderFieldValue = result.values(orderFieldName);
//			if (!orderFieldValue.isEmpty()) {
//				final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
//				if (field.equals(fieldRef)) {
//					tempResults.add(result);
//				} else {
//					populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
//					tempResults.clear();
//					tempResults.add(result);
//					fieldRef = field;
//				}
//			} else {
//				context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
//			}
//		}
//		populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
//
//		return q;
//	}
//
//	private void populateSimplifiedQueue(final Queue<MapDocument> q,
//			final List<MapDocument> tempResults,
//			final Context context,
//			final String fieldRef,
//			final String ngram) {
//		if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) {
//			q.addAll(tempResults);
//		} else {
//			context.getCounter(dedupConf.getWf().getEntityType(),
//					"Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
//					.increment(tempResults.size());
//			log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
//		}
//	}
//
//	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
//
//		final PaceDocumentDistance algo = new PaceDocumentDistance();
//
//		while (!queue.isEmpty()) {
//
//			final MapDocument pivot = queue.remove();
//			final String idPivot = pivot.getIdentifier();
//
//			final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
//			final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
//
//			if (fieldPivot != null) {
//				// System.out.println(idPivot + " --> " + fieldPivot);
//
//				int i = 0;
//				for (final MapDocument curr : queue) {
//					final String idCurr = curr.getIdentifier();
//
//					if (mustSkip(idCurr)) {
//						context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
//						break;
//					}
//
//					if (i > dedupConf.getWf().getSlidingWindowSize()) {
//						break;
//					}
//
//					final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
//					final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
//
//					if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
//
//						final ScoreResult sr = similarity(algo, pivot, curr);
//						emitOutput(sr, idPivot, idCurr, context);
//						i++;
//					}
//				}
//			}
//		}
//	}
//
//	private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
//		final double d = sr.getScore();
//
//		if (d >= dedupConf.getWf().getThreshold()) {
//			writeSimilarity(context, idPivot, idCurr);
//			context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
//		} else {
//			context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
//		}
//	}
//
//	private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
//		try {
//			return algo.between(a, b, dedupConf);
//		} catch(Throwable e) {
//			log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
//			throw new IllegalArgumentException(e);
//		}
//	}
//
//	private boolean mustSkip(final String idPivot) {
//		return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
//	}
//
//	private String getNsPrefix(final String id) {
//		return StringUtils.substringBetween(id, "|", "::");
//	}
//
//	private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
//		final byte[] rowKey = Bytes.toBytes(idPivot);
//		final byte[] target = Bytes.toBytes(id);
//
//		//log.info("writing similarity: " + idPivot + " <-> " + id);
//
//		emitRel(context, rowKey, target);
//		emitRel(context, target, rowKey);
//	}
//
//	private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
//		final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(Type.valueOf(dedupConf.getWf().getEntityType())), to, Bytes.toBytes(""));
//		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
//		ibw.set(from);
//		context.write(ibw, put);
//	}
//}

package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.*;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;

public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {

	private static final Log log = LogFactory.getLog(DedupReducer.class);

	private DedupConfig dedupConf;

	private ImmutableBytesWritable ibw;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {

		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		ibw = new ImmutableBytesWritable();

		log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
	}

	@Override
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {

		final Queue<MapDocument> q = prepare(context, key, values);

		if (q.size() > 1) {
			log.info("reducing key: '" + key + "' records: " + q.size());

			switch (Type.valueOf(dedupConf.getWf().getEntityType())) {
			case person:
				process(q, context);
				break;
			case publication:
			case dataset:
				process(simplifyQueue(q, key.toString(), context), context);
				break;
			case organization:
				process(q, context);
				break;
			default:
				throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
			}
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
		}
	}

	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));

		final Set<String> seen = new HashSet<String>();
		final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
		int count = 0;
		boolean logged = false;

		for (final ImmutableBytesWritable i : values) {
			count++;

			if (queue.size() <= queueMaxSize) {
				final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
				final String id = doc.getIdentifier();

				if (!seen.contains(id)) {
					seen.add(id);
					queue.add(doc);
				}

			} else {
				if (!logged) {
					// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
					context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
					log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
					logged = true;
				}
			}
		}

		log.info(String.format("cluster key '%s' size '%s'", key, count));

		return queue;
	}

	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
		final Queue<MapDocument> q = new LinkedList<MapDocument>();

		String fieldRef = "";
		final List<MapDocument> tempResults = Lists.newArrayList();

		while (!queue.isEmpty()) {
			final MapDocument result = queue.remove();

			final String orderFieldName = dedupConf.getWf().getOrderField();
			final Field orderFieldValue = result.values(orderFieldName);
			if (!orderFieldValue.isEmpty()) {
				final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
				if (field.equals(fieldRef)) {
					tempResults.add(result);
				} else {
					populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
					tempResults.clear();
					tempResults.add(result);
					fieldRef = field;
				}
			} else {
				context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
			}
		}
		populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);

		return q;
	}

	private void populateSimplifiedQueue(final Queue<MapDocument> q,
			final List<MapDocument> tempResults,
			final Context context,
			final String fieldRef,
			final String ngram) {
		if (tempResults.size() < dedupConf.getWf().getGroupMaxSize()) {
			q.addAll(tempResults);
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(),
					"Skipped records for count(" + dedupConf.getWf().getOrderField() + ") >= " + dedupConf.getWf().getGroupMaxSize())
					.increment(tempResults.size());
			log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
		}
	}

	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {

		final PaceDocumentDistance algo = new PaceDocumentDistance();

		while (!queue.isEmpty()) {

			final MapDocument pivot = queue.remove();
			final String idPivot = pivot.getIdentifier();

			final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
			final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();

			if (fieldPivot != null) {
				// System.out.println(idPivot + " --> " + fieldPivot);

				int i = 0;
				for (final MapDocument curr : queue) {
					final String idCurr = curr.getIdentifier();

					if (mustSkip(idCurr)) {
						context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
						break;
					}

					if (i > dedupConf.getWf().getSlidingWindowSize()) {
						break;
					}

					final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
					final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();

					if (!idCurr.equals(idPivot) && (fieldCurr != null)) {

						final ScoreResult sr = similarity(algo, pivot, curr);
						emitOutput(sr, idPivot, idCurr, context);
						i++;
					}
				}
			}
		}
	}

	private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
		final double d = sr.getScore();

		if (d >= dedupConf.getWf().getThreshold()) {
			writeSimilarity(context, idPivot, idCurr);
			context.getCounter(dedupConf.getWf().getEntityType(), "dedup similarity (x2)").increment(1);
		} else {
			context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
		}
	}

	private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
		try {
			return algo.between(a, b, dedupConf);
		} catch(Throwable e) {
			log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
			throw new IllegalArgumentException(e);
		}
	}

	private boolean mustSkip(final String idPivot) {
		return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
	}

	private String getNsPrefix(final String id) {
		return StringUtils.substringBetween(id, "|", "::");
	}

	private void writeSimilarity(final Context context, final String idPivot, final String id) throws IOException, InterruptedException {
		final byte[] rowKey = Bytes.toBytes(idPivot);
		final byte[] target = Bytes.toBytes(id);

		//log.info("writing similarity: " + idPivot + " <-> " + id);

		emitRel(context, rowKey, target);
		emitRel(context, target, rowKey);
	}

	private void emitRel(final Context context, final byte[] from, final byte[] to) throws IOException, InterruptedException {
		final byte[] qualifier = HBaseTableDAO.getSimilarityQualifierBytes(Type.valueOf(dedupConf.getWf().getEntityType()), new String(to));
		final Put put = new Put(from).add(HBaseTableDAO.cfRelsByte(), qualifier, Bytes.toBytes(""));
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
		ibw.set(from);
		context.write(ibw, put);
	}
}
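
For context, this reducer is only one half of an HBase table-to-table MapReduce job; the driver and mapper are not part of this revision. The sketch below is a minimal, illustrative wiring using HBase's TableMapReduceUtil: the table name, scan settings, the DedupMapperStub class, and the way the serialized dedup configuration reaches JobParams.DEDUP_CONF are all assumptions, not code from this repository.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.dedup.DedupReducer;

public class DedupSimilarityDriver {

	// Hypothetical mapper stub: a real implementation would decode each row into a
	// MapDocument, compute its clustering keys and emit (clusteringKey, serialized doc),
	// which is what DedupReducer expects as input.
	public static class DedupMapperStub extends TableMapper<Text, ImmutableBytesWritable> {

		@Override
		protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context)
				throws IOException, InterruptedException {
			// intentionally left empty in this sketch
		}
	}

	public static void main(final String[] args) throws Exception {
		final String table = args[0];          // source/target HBase table (assumption)
		final String dedupConfJson = args[1];  // serialized pace/dedup configuration (assumption)

		final Configuration conf = HBaseConfiguration.create();
		// DedupReducer.setup() reads the configuration back through JobParams.DEDUP_CONF
		conf.set(JobParams.DEDUP_CONF, dedupConfJson);

		final Job job = new Job(conf, "dedup similarity");
		job.setJarByClass(DedupSimilarityDriver.class);

		final Scan scan = new Scan();
		scan.setCaching(100); // scanner caching, tune per cluster

		// mapper groups candidates under clustering keys, reducer writes similarity rels
		TableMapReduceUtil.initTableMapperJob(table, scan, DedupMapperStub.class,
				Text.class, ImmutableBytesWritable.class, job);
		TableMapReduceUtil.initTableReducerJob(table, DedupReducer.class, job);

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}

With this split, the reducer only needs the configuration it loads in setup(); grouping records under clustering keys is entirely the mapper's responsibility.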
