Project

General

Profile

« Previous | Next » 

Revision 54507

migrated classes for the FixRelation job from MASTER branch

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation;
2

  
3
import com.google.common.collect.Iterables;
4
import com.google.gson.Gson;
5
import eu.dnetlib.data.mapreduce.JobParams;
6
import eu.dnetlib.data.mapreduce.util.OafDecoder;
7
import eu.dnetlib.data.mapreduce.util.OafRelDecoder;
8
import eu.dnetlib.data.mapreduce.util.RelDescriptor;
9
import eu.dnetlib.data.proto.FieldTypeProtos;
10
import eu.dnetlib.data.proto.OafProtos;
11
import eu.dnetlib.utils.ontologies.Ontologies;
12
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
13
import org.apache.hadoop.hbase.mapreduce.TableReducer;
14
import org.apache.hadoop.hbase.util.Bytes;
15
import org.apache.hadoop.io.Writable;
16
import org.apache.hadoop.mapreduce.Reducer;
17

  
18
import java.io.IOException;
19
import java.util.Iterator;
20

  
21
import static eu.dnetlib.data.mapreduce.util.DedupUtils.isRoot;
22
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.asPut;
23

  
24
/**
25
 * Created by sandro on 2/24/17.
26
 */
27
public class DedupFixRelationReducer extends TableReducer<Key, ImmutableBytesWritable, ImmutableBytesWritable> {
28

  
29
    public static final String COUNTER_GROUP = "Fix relations";
30

  
31
    private Ontologies ontologies;
32

  
33
    private boolean simulation;
34

  
35
    @Override
36
    protected void setup(final Context context) throws IOException, InterruptedException {
37
        super.setup(context);
38

  
39
        simulation = context.getConfiguration().getBoolean("fixrel.simulation", false);
40
        ontologies = new Gson().fromJson(context.getConfiguration().get(JobParams.ONTOLOGIES), Ontologies.class);
41

  
42
        System.out.println("ontologies: " + ontologies.toJson(true));
43
    }
44

  
45
    @Override
46
    protected void reduce(Key key, Iterable<ImmutableBytesWritable> values, Context context) {
47
        if (isRoot(key.toString())) {
48
            System.err.println("aborting DedupFixRelationReducer, found root key: " + key);
49
            context.getCounter(COUNTER_GROUP, "aborted").increment(1);
50
            return;
51
        }
52

  
53
        final Iterator<ImmutableBytesWritable> it = values.iterator();
54
        final OafProtos.Oaf first = OafDecoder.decode(it.next().copyBytes()).getOaf();
55

  
56
        if (!first.getRel().getRelClass().equals("merges")) {
57
            context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1);
58
            return;
59
        }
60
        context.getCounter(COUNTER_GROUP, "Item to fix").increment(1);
61

  
62
        final String dedupRoot = first.getRel().getSource();
63
        it.forEachRemaining(b -> {
64
            try {
65
                handleRels(context, OafDecoder.decode(b.copyBytes()).getOaf(), dedupRoot);
66
            } catch (Exception e) {
67
                throw new RuntimeException(e);
68
            }
69
        });
70
    }
71

  
72
    private void handleRels(final Context context, final OafProtos.Oaf oaf, final String dedupRoot) throws IOException, InterruptedException {
73

  
74
        final String relType = oaf.getRel().getRelClass();
75
        if (relType.contains("merges")) {
76
            return;
77
        }
78

  
79
        // Set relation deleted by inference from Root to entity that has been merged to another one
80
        final FieldTypeProtos.DataInfo.Builder dataInfoBuilder = FieldTypeProtos.DataInfo.newBuilder(oaf.getDataInfo());
81
        dataInfoBuilder.setDeletedbyinference(true);
82
        OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder(oaf);
83
        builder.setDataInfo(dataInfoBuilder.build());
84
        final String sourceKey = oaf.getRel().getSource();
85
        write(context, sourceKey, builder);
86

  
87
        String relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString();
88
        context.getCounter(COUNTER_GROUP, String.format("%s - Relation set deleted", relGroup)).increment(1);
89

  
90
        // Create Relation from Root Entity to its deduplicated Entity
91
        builder = OafProtos.Oaf.newBuilder(oaf);
92
        OafProtos.OafRel.Builder relBuilder = OafProtos.OafRel.newBuilder(oaf.getRel());
93
        relBuilder.setTarget(dedupRoot);
94
        builder.setRel(relBuilder.build());
95
        write(context, sourceKey, builder);
96

  
97
        // Create Relation from deduplicated Entity to Root Entity
98
        relBuilder = OafProtos.OafRel.newBuilder(oaf.getRel());
99
        relBuilder.setTarget(relBuilder.getSource());
100
        relBuilder.setSource(dedupRoot);
101

  
102
        RelDescriptor rd = OafRelDecoder.decode(oaf.getRel()).getRelDescriptor();
103

  
104
        final String inverseRelation = ontologies.get(rd.getRelType().toString()).inverseOf(rd.getRelClass());
105
        relBuilder.setRelType(rd.getRelType()).setSubRelType(rd.getSubRelType()).setRelClass(inverseRelation);
106

  
107
        builder.setRel(relBuilder.build());
108
        write(context, dedupRoot, builder);
109

  
110
        relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString();
111
        context.getCounter(COUNTER_GROUP, String.format("%s - Relation fixed", relGroup)).increment(2);
112
    }
113

  
114
    private void write(Context context, String dedupRoot, OafProtos.Oaf.Builder builder) throws IOException, InterruptedException {
115
        if (!simulation) {
116
            context.write(new ImmutableBytesWritable(Bytes.toBytes(dedupRoot)), asPut(builder.build()));
117
        }
118
    }
119

  
120
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/Key.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation;
2

  
3
import org.apache.hadoop.io.IntWritable;
4
import org.apache.hadoop.io.Text;
5
import org.apache.hadoop.io.Writable;
6
import org.apache.hadoop.io.WritableComparable;
7

  
8
import java.io.DataInput;
9
import java.io.DataOutput;
10
import java.io.IOException;
11

  
12
public class Key implements Writable, WritableComparable<Key> {
13

  
14
    public final static int MERGES_REL = 0;
15
    public final static int OTHER_REL = 1;
16

  
17
    private IntWritable keyType;
18

  
19
    private Text id;
20

  
21
    public Key() {}
22

  
23
    public Key(final int keyType, final String id) {
24
        this.id = new Text(id);
25
        this.keyType = new IntWritable(keyType);
26
    }
27

  
28
    public static Key create(final int keyType, final String id) {
29
        return new Key(keyType, id);
30
    }
31

  
32
    public static Key mergesRel(final String id) {
33
        return new Key(MERGES_REL, id);
34
    }
35

  
36
    public static Key otherRel(final String id) {
37
        return new Key(OTHER_REL, id);
38
    }
39

  
40
    public Text getId() {
41
        return id;
42
    }
43

  
44
    public void setId(final Text id) {
45
        this.id = id;
46
    }
47

  
48
    public IntWritable getKeyType() {
49
        return keyType;
50
    }
51

  
52
    public void setKeyType(final IntWritable keyType) {
53
        this.keyType = keyType;
54
    }
55

  
56
    @Override
57
    public int compareTo(final Key o) {
58
        int compareValue = this.id.toString().compareTo(o.getId().toString());
59
        if (compareValue == 0) {
60
            compareValue = this.keyType.compareTo(o.getKeyType());
61
        }
62
        return compareValue;    // sort ascending
63
    }
64

  
65
    @Override
66
    public void write(final DataOutput out) throws IOException {
67
        keyType.write(out);
68
        id.write(out);
69
    }
70

  
71
    @Override
72
    public void readFields(final DataInput in) throws IOException {
73
        keyType = new IntWritable();
74
        keyType.readFields(in);
75
        id = new Text();
76
        id.readFields(in);
77
    }
78

  
79
    @Override
80
    public String toString() {
81
        return (new StringBuilder())
82
                .append('{')
83
                .append(getKeyType().get())
84
                .append(',')
85
                .append(getId())
86
                .append('}')
87
                .toString();
88
    }
89
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/KeyPartitioner.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation;
2

  
3
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
4
import org.apache.hadoop.mapreduce.Partitioner;
5

  
6
public class KeyPartitioner extends Partitioner<Key, ImmutableBytesWritable> {
7

  
8
    @Override
9
    public int getPartition(Key key, ImmutableBytesWritable val, int numPartitions) {
10
        final int res = Math.abs(key.getId().hashCode() % numPartitions);
11
        return res;
12
    }
13

  
14
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation;
2

  
3
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
4
import eu.dnetlib.data.proto.OafProtos;
5
import eu.dnetlib.data.proto.TypeProtos;
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9

  
10
import java.io.IOException;
11

  
12
import static eu.dnetlib.data.mapreduce.util.DedupUtils.isRoot;
13
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getBody;
14
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.rel;
15

  
16
/**
17
 * Created by sandro on 2/24/17.
18
 */
19
public class DedupFixRelationMapper extends TableMapper<Key, ImmutableBytesWritable> {
20

  
21
    public static final String COUNTER_GROUP = "Fix relations";
22

  
23
    private ImmutableBytesWritable ibw;
24

  
25
    private TypeProtos.Type expectedType;
26

  
27
    @Override
28
    protected void setup(final Context context) {
29

  
30
        expectedType = TypeProtos.Type.valueOf(context.getConfiguration().get("type"));
31

  
32
        ibw = new ImmutableBytesWritable();
33
    }
34
    @Override
35
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
36
        final String rowKey = new String(key.copyBytes());
37
        final OafRowKeyDecoder rowKeyDecoder = OafRowKeyDecoder.decode(rowKey);
38
        final TypeProtos.Type type = rowKeyDecoder.getType();
39

  
40
        if (!type.equals(expectedType) || !isRoot(rowKey)) {
41
            return;
42
        }
43

  
44
        final OafProtos.Oaf oaf = getBody(value, type);
45
        if (oaf == null) {
46
            context.getCounter(COUNTER_GROUP, String.format("%s - missing body", type.toString())).increment(1);
47
            return;
48
        }
49

  
50
        rel(value).forEach(rel -> {
51
            final String targetId = rel.getRel().getTarget();
52
            if (!isRoot(targetId)) {
53
                if (rel.getRel().getRelClass().equals("merges")) {
54
                    emit(context, Key.mergesRel(targetId), rel.toByteArray());
55
                } else {
56
                    emit(context, Key.otherRel(targetId), rel.toByteArray());
57
                }
58
            }
59
        });
60
    }
61

  
62
    private void emit(final Context context, final Key key, final byte[] value) {
63
        ibw.set(value);
64
        switch (key.getKeyType().get()) {
65
            case Key.MERGES_REL:
66
                context.getCounter(COUNTER_GROUP, String.format("%s - Merge Relationship", expectedType)).increment(1);
67
                break;
68
            case Key.OTHER_REL:
69
                context.getCounter(COUNTER_GROUP, String.format("%s - Other Relationship", expectedType)).increment(1);
70
                break;
71
        }
72
        try {
73
            context.write(key, ibw);
74
        } catch (Exception e) {
75
            throw new RuntimeException(e);
76
        }
77
    }
78

  
79
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/KeyGroupingComparator.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation;
2

  
3
import org.apache.hadoop.io.WritableComparable;
4
import org.apache.hadoop.io.WritableComparator;
5

  
6
public class KeyGroupingComparator extends WritableComparator {
7

  
8
    protected KeyGroupingComparator() {
9
        super(Key.class, true);
10
    }
11

  
12
    @Override
13
    public int compare(WritableComparable w1, WritableComparable w2) {
14
        final Key k1 = (Key) w1;
15
        final Key k2 = (Key) w2;
16
        return k1.getId().compareTo(k2.getId());
17
    }
18
}

Also available in: Unified diff