Revision 53518

introduced use of BlockProcessor

View differences:

modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java

@@ -97,12 +97,12 @@
 					final String subType = fields.get(wf.getSubEntityType()).stringValue();
 					if (wf.getSubEntityValue().equalsIgnoreCase(subType)) {
 						context.getCounter(subType, "converted as MapDocument").increment(1);
-						emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, dedupConf.blacklists()));
+						emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf));
 					} else {
 						context.getCounter(subType, "ignored").increment(1);
 					}
 				} else {
-					emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, dedupConf.blacklists()));
+					emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf));
 				}
 			}
 		} else {
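
The only change on the mapper side is the call to BlacklistAwareClusteringCombiner.filterAndCombine: both call sites drop the explicit dedupConf.blacklists() argument and pass the DedupConfig alone. A minimal sketch of what the new two-argument overload presumably does, assuming it simply resolves the blacklists from the configuration and delegates to the old three-argument form (the method body and the Collection<String> return type are illustrative assumptions, not part of this revision):

	// hypothetical sketch: the blacklists are resolved from the config itself,
	// so callers no longer pass dedupConf.blacklists() explicitly
	public static Collection<String> filterAndCombine(final MapDocument doc, final DedupConfig conf) {
		return filterAndCombine(doc, conf, conf.blacklists());
	}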
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupReducer.java

@@ -3,9 +3,14 @@
 import java.io.IOException;
 import java.util.*;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
+import com.google.protobuf.InvalidProtocolBufferException;
 import eu.dnetlib.data.mapreduce.JobParams;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
+import eu.dnetlib.data.mapreduce.util.StreamUtils;
+import eu.dnetlib.data.proto.OafProtos;
 import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
 import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.pace.clustering.NGramUtils;
@@ -13,10 +18,9 @@
 import eu.dnetlib.pace.config.WfConfig;
 import eu.dnetlib.pace.distance.PaceDocumentDistance;
 import eu.dnetlib.pace.distance.eval.ScoreResult;
-import eu.dnetlib.pace.model.Field;
-import eu.dnetlib.pace.model.MapDocument;
-import eu.dnetlib.pace.model.MapDocumentComparator;
-import eu.dnetlib.pace.model.MapDocumentSerializer;
+import eu.dnetlib.pace.model.*;
+import eu.dnetlib.pace.util.BlockProcessor;
+import eu.dnetlib.pace.util.Reporter;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -27,6 +31,8 @@
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
 
+import javax.annotation.Nullable;
+
 public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
 
 	private static final Log log = LogFactory.getLog(DedupReducer.class);
@@ -45,197 +51,41 @@
 	}
 
 	@Override
-	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
+	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
 
-		final Queue<MapDocument> q = prepare(context, key, values);
-
-		if (q.size() > 1) {
-			log.info("reducing key: '" + key + "' records: " + q.size());
-
-			final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
-
-			switch (type) {
-			case result:
-				process(simplifyQueue(q, key.toString(), context), context);
-				break;
-			case organization:
-				process(q, context);
-				break;
-			default:
-				throw new IllegalArgumentException("process not implemented for type: " + dedupConf.getWf().getEntityType());
+		final Iterable<MapDocument> docs = Iterables.transform(values, new Function<ImmutableBytesWritable, MapDocument>() {
+			@Nullable
+			@Override
+			public MapDocument apply(@Nullable ImmutableBytesWritable b) {
+				return MapDocumentSerializer.decode(b.copyBytes());
 			}
-		} else {
-			context.getCounter(dedupConf.getWf().getEntityType(), "records per hash key = 1").increment(1);
-		}
-	}
+		});
 
-	private Queue<MapDocument> prepare(final Context context, final Text key, final Iterable<ImmutableBytesWritable> values) {
-		final Queue<MapDocument> queue = new PriorityQueue<MapDocument>(100, new MapDocumentComparator(dedupConf.getWf().getOrderField()));
+		new BlockProcessor(dedupConf).process(key.toString(), docs, new Reporter() {
 
-		final Set<String> seen = new HashSet<String>();
-		final int queueMaxSize = dedupConf.getWf().getQueueMaxSize();
-
-		boolean logged = false;
-
-		for (final ImmutableBytesWritable i : values) {
-			if (queue.size() <= queueMaxSize) {
-				final MapDocument doc = MapDocumentSerializer.decode(i.copyBytes());
-				final String id = doc.getIdentifier();
-
-				if (!seen.contains(id)) {
-					seen.add(id);
-					queue.add(doc);
-				}
-
-			} else {
-				if (!logged) {
-					// context.getCounter("ngram size > " + LIMIT, "'" + key.toString() + "', --> " + context.getTaskAttemptID()).increment(1);
-					context.getCounter("ngram size > " + queueMaxSize, "N").increment(1);
-					log.info("breaking out after limit (" + queueMaxSize + ") for ngram '" + key + "'");
-					logged = true;
-				}
+			@Override
+			public void incrementCounter(String counterGroup, String counterName, long delta) {
+				context.getCounter(counterGroup, counterName).increment(delta);
 			}
-		}
 
-		return queue;
-	}
+			@Override
+			public void emit(String type, String from, String to) {
 
-	private Queue<MapDocument> simplifyQueue(final Queue<MapDocument> queue, final String ngram, final Context context) {
-		final Queue<MapDocument> q = new LinkedList<MapDocument>();
-
-		String fieldRef = "";
-		final List<MapDocument> tempResults = Lists.newArrayList();
-
-		while (!queue.isEmpty()) {
-			final MapDocument result = queue.remove();
-
-			final String orderFieldName = dedupConf.getWf().getOrderField();
-			final Field orderFieldValue = result.values(orderFieldName);
-			if (!orderFieldValue.isEmpty()) {
-				final String field = NGramUtils.cleanupForOrdering(orderFieldValue.stringValue());
-				if (field.equals(fieldRef)) {
-					tempResults.add(result);
-				} else {
-					populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
-					tempResults.clear();
-					tempResults.add(result);
-					fieldRef = field;
-				}
-			} else {
-				context.getCounter(dedupConf.getWf().getEntityType(), "missing " + dedupConf.getWf().getOrderField()).increment(1);
+				emitRel(context, type, from, to);
+				emitRel(context, type, to, from);
 			}
-		}
-		populateSimplifiedQueue(q, tempResults, context, fieldRef, ngram);
 
-		return q;
-	}
-
-	private void populateSimplifiedQueue(final Queue<MapDocument> q,
-			final List<MapDocument> tempResults,
-			final Context context,
-			final String fieldRef,
-			final String ngram) {
-		final WfConfig wfConf = dedupConf.getWf();
-		if (tempResults.size() < wfConf.getGroupMaxSize()) {
-			q.addAll(tempResults);
-		} else {
-			context.getCounter(wfConf.getEntityType(),
-					"Skipped records for count(" + wfConf.getOrderField() + ") >= " + wfConf.getGroupMaxSize())
-					.increment(tempResults.size());
-			log.info("Skipped field: " + fieldRef + " - size: " + tempResults.size() + " - ngram: " + ngram);
-		}
-	}
-
-	private void process(final Queue<MapDocument> queue, final Context context) throws IOException, InterruptedException {
-
-		final PaceDocumentDistance algo = new PaceDocumentDistance();
-
-		while (!queue.isEmpty()) {
-
-			final MapDocument pivot = queue.remove();
-			final String idPivot = pivot.getIdentifier();
-
-			final Field fieldsPivot = pivot.values(dedupConf.getWf().getOrderField());
-			final String fieldPivot = (fieldsPivot == null) || fieldsPivot.isEmpty() ? null : fieldsPivot.stringValue();
-
-			if (fieldPivot != null) {
-				// System.out.println(idPivot + " --> " + fieldPivot);
-
-				int i = 0;
-				for (final MapDocument curr : queue) {
-					final String idCurr = curr.getIdentifier();
-
-					if (mustSkip(idCurr)) {
-						context.getCounter(dedupConf.getWf().getEntityType(), "skip list").increment(1);
-						break;
-					}
-
-					if (i > dedupConf.getWf().getSlidingWindowSize()) {
-						break;
-					}
-
-					final Field fieldsCurr = curr.values(dedupConf.getWf().getOrderField());
-					final String fieldCurr = (fieldsCurr == null) || fieldsCurr.isEmpty() ? null : fieldsCurr.stringValue();
-
-					if (!idCurr.equals(idPivot) && (fieldCurr != null)) {
-
-						final ScoreResult sr = similarity(algo, pivot, curr);
-						emitOutput(sr, idPivot, idCurr, context);
-						i++;
-					}
+			private void emitRel(final Context context, final String type, final String from, final String to) {
+				final Put put = new Put(Bytes.toBytes(from)).add(DedupUtils.getSimilarityCFBytes(type), Bytes.toBytes(to), Bytes.toBytes(""));
+				put.setDurability(Durability.SKIP_WAL);
+				ibw.set(Bytes.toBytes(from));
+				try {
+					context.write(ibw, put);
+				} catch (IOException | InterruptedException e) {
+					e.printStackTrace();
 				}
 			}
-		}
+		});
 	}
 
-	private void emitOutput(final ScoreResult sr, final String idPivot, final String idCurr, final Context context) throws IOException, InterruptedException {
-		final double d = sr.getScore();
-
-		if (d >= dedupConf.getWf().getThreshold()) {
-			writeSimilarity(context, idPivot, idCurr, d);
-			context.getCounter(dedupConf.getWf().getEntityType(), SubRelType.dedupSimilarity.toString() + " (x2)").increment(1);
-		} else {
-			context.getCounter(dedupConf.getWf().getEntityType(), "d < " + dedupConf.getWf().getThreshold()).increment(1);
-		}
-	}
-
-	private ScoreResult similarity(final PaceDocumentDistance algo, final MapDocument a, final MapDocument b) {
-		try {
-			return algo.between(a, b, dedupConf);
-		} catch(Throwable e) {
-			log.error(String.format("\nA: %s\n----------------------\nB: %s", a, b), e);
-			throw new IllegalArgumentException(e);
-		}
-	}
-
-	private boolean mustSkip(final String idPivot) {
-		return dedupConf.getWf().getSkipList().contains(getNsPrefix(idPivot));
-	}
-
-	private String getNsPrefix(final String id) {
-		return StringUtils.substringBetween(id, "|", "::");
-	}
-
-	private void writeSimilarity(final Context context, final String idPivot, final String id, final double d) throws IOException, InterruptedException {
-		final byte[] rowKey = Bytes.toBytes(idPivot);
-		final byte[] target = Bytes.toBytes(id);
-
-		//log.info("writing similarity: " + idPivot + " <-> " + id);
-
-		emitRel(context, rowKey, target, d);
-		emitRel(context, target, rowKey, d);
-	}
-
-	private void emitRel(final Context context, final byte[] from, final byte[] to, final double d) throws IOException, InterruptedException {
-		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
-
-		//final OafRel.Builder rel = DedupUtils.getDedupSimilarity(dedupConf, new String(from), new String(to));
-		//final Oaf.Builder oaf = DedupUtils.buildRel(dedupConf, rel, d);
-
-		//final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, oaf.build().toByteArray());
-		final Put put = new Put(from).add(DedupUtils.getSimilarityCFBytes(type), to, Bytes.toBytes(""));
-		put.setDurability(Durability.SKIP_WAL);
-		ibw.set(from);
-		context.write(ibw, put);
-	}
 }
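
Because the diff above interleaves many removals, here is the resulting reduce method read straight through, assembled from the added lines (only the comments are new to this summary). All of the queueing, ordering, and sliding-window comparison previously done by prepare, simplifyQueue, and process now happens inside dnet-pace-core's BlockProcessor; the anonymous Reporter bridges its counter updates and similarity emissions back onto the Hadoop Context, writing each relation in both directions:

@Override
protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {

	// decode lazily: each serialized value becomes a MapDocument on demand
	final Iterable<MapDocument> docs = Iterables.transform(values, new Function<ImmutableBytesWritable, MapDocument>() {
		@Nullable
		@Override
		public MapDocument apply(@Nullable ImmutableBytesWritable b) {
			return MapDocumentSerializer.decode(b.copyBytes());
		}
	});

	// delegate the whole block to dnet-pace-core
	new BlockProcessor(dedupConf).process(key.toString(), docs, new Reporter() {

		@Override
		public void incrementCounter(String counterGroup, String counterName, long delta) {
			context.getCounter(counterGroup, counterName).increment(delta);
		}

		@Override
		public void emit(String type, String from, String to) {
			// similarity relations are symmetric: write both directions
			emitRel(context, type, from, to);
			emitRel(context, type, to, from);
		}

		private void emitRel(final Context context, final String type, final String from, final String to) {
			final Put put = new Put(Bytes.toBytes(from)).add(DedupUtils.getSimilarityCFBytes(type), Bytes.toBytes(to), Bytes.toBytes(""));
			put.setDurability(Durability.SKIP_WAL); // similarity puts skip the write-ahead log
			ibw.set(Bytes.toBytes(from));
			try {
				context.write(ibw, put);
			} catch (IOException | InterruptedException e) {
				e.printStackTrace();
			}
		}
	});
}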
modules/dnet-mapreduce-jobs/branches/master/pom.xml

@@ -143,7 +143,7 @@
 		<dependency>
 			<groupId>eu.dnetlib</groupId>
 			<artifactId>dnet-openaireplus-mapping-utils</artifactId>
-			<version>[6.2.16]</version>
+			<version>[6.2.17]</version>
 		</dependency>
 		<dependency>
 			<groupId>org.antlr</groupId>
@@ -161,11 +161,6 @@
 			<version>1.2.1</version>
 		</dependency>
 		<dependency>
-			<groupId>eu.dnetlib</groupId>
-			<artifactId>dnet-pace-core</artifactId>
-			<version>[2.0.0,3.0.0)</version>
-		</dependency>
-		<dependency>
 			<groupId>org.mongodb</groupId>
 			<artifactId>mongo-java-driver</artifactId>
 			<version>${mongodb.driver.version}</version>
