Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.data.mapreduce.hbase.dedup;
2
3
import java.io.IOException;
4
import java.util.List;
5
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8 29702 claudio.at
import eu.dnetlib.data.mapreduce.JobParams;
9 28094 claudio.at
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11 26600 sandro.lab
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12 35128 claudio.at
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13 28094 claudio.at
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
14 26600 sandro.lab
import eu.dnetlib.data.proto.OafProtos.Oaf;
15 35128 claudio.at
import eu.dnetlib.data.transform.OafEntityMerger;
16 36670 claudio.at
import eu.dnetlib.pace.config.DedupConfig;
17 49029 claudio.at
import org.apache.commons.lang.StringUtils;
18 52985 claudio.at
import org.apache.hadoop.hbase.client.Durability;
19 49029 claudio.at
import org.apache.hadoop.hbase.client.Put;
20 52985 claudio.at
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
21 49029 claudio.at
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableReducer;
23 52985 claudio.at
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
24 49029 claudio.at
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
26 26600 sandro.lab
27
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
28
29
	private enum OafPatch {
30
		rootToEntity, entityToRoot
31
	}
32
33
	private DedupConfig dedupConf;
34
35 28094 claudio.at
	private RelClasses relClasses;
36
37 26600 sandro.lab
	@Override
38 28308 claudio.at
	protected void setup(final Context context) throws IOException, InterruptedException {
39 26600 sandro.lab
		super.setup(context);
40 36670 claudio.at
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
41 28094 claudio.at
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
42
43 34225 claudio.at
		final String relClassJson = context.getConfiguration().get("relClasses");
44 28094 claudio.at
		System.out.println("relClassesJson:\n" + relClassJson);
45
		relClasses = RelClasses.fromJSon(relClassJson);
46
		System.out.println("relClasses:\n" + relClasses);
47 26600 sandro.lab
	}
48
49
	@Override
50 28308 claudio.at
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
51 26600 sandro.lab
52
		// ensures we're dealing with a root, otherwise returns
53 28094 claudio.at
		if (!DedupUtils.isRoot(key.toString())) {
54 26600 sandro.lab
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
55
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
56
			return;
57
		}
58
59 34225 claudio.at
		final byte[] rowkey = Bytes.toBytes(key.toString());
60 26600 sandro.lab
		final List<Oaf> entities = Lists.newArrayList();
61
62 34225 claudio.at
		for (final Oaf oaf : toOaf(values)) {
63 26600 sandro.lab
			switch (oaf.getKind()) {
64
			case entity:
65
				entities.add(oaf);
66
				break;
67
			case relation:
68 50270 claudio.at
				handleRels(context, rowkey, oaf);
69 26600 sandro.lab
				break;
70
			default:
71
				break;
72
			}
73
		}
74
75
		// build and emit the root body
76 36157 claudio.at
		final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
77 29702 claudio.at
		if (entities.size() < JobParams.MAX_COUNTERS) {
78 36670 claudio.at
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
79 29702 claudio.at
		} else {
80 36670 claudio.at
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
81 29702 claudio.at
		}
82
83 52985 claudio.at
		emit(builder.getEntity().getType().toString(),
84
				context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
85 26600 sandro.lab
86
	}
87
88 28308 claudio.at
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
89 35128 claudio.at
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
90 26600 sandro.lab
	}
91
92 50270 claudio.at
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {
93 26600 sandro.lab
94 50270 claudio.at
		// emit relation from the root to the related entities
95
		OafDecoder decoder = rootToEntity(rowkey, oaf);
96 52985 claudio.at
		emit("emit relation from the root to the related entities",
97
				context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
98 26600 sandro.lab
99 50270 claudio.at
		// emit relation from the related entities to the root
100
		decoder = entityToRoot(rowkey, oaf);
101
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
102 52985 claudio.at
		emit("emit relation from the related entities to the root",
103
				context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
104 26600 sandro.lab
105
		// mark relation from the related entities to the duplicate as deleted
106 50270 claudio.at
		decoder = markDeleted(oaf, true);
107
		revKey = Bytes.toBytes(decoder.relSourceId());
108 52985 claudio.at
		emit("mark relation from the related entities to the duplicate as deleted",
109
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
110 26600 sandro.lab
111
		// mark relation from the related entities to the duplicate as deleted
112
		decoder = markDeleted(oaf, false);
113
		revKey = Bytes.toBytes(decoder.relSourceId());
114 52985 claudio.at
		emit("mark relation from the related entities to the duplicate as deleted",
115
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
116 26600 sandro.lab
	}
117
118 52985 claudio.at
	private void emit(final String msg, final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
119 28308 claudio.at
			throws IOException, InterruptedException {
120 34225 claudio.at
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
121 52985 claudio.at
		put.setDurability(Durability.SKIP_WAL);
122
123
		try {
124
			context.write(new ImmutableBytesWritable(rowkey), put);
125
		} catch (RetriesExhaustedWithDetailsException | NoSuchColumnFamilyException e) {
126
			System.err.println(
127
					String.format("%s, rowkey %s, family %s, qualifier %s",
128
							msg, new String(rowkey), family, qualifier));
129
			throw new RuntimeException(e);
130
		}
131
132 26600 sandro.lab
		context.getCounter(family, label).increment(1);
133
	}
134
135
	// /////////////////
136
137
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
138
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
139
	}
140
141
	private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
142
		return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
143
	}
144
145 28308 claudio.at
	private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
146 26600 sandro.lab
		return deleteRelations(rel, reverse);
147
	}
148
149
	// patches relation objects setting the source field with the root id
150 28308 claudio.at
	private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
151 44670 claudio.at
		final String id = new String(rootRowkey);
152 34225 claudio.at
		final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
153
		final Oaf.Builder builder = Oaf.newBuilder(rel);
154 26600 sandro.lab
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
155
		switch (patchKind) {
156
		case rootToEntity:
157 34225 claudio.at
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
158 44670 claudio.at
			builder.getRelBuilder().setSource(id);
159 26600 sandro.lab
			break;
160
161
		case entityToRoot:
162 44670 claudio.at
			final String relClass = rel.getRel().getRelClass();
163
			/*
164
			if(StringUtils.isBlank(relClass)) {
165
				throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
166
			}
167
			*/
168
			final String inverse = relClasses.getInverse(relClass);
169
			if(StringUtils.isBlank(inverse)) {
170
				throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
171
			}
172
			builder.setRel(decoder.setClassId(inverse));
173 34225 claudio.at
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
174 26600 sandro.lab
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
175 44670 claudio.at
			builder.getRelBuilder().setTarget(id);
176 26600 sandro.lab
			break;
177
178
		default:
179
			break;
180
		}
181
182
		return OafDecoder.decode(builder.build());
183
	}
184
185 28308 claudio.at
	private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
186 34225 claudio.at
		final Oaf.Builder builder = Oaf.newBuilder(rel);
187
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
188 26600 sandro.lab
		builder.getDataInfoBuilder().setDeletedbyinference(true);
189
190
		if (reverse) {
191 34225 claudio.at
			final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
192 28094 claudio.at
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
193 26600 sandro.lab
			// swap source and target
194 34225 claudio.at
			final String tmp = builder.getRel().getSource();
195 26600 sandro.lab
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
196
			builder.getRelBuilder().setTarget(tmp);
197
		}
198
199
		return OafDecoder.decode(builder.build());
200
	}
201
202 28308 claudio.at
	private String lpad(final int s) {
203 26600 sandro.lab
		return StringUtils.leftPad(String.valueOf(s), 5);
204
	}
205
206
}