Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import org.apache.commons.lang.StringUtils;
7
import org.apache.hadoop.hbase.client.Put;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableReducer;
10
import org.apache.hadoop.hbase.util.Bytes;
11
import org.apache.hadoop.io.Text;
12

    
13
import com.google.common.collect.Iterables;
14
import com.google.common.collect.Lists;
15
import com.google.protobuf.InvalidProtocolBufferException;
16

    
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
19
import eu.dnetlib.data.mapreduce.util.DedupUtils;
20
import eu.dnetlib.data.mapreduce.util.OafDecoder;
21
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
22
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
23
import eu.dnetlib.data.proto.OafProtos.Oaf;
24
import eu.dnetlib.data.proto.RelTypeProtos.RelType;
25
import eu.dnetlib.data.proto.TypeProtos.Type;
26
import eu.dnetlib.data.transform.OafEntityMerger;
27
import eu.dnetlib.pace.util.DedupConfig;
28
import eu.dnetlib.pace.util.DedupConfigLoader;
29

    
30
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
31

    
32
	private enum OafPatch {
33
		rootToEntity, entityToRoot
34
	}
35

    
36
	private static final boolean WRITE_TO_WAL = false;
37

    
38
	private DedupConfig dedupConf;
39

    
40
	private RelClasses relClasses;
41

    
42
	@Override
43
	protected void setup(final Context context) throws IOException, InterruptedException {
44
		super.setup(context);
45
		dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
46
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
47

    
48
		final String relClassJson = context.getConfiguration().get("relClasses");
49
		System.out.println("relClassesJson:\n" + relClassJson);
50
		relClasses = RelClasses.fromJSon(relClassJson);
51
		System.out.println("relClasses:\n" + relClasses);
52
	}
53

    
54
	@Override
55
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
56

    
57
		// ensures we're dealing with a root, otherwise returns
58
		if (!DedupUtils.isRoot(key.toString())) {
59
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
60
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
61
			return;
62
		}
63

    
64
		final byte[] rowkey = Bytes.toBytes(key.toString());
65
		final List<Oaf> entities = Lists.newArrayList();
66

    
67
		for (final Oaf oaf : toOaf(values)) {
68
			switch (oaf.getKind()) {
69
			case entity:
70
				entities.add(oaf);
71
				break;
72
			case relation:
73
				handleRels(context, rowkey, oaf, true);
74
				break;
75
			default:
76
				break;
77
			}
78
		}
79

    
80
		// build and emit the root body
81
		final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
82
		if (entities.size() < JobParams.MAX_COUNTERS) {
83
			context.getCounter(dedupConf.getEntityType() + " root group size", lpad(entities.size())).increment(1);
84
		} else {
85
			context.getCounter(dedupConf.getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
86
		}
87

    
88
		emit(context, rowkey, dedupConf.getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
89

    
90
		// add person rels TODO: remove this hack
91
		// context.getCounter("hack", "personResult out").increment(personMap.size());
92

    
93
	}
94

    
95
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
96
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
97
	}
98

    
99
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf, final boolean hack) throws IOException, InterruptedException,
100
	InvalidProtocolBufferException {
101

    
102
		if (hack && checkHack(new String(rowkey), oaf)) {
103
			context.getCounter("hack", "personResult in").increment(1);
104
		} else {
105

    
106
			// emit relation from the root to the related entities
107
			OafDecoder decoder = rootToEntity(rowkey, oaf);
108
			emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
109

    
110
			// emit relation from the related entities to the root
111
			decoder = entityToRoot(rowkey, oaf);
112
			final byte[] revKey = Bytes.toBytes(decoder.relSourceId());
113
			emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
114
		}
115
		// mark relation from the related entities to the duplicate as deleted
116
		OafDecoder decoder = markDeleted(oaf, true);
117
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
118
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
119

    
120
		// mark relation from the related entities to the duplicate as deleted
121
		decoder = markDeleted(oaf, false);
122
		revKey = Bytes.toBytes(decoder.relSourceId());
123
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
124
	}
125

    
126
	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
127
			throws IOException, InterruptedException {
128
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
129
		put.setWriteToWAL(WRITE_TO_WAL);
130
		context.write(new ImmutableBytesWritable(rowkey), put);
131
		context.getCounter(family, label).increment(1);
132
	}
133

    
134
	// /////////////////
135

    
136
	public boolean checkHack(final String root, final Oaf oaf) {
137

    
138
		boolean res;
139
		if (dedupConf.getEntityType().equals(Type.result.toString()) && oaf.getRel().getRelType().toString().equals(RelType.personResult.toString())
140
				&& !md5matches(root, oaf.getRel().getSource())) {
141

    
142
			res = true;
143
		} else {
144
			res = false;
145
		}
146

    
147
		// if (root.equals("50|dedup_wf_001::92f6197ea6f16ae554755aced832fb6f")) {
148
		// System.out.println("##################");
149
		// System.out.println("root  : " + root);
150
		// System.out.println("source: " + oaf.getRel().getSource());
151
		// System.out.println("ckeck:  " + res);
152
		// }
153

    
154
		return res;
155
	}
156

    
157
	private boolean md5matches(final String id1, final String id2) {
158
		return id1.replaceAll("^.*\\:\\:", "").equals(id2.replaceAll("^.*\\:\\:", ""));
159
	}
160

    
161
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
162
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
163
	}
164

    
165
	private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
166
		return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
167
	}
168

    
169
	private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
170
		return deleteRelations(rel, reverse);
171
	}
172

    
173
	// patches relation objects setting the source field with the root id
174
	private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
175
		final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
176
		final Oaf.Builder builder = Oaf.newBuilder(rel);
177
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
178
		switch (patchKind) {
179
		case rootToEntity:
180
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
181
			builder.getRelBuilder().setSource(new String(rootRowkey));
182
			break;
183

    
184
		case entityToRoot:
185
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
186
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
187
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
188
			builder.getRelBuilder().setTarget(new String(rootRowkey));
189
			break;
190

    
191
		default:
192
			break;
193
		}
194

    
195
		return OafDecoder.decode(builder.build());
196
	}
197

    
198
	private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
199
		final Oaf.Builder builder = Oaf.newBuilder(rel);
200
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
201
		builder.getDataInfoBuilder().setDeletedbyinference(true);
202

    
203
		if (reverse) {
204
			final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
205
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
206
			// swap source and target
207
			final String tmp = builder.getRel().getSource();
208
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
209
			builder.getRelBuilder().setTarget(tmp);
210
		}
211

    
212
		return OafDecoder.decode(builder.build());
213
	}
214

    
215
	private String lpad(final int s) {
216
		return StringUtils.leftPad(String.valueOf(s), 5);
217
	}
218

    
219
}
(3-3/23)