Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Iterator;
5

    
6
import com.google.common.collect.Iterables;
7
import eu.dnetlib.data.graph.model.DNGFDecoder;
8
import eu.dnetlib.data.graph.model.DNGFRelDecoder;
9
import eu.dnetlib.data.mapreduce.JobParams;
10
import eu.dnetlib.data.mapreduce.hbase.dedup.kv.DNGFKey;
11
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
12
import eu.dnetlib.data.proto.DNGFProtos;
13
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
14
import eu.dnetlib.data.proto.FieldTypeProtos;
15
import eu.dnetlib.data.transform.Ontologies;
16
import eu.dnetlib.data.transform.OntologyLoader;
17
import eu.dnetlib.pace.config.DedupConfig;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.mapreduce.TableReducer;
20
import org.apache.hadoop.hbase.util.Bytes;
21

    
22
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.asPut;
23
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.isRoot;
24

    
25
/**
26
 * Created by sandro on 2/24/17.
27
 */
28
public class DedupFixRelationReducer extends TableReducer<DNGFKey, ImmutableBytesWritable, ImmutableBytesWritable> {
29

    
30
    public static final String COUNTER_GROUP = "Fix relations";
31

    
32
    private DedupConfig dedupConf;
33

    
34
    private Ontologies ontologies;
35

    
36
    @Override
37
    protected void setup(final Context context) throws IOException, InterruptedException {
38
        super.setup(context);
39
        dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
40
        System.out.println("dedup fix Relations reducer\n\nwf conf: " + dedupConf.toString());
41

    
42
        ontologies = OntologyLoader.loadOntologies(context.getConfiguration().get(JobParams.ONTOLOGIES));
43
        System.out.println("ontologies: " + ontologies.toJson(true));
44
    }
45

    
46
    @Override
47
    protected void reduce(DNGFKey key, Iterable<ImmutableBytesWritable> values, Context context) throws IOException, InterruptedException {
48
        if (isRoot(key.toString())) {
49
            System.err.println("aborting DedupFixRelationReducer, found root key: " + key);
50
            context.getCounter(COUNTER_GROUP, "aborted").increment(1);
51
            return;
52
        }
53

    
54
        final Iterator<ImmutableBytesWritable> it = values.iterator();
55
        final DNGF first = DNGFDecoder.decode(it.next().copyBytes()).getDNGF();
56

    
57
        if (!first.getRel().getRelType().getClassid().equals("merges")) {
58
            context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1);
59
            return;
60
        }
61
        context.getCounter(COUNTER_GROUP, "Item to fix").increment(1);
62

    
63
        final String dedupRoot = first.getRel().getSource();
64
        it.forEachRemaining(b -> {
65
            try {
66
                handleRels(context, DNGFDecoder.decode(b.copyBytes()).getDNGF(), dedupRoot);
67
            } catch (Exception e) {
68
               throw new RuntimeException(e);
69
            }
70
        });
71
    }
72

    
73
    private void handleRels(final Context context, final DNGFProtos.DNGF dngf, final String dedupRoot) throws IOException, InterruptedException {
74

    
75
        final String relType = dngf.getRel().getRelType().getClassid();
76
        if (relType.contains("merges")) {
77
            return;
78
        }
79

    
80
        //Set relation deleted by inference from Root to entity that has been merged to another one
81
        final FieldTypeProtos.DataInfo.Builder dataInfoBuilder = FieldTypeProtos.DataInfo.newBuilder(dngf.getDataInfo());
82
        dataInfoBuilder.setDeletedbyinference(true);
83
        DNGFProtos.DNGF.Builder dngfBuilder = DNGFProtos.DNGF.newBuilder(dngf);
84
        dngfBuilder.setDataInfo(dataInfoBuilder.build());
85
        final String sourceKey = dngf.getRel().getSource();
86
        context.write(new ImmutableBytesWritable(Bytes.toBytes(sourceKey)), asPut(dngfBuilder.build()));
87

    
88
        context.getCounter(COUNTER_GROUP, "Relation set deleted").increment(1);
89

    
90
        // Create Relation from Root Entity to its deduplicated Entity
91
        dngfBuilder = DNGFProtos.DNGF.newBuilder(dngf);
92
        DNGFProtos.DNGFRel.Builder relBuilder = DNGFProtos.DNGFRel.newBuilder(dngf.getRel());
93
        relBuilder.setTarget(dedupRoot);
94
        dngfBuilder.setRel(relBuilder.build());
95
        context.write(new ImmutableBytesWritable(Bytes.toBytes(sourceKey)), asPut(dngfBuilder.build()));
96

    
97
        //Create Relation from deduplicated Entity to Root Entity
98
        relBuilder = DNGFProtos.DNGFRel.newBuilder(dngf.getRel());
99
        relBuilder.setTarget(relBuilder.getSource());
100
        relBuilder.setSource(dedupRoot);
101
        final String inverseRelation = HBaseTableDAO.getInverseRelation(DNGFRelDecoder.decode(dngf.getRel()).getRelDescriptor(), ontologies);
102
        relBuilder.setRelType(FieldTypeProtos.Qualifier.newBuilder(relBuilder.getRelType()).setClassid(inverseRelation).setClassname(inverseRelation));
103
        dngfBuilder.setRel(relBuilder.build());
104
        context.write(new ImmutableBytesWritable(Bytes.toBytes(dedupRoot)), asPut(dngfBuilder.build()));
105

    
106
        context.getCounter(COUNTER_GROUP, "Relation fixed").increment(2);
107
    }
108

    
109
    private Iterable<DNGFProtos.DNGF> toDNGF(final Iterable<ImmutableBytesWritable> values) {
110
        return Iterables.transform(values, ibw -> DNGFDecoder.decode(ibw.copyBytes()).getDNGF());
111
    }
112
}
(9-9/22)