Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.data.mapreduce.JobParams;
9
import eu.dnetlib.data.mapreduce.hbase.index.config.RelClasses;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
13
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.transform.OafEntityMerger;
16
import eu.dnetlib.pace.config.DedupConfig;
17
import org.apache.commons.lang.StringUtils;
18
import org.apache.hadoop.hbase.client.Durability;
19
import org.apache.hadoop.hbase.client.Put;
20
import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableReducer;
23
import org.apache.hadoop.hbase.regionserver.NoSuchColumnFamilyException;
24
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
26

    
27
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
28

    
29
	private enum OafPatch {
30
		rootToEntity, entityToRoot
31
	}
32

    
33
	private DedupConfig dedupConf;
34

    
35
	private RelClasses relClasses;
36

    
37
	@Override
38
	protected void setup(final Context context) throws IOException, InterruptedException {
39
		super.setup(context);
40
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
41
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
42

    
43
		final String relClassJson = context.getConfiguration().get("relClasses");
44
		System.out.println("relClassesJson:\n" + relClassJson);
45
		relClasses = RelClasses.fromJSon(relClassJson);
46
		System.out.println("relClasses:\n" + relClasses);
47
	}
48

    
49
	@Override
50
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
51

    
52
		// ensures we're dealing with a root, otherwise returns
53
		if (!DedupUtils.isRoot(key.toString())) {
54
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
55
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
56
			return;
57
		}
58

    
59
		final byte[] rowkey = Bytes.toBytes(key.toString());
60
		final List<Oaf> entities = Lists.newArrayList();
61

    
62
		for (final Oaf oaf : toOaf(values)) {
63
			switch (oaf.getKind()) {
64
			case entity:
65
				entities.add(oaf);
66
				break;
67
			case relation:
68
				handleRels(context, rowkey, oaf);
69
				break;
70
			default:
71
				break;
72
			}
73
		}
74

    
75
		// build and emit the root body
76
		final Oaf.Builder builder = OafEntityMerger.merge(dedupConf, key.toString(), entities);
77
		if (entities.size() < JobParams.MAX_COUNTERS) {
78
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
79
		} else {
80
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
81
		}
82

    
83
		emit(builder.getEntity().getType().toString(),
84
				context, rowkey, dedupConf.getWf().getEntityType(), DedupUtils.BODY_S, builder.build().toByteArray(), "root");
85

    
86
	}
87

    
88
	private Iterable<Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
89
		return Iterables.transform(values, OafHbaseUtils.oafDecoder());
90
	}
91

    
92
	private void handleRels(final Context context, final byte[] rowkey, final Oaf oaf) throws IOException, InterruptedException {
93

    
94
		// emit relation from the root to the related entities
95
		OafDecoder decoder = rootToEntity(rowkey, oaf);
96
		emit("emit relation from the root to the related entities",
97
				context, rowkey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "[root -> entity]");
98

    
99
		// emit relation from the related entities to the root
100
		decoder = entityToRoot(rowkey, oaf);
101
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
102
		emit("emit relation from the related entities to the root",
103
				context, revKey, decoder.getCFQ(), new String(rowkey), decoder.toByteArray(), "[entity -> root]");
104

    
105
		// mark relation from the related entities to the duplicate as deleted
106
		decoder = markDeleted(oaf, true);
107
		revKey = Bytes.toBytes(decoder.relSourceId());
108
		emit("mark relation from the related entities to the duplicate as deleted",
109
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [dup -> entity]");
110

    
111
		// mark relation from the related entities to the duplicate as deleted
112
		decoder = markDeleted(oaf, false);
113
		revKey = Bytes.toBytes(decoder.relSourceId());
114
		emit("mark relation from the related entities to the duplicate as deleted",
115
				context, revKey, decoder.getCFQ(), decoder.relTargetId(), decoder.toByteArray(), "mark deleted [entity -> dup]");
116
	}
117

    
118
	private void emit(final String msg, final Context context, final byte[] rowkey, final String family, final String qualifier, final byte[] value, final String label)
119
			throws IOException, InterruptedException {
120
	    if (StringUtils.startsWith(family, "resultResult_publicationDataset_isSupplement")) {
121
            System.err.println(String.format("Error find relation %s on key %s and qualifier %s, message:'%s'", family, new String(rowkey), qualifier, msg));
122
            throw new RuntimeException("SUCA!");
123

    
124
        }
125

    
126

    
127

    
128

    
129
		final Put put = new Put(rowkey).add(Bytes.toBytes(family), Bytes.toBytes(qualifier), value);
130
		put.setDurability(Durability.SKIP_WAL);
131

    
132
		try {
133
			context.write(new ImmutableBytesWritable(rowkey), put);
134
		} catch (Throwable e) {
135
			System.err.println(
136
					String.format("%s, rowkey %s, family %s, qualifier %s",
137
							msg, new String(rowkey), family, qualifier));
138
			throw new RuntimeException(e);
139
		}
140

    
141
		context.getCounter(family, label).increment(1);
142
	}
143

    
144
	// /////////////////
145

    
146
	private OafDecoder rootToEntity(final byte[] rootRowkey, final Oaf rel) {
147
		return patchRelations(rootRowkey, rel, OafPatch.rootToEntity);
148
	}
149

    
150
	private OafDecoder entityToRoot(final byte[] rootRowkey, final Oaf rel) {
151
		return patchRelations(rootRowkey, rel, OafPatch.entityToRoot);
152
	}
153

    
154
	private OafDecoder markDeleted(final Oaf rel, final boolean reverse) {
155
		return deleteRelations(rel, reverse);
156
	}
157

    
158
	// patches relation objects setting the source field with the root id
159
	private OafDecoder patchRelations(final byte[] rootRowkey, final Oaf rel, final OafPatch patchKind) {
160
		final String id = new String(rootRowkey);
161
		final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
162
		final Oaf.Builder builder = Oaf.newBuilder(rel);
163
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
164
		switch (patchKind) {
165
		case rootToEntity:
166
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
167
			builder.getRelBuilder().setSource(id);
168
			break;
169

    
170
		case entityToRoot:
171
			final String relClass = rel.getRel().getRelClass();
172
			/*
173
			if(StringUtils.isBlank(relClass)) {
174
				throw new IllegalStateException(String.format("missing relation term for %s in row %s", rel.getRel().getRelType().name(), id));
175
			}
176
			*/
177
			final String inverse = relClasses.getInverse(relClass);
178
			if(StringUtils.isBlank(inverse)) {
179
				throw new IllegalStateException(String.format("missing inverse relation for %s in row %s", relClass, id));
180
			}
181
			builder.setRel(decoder.setClassId(inverse));
182
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
183
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
184
			builder.getRelBuilder().setTarget(id);
185
			break;
186

    
187
		default:
188
			break;
189
		}
190

    
191
		return OafDecoder.decode(builder.build());
192
	}
193

    
194
	private OafDecoder deleteRelations(final Oaf rel, final boolean reverse) {
195
		final Oaf.Builder builder = Oaf.newBuilder(rel);
196
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
197
		builder.getDataInfoBuilder().setDeletedbyinference(true);
198

    
199
		if (reverse) {
200
			final OafRelDecoder decoder = OafRelDecoder.decode(rel.getRel());
201
			builder.setRel(decoder.setClassId(relClasses.getInverse(rel.getRel().getRelClass())));
202
			// swap source and target
203
			final String tmp = builder.getRel().getSource();
204
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
205
			builder.getRelBuilder().setTarget(tmp);
206
		}
207

    
208
		return OafDecoder.decode(builder.build());
209
	}
210

    
211
	private String lpad(final int s) {
212
		return StringUtils.leftPad(String.valueOf(s), 5);
213
	}
214

    
215
}
(2-2/16)