Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.proto.TypeProtos.Type;
16
import eu.dnetlib.data.transform.OafEntityMerger;
17
import eu.dnetlib.pace.config.DedupConfig;
18
import org.apache.commons.lang.StringUtils;
19
import org.apache.hadoop.hbase.client.Put;
20
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
21
import org.apache.hadoop.hbase.mapreduce.TableReducer;
22
import org.apache.hadoop.hbase.util.Bytes;
23
import org.apache.hadoop.io.Text;
24

    
25
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
26

    
27
	private enum OafPatch {
28
		rootToEntity, entityToRoot
29
	}
30

    
31
	/** Dedup configuration, loaded in setup() from the job property named by JobParams.DEDUP_CONF. */
	private DedupConfig dedupConf;
32

    
33
	/** Relation classes parsed in setup() from the JSON in the "relClasses" job property; queried through getInverse() when a relation is reversed. */
	private RelClasses relClasses;
34

    
35
	@Override
36
	protected void setup(final Context context) throws IOException, InterruptedException {
37
		super.setup(context);
38
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
39
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
40

    
41
		final String relClassJson = context.getConfiguration().get("relClasses");
42
		System.out.println("relClassesJson:\n" + relClassJson);
43
		relClasses = RelClasses.fromJSon(relClassJson);
44
		System.out.println("relClasses:\n" + relClasses);
45
	}
46

    
47
	@Override
48
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
49

    
50
		// ensures we're dealing with a root, otherwise returns
51
		if (!DedupUtils.isRoot(key.toString())) {
52
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
53
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
54
			return;
55
		}
56

    
57
		final byte[] rowkey = Bytes.toBytes(key.toString());
58
		final List<Oaf> entities = Lists.newArrayList();
59

    
60
		for (final Oaf oaf : toOaf(values)) {
61
			switch (oaf.getKind()) {
62
			case entity:
63
				entities.add(oaf);
64
				break;
65
			case relation:
66
				handleRels(context, rowkey, oaf, true);
67
				break;
68
			default:
69
				break;
70
			}
71
		}
72

    
73
		// build and emit the root body
74
		final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
75
		if (entities.size() < JobParams.MAX_COUNTERS) {
76
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
77
		} else {
78
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
79
		}
80

    
81
		emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
82

    
83
		// add person rels TODO: remove this hack
84
		// context.getCounter("hack", "personResult out").increment(personMap.size());
85

    
86
	}
87

    
88
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
89
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
90
	}
91

    
92
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf, final boolean hack) throws IOException, InterruptedException {
93

    
94
		if (hack && checkHack(new String(rowkey), oaf)) {
95
			context.getCounter("hack", "personResult in").increment(1);
96
		} else {
97

    
98
			// emit relation from the root to the related entities
99
			OafDecoder decoder = rootToEntity(rowkey, oaf);
100
			emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
101

    
102
			// emit relation from the related entities to the root
103
			decoder = entityToRoot(rowkey, oaf);
104
			final byte[] revKey = Bytes.toBytes(decoder.relSourceId());
105
			emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
106
		}
107
		// mark relation from the related entities to the duplicate as deleted
108
		OafDecoder decoder = markDeleted(oaf, true);
109
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
110
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
111

    
112
		// mark relation from the related entities to the duplicate as deleted
113
		decoder = markDeleted(oaf, false);
114
		revKey = Bytes.toBytes(decoder.relSourceId());
115
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
116
	}
117

    
118
	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
119
			throws IOException, InterruptedException {
120
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
121
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
122
		context.write(new ImmutableBytesWritable(rowkey), put);
123
		context.getCounter(family, label).increment(1);
124
	}
125

    
126
	// /////////////////
127

    
128
	public boolean checkHack(final String root, final Oaf oaf) {
129

    
130
		boolean res;
131
		if (dedupConf.getWf().getEntityType().equals(Type.result.toString()) && !md5matches(root, oaf.getRel().getSource())) {
132

    
133
			res = true;
134
		} else {
135
			res = false;
136
		}
137

    
138
		// if (root.equals("50|dedup_wf_001::92f6197ea6f16ae554755aced832fb6f")) {
139
		// System.out.println("##################");
140
		// System.out.println("root  : " + root);
141
		// System.out.println("source: " + oaf.getRel().getSource());
142
		// System.out.println("ckeck:  " + res);
143
		// }
144

    
145
		return res;
146
	}
147

    
148
	private boolean md5matches(final String id1, final String id2) {
149
		return id1.replaceAll("^.*\\:\\:", "").equals(id2.replaceAll("^.*\\:\\:", ""));
150
	}
151

    
152
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
153
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
154
	}
155

    
156
	private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
157
		return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
158
	}
159

    
160
	private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
161
		return deleteRelations(rel, reverse);
162
	}
163

    
164
	// patches relation objects setting the source field with the root id
165
	private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
166
		final String id = new String(rootRowkey);
167
		final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
168
		final Oaf.Builder builder = Oaf.newBuilder(rel);
169
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
170
		switch (patchKind) {
171
		case rootToEntity:
172
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
173
			builder.getRelBuilder().setSource(id);
174
			break;
175

    
176
		case entityToRoot:
177
			final String relClass = rel.getRel().getRelClass();
178
			/*
179
			if(StringUtils.isBlank(relClass)) {
180
				throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
181
			}
182
			*/
183
			final String inverse = relClasses.getInverse(relClass);
184
			if(StringUtils.isBlank(inverse)) {
185
				throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
186
			}
187
			builder.setRel(decoder.setClassId(inverse));
188
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
189
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
190
			builder.getRelBuilder().setTarget(id);
191
			break;
192

    
193
		default:
194
			break;
195
		}
196

    
197
		return OafDecoder.decode(builder.build());
198
	}
199

    
200
	private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
201
		final Oaf.Builder builder = Oaf.newBuilder(rel);
202
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
203
		builder.getDataInfoBuilder().setDeletedbyinference(true);
204

    
205
		if (reverse) {
206
			final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
207
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
208
			// swap source and target
209
			final String tmp = builder.getRel().getSource();
210
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
211
			builder.getRelBuilder().setTarget(tmp);
212
		}
213

    
214
		return OafDecoder.decode(builder.build());
215
	}
216

    
217
	private String lpad(final int s) {
218
		return StringUtils.leftPad(String.valueOf(s), 5);
219
	}
220

    
221
}
(2-2/16)