Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.transform.OafEntityMerger;
16
import eu.dnetlib.pace.config.DedupConfig;
17
import org.apache.commons.lang.StringUtils;
18
import org.apache.hadoop.hbase.client.Put;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableReducer;
21
import org.apache.hadoop.hbase.util.Bytes;
22
import org.apache.hadoop.io.Text;
23

    
24
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
25

    
26
	private enum OafPatch {
27
		rootToEntity, entityToRoot
28
	}
29

    
30
	private DedupConfig dedupConf;
31

    
32
	private RelClasses relClasses;
33

    
34
	@Override
35
	protected void setup(final Context context) throws IOException, InterruptedException {
36
		super.setup(context);
37
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
38
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
39

    
40
		final String relClassJson = context.getConfiguration().get("relClasses");
41
		System.out.println("relClassesJson:\n" + relClassJson);
42
		relClasses = RelClasses.fromJSon(relClassJson);
43
		System.out.println("relClasses:\n" + relClasses);
44
	}
45

    
46
	@Override
47
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
48

    
49
		// ensures we're dealing with a root, otherwise returns
50
		if (!DedupUtils.isRoot(key.toString())) {
51
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
52
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
53
			return;
54
		}
55

    
56
		final byte[] rowkey = Bytes.toBytes(key.toString());
57
		final List<Oaf> entities = Lists.newArrayList();
58

    
59
		for (final Oaf oaf : toOaf(values)) {
60
			switch (oaf.getKind()) {
61
			case entity:
62
				entities.add(oaf);
63
				break;
64
			case relation:
65
				handleRels(context, rowkey, oaf);
66
				break;
67
			default:
68
				break;
69
			}
70
		}
71

    
72
		// build and emit the root body
73
		final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
74
		if (entities.size() < JobParams.MAX_COUNTERS) {
75
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
76
		} else {
77
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
78
		}
79

    
80
		emit(context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
81

    
82
	}
83

    
84
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
85
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
86
	}
87

    
88
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {
89

    
90
		// emit relation from the root to the related entities
91
		OafDecoder decoder = rootToEntity(rowkey, oaf);
92
		emit(context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
93

    
94
		// emit relation from the related entities to the root
95
		decoder = entityToRoot(rowkey, oaf);
96
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
97
		emit(context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
98

    
99
		// mark relation from the related entities to the duplicate as deleted
100
		decoder = markDeleted(oaf, true);
101
		revKey = Bytes.toBytes(decoder.relSourceId());
102
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
103

    
104
		// mark relation from the related entities to the duplicate as deleted
105
		decoder = markDeleted(oaf, false);
106
		revKey = Bytes.toBytes(decoder.relSourceId());
107
		emit(context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
108
	}
109

    
110
	private void emit(final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
111
			throws IOException, InterruptedException {
112
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
113
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
114
		context.write(new ImmutableBytesWritable(rowkey), put);
115
		context.getCounter(family, label).increment(1);
116
	}
117

    
118
	// /////////////////
119

    
120
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
121
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
122
	}
123

    
124
	private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
125
		return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
126
	}
127

    
128
	private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
129
		return deleteRelations(rel, reverse);
130
	}
131

    
132
	// patches relation objects setting the source field with the root id
133
	private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
134
		final String id = new String(rootRowkey);
135
		final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
136
		final Oaf.Builder builder = Oaf.newBuilder(rel);
137
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
138
		switch (patchKind) {
139
		case rootToEntity:
140
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
141
			builder.getRelBuilder().setSource(id);
142
			break;
143

    
144
		case entityToRoot:
145
			final String relClass = rel.getRel().getRelClass();
146
			/*
147
			if(StringUtils.isBlank(relClass)) {
148
				throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
149
			}
150
			*/
151
			final String inverse = relClasses.getInverse(relClass);
152
			if(StringUtils.isBlank(inverse)) {
153
				throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
154
			}
155
			builder.setRel(decoder.setClassId(inverse));
156
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
157
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
158
			builder.getRelBuilder().setTarget(id);
159
			break;
160

    
161
		default:
162
			break;
163
		}
164

    
165
		return OafDecoder.decode(builder.build());
166
	}
167

    
168
	private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
169
		final Oaf.Builder builder = Oaf.newBuilder(rel);
170
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
171
		builder.getDataInfoBuilder().setDeletedbyinference(true);
172

    
173
		if (reverse) {
174
			final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
175
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
176
			// swap source and target
177
			final String tmp = builder.getRel().getSource();
178
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
179
			builder.getRelBuilder().setTarget(tmp);
180
		}
181

    
182
		return OafDecoder.decode(builder.build());
183
	}
184

    
185
	private String lpad(final int s) {
186
		return StringUtils.leftPad(String.valueOf(s), 5);
187
	}
188

    
189
}
(2-2/16)