Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.List;
5

    
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.data.graph.model.DNGFDecoder;
9
import eu.dnetlib.data.graph.model.DNGFRelDecoder;
10
import eu.dnetlib.data.mapreduce.JobParams;
11
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
12
import eu.dnetlib.data.proto.DNGFProtos.DNGFEntity;
13
import eu.dnetlib.data.proto.TypeProtos.Type;
14
import eu.dnetlib.data.transform.DNGFEntityMerger;
15
import eu.dnetlib.data.transform.Ontologies;
16
import eu.dnetlib.data.transform.OntologyLoader;
17
import eu.dnetlib.pace.config.DedupConfig;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableReducer;
21
import org.apache.hadoop.hbase.util.Bytes;
22
import org.apache.hadoop.io.Text;
23

    
24
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.*;
25

    
26
public class DedupBuildRootsReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
27

    
28
    public static final String FIXED_RELATION = "Fixed Relation";
29
    private DedupConfig dedupConf;
30
    private Ontologies ontologies;
31

    
32
	@Override
33
	protected void setup(final Context context) throws IOException, InterruptedException {
34
		super.setup(context);
35
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
36
		System.out.println("dedup buildRoots reducer\n\nwf conf: " + dedupConf.toString());
37

    
38
		ontologies = OntologyLoader.loadOntologies(context.getConfiguration().get(JobParams.ONTOLOGIES));
39
		System.out.println("ontologies: " + ontologies.toJson(true));
40

    
41
	}
42

    
43
	@Override
44
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException, InterruptedException {
45

    
46
		// ensures we're dealing with a root, otherwise returns
47
		if (!isRoot(key.toString())) {
48
			System.err.println("aborting DedupBuildRootsReducer, found non-root key: " + key);
49
			context.getCounter("DedupBuildRootsReducer", "aborted").increment(1);
50
			return;
51
		}
52

    
53
		final byte[] rowkey = Bytes.toBytes(key.toString());
54
		final List<DNGF> entities = Lists.newArrayList();
55

    
56
		for (final DNGF oaf : toDNGF(values)) {
57
			switch (oaf.getKind()) {
58
			case entity:
59
				entities.add(oaf);
60
				break;
61
			case relation:
62
				handleRels(context, rowkey, oaf, true);
63
				break;
64
			default:
65
				break;
66
			}
67
		}
68
		// build and emit the root body
69
		final DNGF.Builder builder = DNGFEntityMerger.merge(dedupConf, key.toString(), entities);
70
		if (entities.size() < JobParams.MAX_COUNTERS) {
71
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", lpad(entities.size())).increment(1);
72
		} else {
73
			context.getCounter(dedupConf.getWf().getEntityType() + " root group size", "> " + JobParams.MAX_COUNTERS).increment(1);
74
		}
75

    
76
		final DNGF dngf = builder.build();
77
		final DNGFEntity entity = dngf.getEntity();
78

    
79
		try {
80

    
81
			context.write(new ImmutableBytesWritable(Bytes.toBytes(entity.getId())), asPut(dngf));
82
		} catch (Throwable e) {
83
			System.out.println("Exception dngf = " + dngf.toString());
84
			context.getCounter(entity.getType().toString(), e.getClass().getName()).increment(1);
85
		}
86

    
87
		context.getCounter(entity.getType().toString(), "root").increment(1);
88

    
89
		// add person rels TODO: remove this hack
90
		// context.getCounter("hack", "personResult out").increment(personMap.size());
91

    
92
	}
93

    
94
	private Iterable<DNGF> toDNGF(final Iterable<ImmutableBytesWritable> values) {
95
		return Iterables.transform(values, ibw -> DNGFDecoder.decode(ibw.copyBytes()).getDNGF());
96
	}
97

    
98
	private void handleRels(final Context context, final byte[] rowkey, final DNGF rel, final boolean hack) throws IOException, InterruptedException {
99

    
100
		if (hack && checkHack(new String(rowkey), rel)) {
101
			context.getCounter("hack", "personResult in").increment(1);
102
		} else {
103

    
104
			// emit relation from the root to the related entities
105
			DNGFDecoder decoder = rootToEntity(rowkey, rel, context);
106
			context.write(new ImmutableBytesWritable(rowkey), asPutByCollectedFrom(decoder.getDNGF()));
107

    
108
			// emit relation from the related entities to the root
109
			decoder = entityToRoot(rowkey, rel, context);
110
			final byte[] revKey = Bytes.toBytes(decoder.relSourceId());
111
			context.write(new ImmutableBytesWritable(revKey), asPutByCollectedFrom(decoder.getDNGF()));
112

    
113
            context.getCounter(FIXED_RELATION, decoder.getRelDescriptor().shortQualifier() + " [entity <-> root]").increment(2);
114
        }
115
        // mark relation from the related entities to the duplicate as deleted
116
		DNGFDecoder decoder = markDeleted(rel, true, context);
117
		byte[] revKey = Bytes.toBytes(decoder.relSourceId());
118
		context.write(new ImmutableBytesWritable(revKey), asPut(decoder.getDNGF()));
119

    
120
		// mark relation from the related entities to the duplicate as deleted
121
		decoder = markDeleted(rel, false, context);
122
		revKey = Bytes.toBytes(decoder.relSourceId());
123
		context.write(new ImmutableBytesWritable(revKey), asPut(decoder.getDNGF()));
124

    
125
        context.getCounter(FIXED_RELATION, decoder.getRelDescriptor().shortQualifier() + " mark deleted [dup <-> entity]").increment(2);
126
    }
127

    
128
	public boolean checkHack(final String root, final DNGF oaf) {
129

    
130
		boolean res;
131
		final String type = dedupConf.getWf().getEntityType();
132

    
133
		if ((type.equals(Type.publication.toString()) || type.equals(Type.dataset.toString())) &&
134
				oaf.getRel().getTargetType().equals(Type.person) && !md5matches(root, oaf.getRel().getSource())) {
135

    
136
			res = true;
137
		} else {
138
			res = false;
139
		}
140

    
141
		// if (root.equals("50|dedup_wf_001::92f6197ea6f16ae554755aced832fb6f")) {
142
		// System.out.println("##################");
143
		// System.out.println("root  : " + root);
144
		// System.out.println("source: " + oaf.getRel().getSource());
145
		// System.out.println("ckeck:  " + res);
146
		// }
147

    
148
		return res;
149
	}
150

    
151
	private boolean md5matches(final String id1, final String id2) {
152
		return id1.replaceAll("^.*\\:\\:", "").equals(id2.replaceAll("^.*\\:\\:", ""));
153
	}
154

    
155
	private DNGFDecoder rootToEntity(final byte[] rootRowkey, final DNGF rel, final Context context) {
156
		return patchRelations(rootRowkey, rel, DNGFPatch.rootToEntity, context);
157
	}
158

    
159
	private DNGFDecoder entityToRoot(final byte[] rootRowkey, final DNGF rel, final Context context) {
160
		return patchRelations(rootRowkey, rel, DNGFPatch.entityToRoot, context);
161
	}
162

    
163
	private DNGFDecoder markDeleted(final DNGF rel, final boolean reverse, final Context context) {
164
		return deleteRelations(rel, reverse, context);
165
	}
166

    
167
	// patches relation objects setting the source field with the root id
168
	private DNGFDecoder patchRelations(final byte[] rootRowkey, final DNGF rel, final DNGFPatch patchKind, final Context context) {
169
		final DNGFRelDecoder decoder = DNGFRelDecoder.decode(rel.getRel());
170
		final DNGF.Builder builder = DNGF.newBuilder(rel);
171
		builder.getDataInfoBuilder().setInferred(true).setDeletedbyinference(false);
172
		switch (patchKind) {
173
		case rootToEntity:
174
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:rootToEntity)");
175
			builder.getRelBuilder().setSource(new String(rootRowkey));
176
			break;
177

    
178
		case entityToRoot:
179
			builder.setRel(decoder.setClassId(getInverse(decoder, context)));
180
			// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots p:entityToRoot)");
181
			builder.getRelBuilder().setSource(builder.getRel().getTarget());
182
			builder.getRelBuilder().setTarget(new String(rootRowkey));
183
            final Type sourceType = builder.getRelBuilder().getSourceType();
184
            builder.getRelBuilder().setSourceType(builder.getRel().getTargetType());
185
            builder.getRelBuilder().setTargetType(sourceType);
186
            break;
187

    
188
		default:
189
			break;
190
		}
191

    
192
		return DNGFDecoder.decode(builder.build());
193
	}
194

    
195
	private String getInverse(final DNGFRelDecoder decoder, final Context context) {
196
		final String inverse = ontologies.inverseOf(decoder.getRelDescriptor());
197
		if (StringUtils.isBlank(inverse)) {
198
            //context.getCounter("unmapped relationship", decoder.getRelDescriptor().shortQualifier()).increment(1);
199
            return "unknown";
200
		}
201
		return inverse;
202
	}
203

    
204
	private DNGFDecoder deleteRelations(final DNGF rel, final boolean reverse, final Context context) {
205
		final DNGF.Builder builder = DNGF.newBuilder(rel);
206
		// builder.getDataInfoBuilder().setInferenceprovenance("dedup (BuildRoots d: " + reverse + ")");
207
		builder.getDataInfoBuilder().setDeletedbyinference(true);
208

    
209
		if (reverse) {
210
			final DNGFRelDecoder decoder = DNGFRelDecoder.decode(rel.getRel());
211

    
212
			builder.setRel(decoder.setClassId(getInverse(decoder, context)));
213
			// swap source and target
214
			final String tmp = builder.getRel().getSource();
215
            final Type sType = builder.getRel().getSourceType();
216
            builder.getRelBuilder().setSource(builder.getRel().getTarget());
217
			builder.getRelBuilder().setTarget(tmp);
218
            builder.getRelBuilder().setSourceType(builder.getRel().getTargetType());
219
            builder.getRelBuilder().setTargetType(sType);
220
        }
221

    
222
		return DNGFDecoder.decode(builder.build());
223
	}
224

    
225
	private String lpad(final int s) {
226
		return StringUtils.leftPad(String.valueOf(s), 5);
227
	}
228

    
229
	private enum DNGFPatch {
230
		rootToEntity, entityToRoot
231
	}
232

    
233
}
(2-2/22)