Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.transform.OafEntityMerger;
16
import eu.dnetlib.pace.config.DedupConfig;
17
import org.apache.commons.lang.StringUtils;
18
import org.apache.hadoop.hbase.client.Durability;
19
import org.apache.hadoop.hbase.client.Put;
20
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableReducer;
23
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
24
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
26

    
27
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
28

    
29
	private enum OafPatch {
30
		rootToEntity, entityToRoot
31
	}
32

    
33
	private DedupConfig dedupConf;
34

    
35
	private RelClasses relClasses;
36

    
37
	@Override
38
	protected void setup(final Context context) throws IOException, InterruptedException {
39
		super.setup(context);
40
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
41
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
42

    
43
		final String relClassJson = context.getConfiguration().get("relClasses");
44
		System.out.println("relClassesJson:\n" + relClassJson);
45
		relClasses = RelClasses.fromJSon(relClassJson);
46
		System.out.println("relClasses:\n" + relClasses);
47
	}
48

    
49
	@Override
50
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
51

    
52
		// ensures we're dealing with a root, otherwise returns
53
		if (!DedupUtils.isRoot(key.toString())) {
54
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
55
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
56
			return;
57
		}
58

    
59
		final byte[] rowkey = Bytes.toBytes(key.toString());
60
		final List<Oaf> entities = Lists.newArrayList();
61

    
62
		for (final Oaf oaf : toOaf(values)) {
63
			switch (oaf.getKind()) {
64
			case entity:
65
				entities.add(oaf);
66
				break;
67
			case relation:
68
				handleRels(context, rowkey, oaf);
69
				break;
70
			default:
71
				break;
72
			}
73
		}
74

    
75
		// build and emit the root body
76
		final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
77
		if (entities.size() < JobParams.MAX_COUNTERS) {
78
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
79
		} else {
80
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
81
		}
82

    
83
		emit(builder.getEntity().getType().toString(),
84
				context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
85

    
86
	}
87

    
88
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
89
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
90
	}
91

    
92
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {
93

    
94
		// emit relation from the root to the related entities
95
		OafDecoder decoder = rootToEntity(rowkey, oaf);
96
		emit("emit relation from the root to the related entities",
97
				context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
98

    
99
		// emit relation from the related entities to the root
100
		decoder = entityToRoot(rowkey, oaf);
101
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
102
		emit("emit relation from the related entities to the root",
103
				context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
104

    
105
		// mark relation from the related entities to the duplicate as deleted
106
		decoder = markDeleted(oaf, true);
107
		revKey = Bytes.toBytes(decoder.relSourceId());
108
		emit("mark relation from the related entities to the duplicate as deleted",
109
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
110

    
111
		// mark relation from the related entities to the duplicate as deleted
112
		decoder = markDeleted(oaf, false);
113
		revKey = Bytes.toBytes(decoder.relSourceId());
114
		emit("mark relation from the related entities to the duplicate as deleted",
115
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
116
	}
117

    
118
	private void emit(final String msg, final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label) {
119

    
120
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
121
		put.setDurability(Durability.USE_DEFAULT);
122

    
123
		try {
124
			context.write(new ImmutableBytesWritable(rowkey), put);
125
		} catch (Throwable e) {
126
			System.err.println(
127
					String.format("%s, rowkey %s, family %s, qualifier %s",
128
							msg, new String(rowkey), family, qualifier));
129
			throw new RuntimeException(e);
130
		}
131

    
132
		context.getCounter(family, label).increment(1);
133
	}
134

    
135
	// /////////////////
136

    
137
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
138
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
139
	}
140

    
141
	private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
142
		return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
143
	}
144

    
145
	private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
146
		return deleteRelations(rel, reverse);
147
	}
148

    
149
	// patches relation objects setting the source field with the root id
150
	private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
151
		final String id = new String(rootRowkey);
152
		final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
153
		final Oaf.Builder builder = Oaf.newBuilder(rel);
154
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
155
		switch (patchKind) {
156
		case rootToEntity:
157
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
158
			builder.getRelBuilder().setSource(id);
159
			break;
160

    
161
		case entityToRoot:
162
			final String relClass = rel.getRel().getRelClass();
163
			/*
164
			if(StringUtils.isBlank(relClass)) {
165
				throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
166
			}
167
			*/
168
			final String inverse = relClasses.getInverse(relClass);
169
			if(StringUtils.isBlank(inverse)) {
170
				throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
171
			}
172
			builder.setRel(decoder.setClassId(inverse));
173
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
174
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
175
			builder.getRelBuilder().setTarget(id);
176
			break;
177

    
178
		default:
179
			throw new IllegalStateException(String.format("case not implemented for %s", patchKind.toString()));
180
		}
181

    
182
		return OafDecoder.decode(builder.build());
183
	}
184

    
185
	private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
186
		final Oaf.Builder builder = Oaf.newBuilder(rel);
187
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
188
		builder.getDataInfoBuilder().setDeletedbyinference(true);
189

    
190
		if (reverse) {
191
			final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
192
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
193
			// swap source and target
194
			final String tmp = builder.getRel().getSource();
195
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
196
			builder.getRelBuilder().setTarget(tmp);
197
		}
198

    
199
		return OafDecoder.decode(builder.build());
200
	}
201

    
202
	private String lpad(final int s) {
203
		return StringUtils.leftPad(String.valueOf(s), 5);
204
	}
205

    
206
}
(2-2/16)