Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Map.Entry;
5
import java.util.stream.Collectors;
6

    
7
import eu.dnetlib.data.graph.model.DNGFRelDecoder;
8
import eu.dnetlib.data.graph.utils.RelDescriptor;
9
import eu.dnetlib.data.mapreduce.JobParams;
10
import eu.dnetlib.data.proto.DNGFProtos;
11
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
12
import eu.dnetlib.data.proto.DNGFProtos.DNGF.Builder;
13
import eu.dnetlib.data.proto.FieldTypeProtos;
14
import eu.dnetlib.data.proto.TypeProtos;
15
import eu.dnetlib.data.transform.Ontologies;
16
import eu.dnetlib.data.transform.OntologyLoader;
17
import eu.dnetlib.pace.config.DedupConfig;
18
import org.apache.hadoop.hbase.client.Result;
19
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
20
import org.apache.hadoop.hbase.mapreduce.TableMapper;
21
import org.apache.hadoop.io.Writable;
22
import org.springframework.beans.factory.annotation.Qualifier;
23

    
24
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.asPut;
25
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.isRoot;
26
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.relVersions;
27

    
28
public class DedupMergeRelationVersionMapper extends TableMapper<ImmutableBytesWritable, Writable> {
29

    
30
	private DedupConfig dedupConf;
31

    
32
	private ImmutableBytesWritable outKey = new ImmutableBytesWritable();
33
    private Ontologies ontologies;
34

    
35
	@Override
36
	protected void setup(final Context context) throws IOException, InterruptedException {
37
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
38
		System.out.println("dedup findRoots mapper\nwf conf: " + dedupConf.toString());
39

    
40
		outKey = new ImmutableBytesWritable();
41

    
42
        ontologies = OntologyLoader.loadOntologies(context.getConfiguration().get(JobParams.ONTOLOGIES));
43
        System.out.println("ontologies: " + ontologies.toJson(true));
44
    }
45

    
46
	@Override
47
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
48
        if (!isRoot(rowkey)) {
49
            context.getCounter("skipped row", "total").increment(1);
50
            return;
51
        }
52
        relVersions(
53
				value,
54
				"isMergedIn", "merges", "isSimilarTo")
55
				.entrySet().stream()
56
				.filter(e -> e.getValue().keySet().size() > 1)
57
				.collect(Collectors.toMap(
58
						e -> e.getKey(),
59
						e -> {
60
                            final DNGF.Builder b = DNGF.newBuilder();
61
                            e.getValue().values().forEach(v -> {
62
								b.mergeFrom(v);
63
								final String relType = RelDescriptor.asString(v.getRel().getRelType());
64
								context.getCounter("merged", relType).increment(1);
65
							});
66
                            return b;
67
                        }
68
				)).entrySet().forEach(dngf -> emit(context, rowkey, dngf));
69
	}
70

    
71
    private void emit(final Context context, final ImmutableBytesWritable rowkey, final Entry<String, DNGF.Builder> entry) {
72
        try {
73
			outKey.set(rowkey.copyBytes());
74
            DNGF.Builder inputRel = entry.getValue();
75
            System.out.println(String.format("wirting row: %s, data: %s", new String(rowkey.copyBytes()), inputRel.toString()));
76
            context.write(outKey, asPut(inputRel.build()));
77

    
78

    
79
            final String target = inputRel.getRel().getTarget();
80
            final TypeProtos.Type targetType = inputRel.getRel().getTargetType();
81
            final TypeProtos.Type sourceType = inputRel.getRel().getSourceType();
82
            final DNGFProtos.DNGFRel.Builder relBuilder = DNGFProtos.DNGFRel.newBuilder(inputRel.getRel());
83

    
84
            //SWAP Source Target and its type
85
            relBuilder.setTargetType(sourceType);
86
            relBuilder.setSourceType(targetType);
87
            relBuilder.setTarget(relBuilder.getSource());
88
            relBuilder.setSource(target);
89

    
90
            //Change relation to its inverse
91
            final String inverseRelation = ontologies.inverseOf(DNGFRelDecoder.decode(inputRel.getRel()).getRelDescriptor());
92
            relBuilder.setRelType(FieldTypeProtos.Qualifier.newBuilder(relBuilder.getRelType()).setClassname(inverseRelation).setClassid(inverseRelation).build());
93

    
94
            outKey.set(relBuilder.getSource().getBytes());
95
            inputRel.setRel(relBuilder.build());
96
            context.write(outKey, asPut(inputRel.build()));
97

    
98
		} catch (Exception e) {
99
			throw new RuntimeException(e);
100
		}
101
	}
102

    
103
}
(13-13/23)