Revision 46317

sorting values on reduce side, cleanup
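
This revision implements a reduce-side secondary sort for the dedup relation-fixing job: a new composite map output key (DNGFKey) carries an id plus a key type, while a partitioner and a grouping comparator consider only the id, so that within each reduce group the "merges" relation sorts ahead of all other relations. The remaining changes are cleanup: consistent counter naming in the dli mappers and a bucketed group-size counter in PrepareScholixDataReducer.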

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/kv/DNGFKey.java

package eu.dnetlib.data.mapreduce.hbase.dedup.kv;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import com.google.common.collect.ComparisonChain;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;

/**
 * Created by claudio on 13/03/2017.
 */
public class DNGFKey implements WritableComparable<DNGFKey> {

	// key type markers: MERGES_REL sorts before OTHER_REL within the same id
	public final static int MERGES_REL = 0;
	public final static int OTHER_REL = 1;

	private IntWritable keyType;

	private Text id;

	public DNGFKey() {}

	public static DNGFKey create(final int keyType, final String id) {
		return new DNGFKey(keyType, id);
	}

	public static DNGFKey mergesRel(final String id) {
		return new DNGFKey(MERGES_REL, id);
	}

	public static DNGFKey otherRel(final String id) {
		return new DNGFKey(OTHER_REL, id);
	}

	public DNGFKey(final int keyType, final String id) {
		this.id = new Text(id);
		this.keyType = new IntWritable(keyType);
	}

	public void setKeyType(final IntWritable keyType) {
		this.keyType = keyType;
	}

	public void setId(final Text id) {
		this.id = id;
	}

	public Text getId() {
		return id;
	}

	public IntWritable getKeyType() {
		return keyType;
	}

	// natural ordering: by id first, then by key type (MERGES_REL before OTHER_REL)
	@Override
	public int compareTo(final DNGFKey o) {
		return ComparisonChain.start()
				.compare(getId(), o.getId())
				.compare(getKeyType(), o.getKeyType())
				.result();
	}

	@Override
	public void write(final DataOutput out) throws IOException {
		keyType.write(out);
		id.write(out);
	}

	@Override
	public void readFields(final DataInput in) throws IOException {
		keyType = new IntWritable();
		keyType.readFields(in);
		id = new Text();
		id.readFields(in);
	}

	@Override
	public String toString() {
		return (new StringBuilder())
				.append('{')
				.append(getKeyType().get())
				.append(',')
				.append(getId())
				.append('}')
				.toString();
	}
}
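
A quick illustration of the resulting sort order; the ids below are hypothetical, chosen only to show the two comparison levels:

// within the same id, MERGES_REL (0) sorts before OTHER_REL (1);
// across different ids, the id comparison dominates
final DNGFKey a = DNGFKey.otherRel("50|id_aaa");
final DNGFKey b = DNGFKey.mergesRel("50|id_aaa");
final DNGFKey c = DNGFKey.mergesRel("50|id_bbb");

assert b.compareTo(a) < 0;
assert a.compareTo(c) < 0;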

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/kv/DNGFKeyPartitioner.java

package eu.dnetlib.data.mapreduce.hbase.dedup.kv;

import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.mapreduce.Partitioner;

/**
 * Created by claudio on 13/03/2017.
 */
public class DNGFKeyPartitioner extends Partitioner<DNGFKey, ImmutableBytesWritable> {

	@Override
	public int getPartition(final DNGFKey key, final ImmutableBytesWritable val, final int numPartitions) {
		// partition on the id only, ignoring the key type, so every relation
		// for a given id lands on the same reducer
		return Math.abs(key.getId().hashCode() % numPartitions);
	}
}

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/kv/DNGFKeyGroupingComparator.java

package eu.dnetlib.data.mapreduce.hbase.dedup.kv;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

/**
 * Created by claudio on 13/03/2017.
 */
public class DNGFKeyGroupingComparator extends WritableComparator {

	protected DNGFKeyGroupingComparator() {
		super(DNGFKey.class, true);
	}

	@Override
	public int compare(final WritableComparable w1, final WritableComparable w2) {
		final DNGFKey k1 = (DNGFKey) w1;
		final DNGFKey k2 = (DNGFKey) w2;

		// group on the id alone: keys that differ only in key type
		// fall into the same reduce call
		return k1.getId().compareTo(k2.getId());
	}
}
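
Taken together, the three classes above form the standard Hadoop secondary-sort setup: partition and group on the id alone, while the key's natural ordering sorts on (id, keyType). A minimal driver sketch for wiring them into the job; the configuration, table name and scan below are illustrative assumptions, not part of this revision:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.mapreduce.Job;

final Configuration conf = HBaseConfiguration.create();
final Job job = Job.getInstance(conf, "dedupFixRelation"); // job name is illustrative

// hypothetical source table and scan
TableMapReduceUtil.initTableMapperJob("db_openaire", new Scan(),
		DedupFixRelationMapper.class, DNGFKey.class, ImmutableBytesWritable.class, job);

job.setPartitionerClass(DNGFKeyPartitioner.class);               // route by id only
job.setGroupingComparatorClass(DNGFKeyGroupingComparator.class); // one reduce call per id
// no explicit sort comparator is needed: DNGFKey.compareTo orders (id, keyType),
// so each reducer sees the MERGES_REL value first
job.setReducerClass(DedupFixRelationReducer.class);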

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFixRelationMapper.java

 
 import eu.dnetlib.data.graph.model.DNGFRowKeyDecoder;
 import eu.dnetlib.data.mapreduce.JobParams;
+import eu.dnetlib.data.mapreduce.hbase.dedup.kv.DNGFKey;
 import eu.dnetlib.data.proto.TypeProtos;
 import eu.dnetlib.pace.config.DedupConfig;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
-import org.apache.hadoop.io.Text;
 
 import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.isRoot;
 import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.rel;
......
 /**
  * Created by sandro on 2/24/17.
  */
-public class DedupFixRelationMapper extends TableMapper<Text, ImmutableBytesWritable> {
+public class DedupFixRelationMapper extends TableMapper<DNGFKey, ImmutableBytesWritable> {
 
     private DedupConfig dedupConf;
 
......
         final TypeProtos.Type type = TypeProtos.Type.valueOf(dedupConf.getWf().getEntityType());
         final String rowKey = new String(key.copyBytes());
         final DNGFRowKeyDecoder rowKeyDecoder = DNGFRowKeyDecoder.decode(rowKey);
-        if ((rowKeyDecoder.getType() != type) || !isRoot(rowKey)) {
+        if ((!rowKeyDecoder.getType().equals(type)) || !isRoot(rowKey)) {
             return;
         }
         rel(value).values().forEach(dngf -> {
             final String targetId = dngf.getRel().getTarget();
             if (!isRoot(targetId)) {
-                final Text rootId = new Text(targetId);
-                emit(context, rootId, dngf.toByteArray());
+                if (dngf.getRel().getRelType().getClassid().equals("merges")) {
+                    emit(context, DNGFKey.mergesRel(targetId), dngf.toByteArray());
+                } else {
+                    emit(context, DNGFKey.otherRel(targetId), dngf.toByteArray());
+                }
             }
         });
     }
 
-    private void emit(final Context context, final Text rootId, final byte[] value) {
+    private void emit(final Context context, final DNGFKey key, final byte[] value) {
         ibw.set(value);
         try {
-            context.write(rootId, ibw);
+            context.write(key, ibw);
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
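
With this change the mapper keys each relation by its target id wrapped in a DNGFKey, marking "merges" relations as MERGES_REL and everything else as OTHER_REL, instead of emitting a plain Text key; the partitioner and grouping comparator above then deliver all relations for a target to a single reduce call, merges first.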

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupFixRelationReducer.java

 package eu.dnetlib.data.mapreduce.hbase.dedup;
 
 import java.io.IOException;
-import java.util.List;
+import java.util.Iterator;
 
 import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
 import eu.dnetlib.data.graph.model.DNGFDecoder;
 import eu.dnetlib.data.graph.model.DNGFRelDecoder;
 import eu.dnetlib.data.mapreduce.JobParams;
......
 import eu.dnetlib.data.proto.DNGFProtos;
 import eu.dnetlib.data.proto.DNGFProtos.DNGF;
 import eu.dnetlib.data.proto.FieldTypeProtos;
-import eu.dnetlib.data.proto.KindProtos;
 import eu.dnetlib.data.transform.Ontologies;
 import eu.dnetlib.data.transform.OntologyLoader;
 import eu.dnetlib.pace.config.DedupConfig;
......
             context.getCounter(COUNTER_GROUP, "aborted").increment(1);
             return;
         }
-        int relCount = 0;
-        String dedupRoot = null;
 
-        final List<DNGF> dngfs = Lists.newArrayList(toDNGF(values));
+        final Iterator<ImmutableBytesWritable> it = values.iterator();
+        final DNGF first = DNGFDecoder.decode(it.next().copyBytes()).getDNGF();
 
-        // The reducer should fix if it founds a relation merges and other relations
-        for (final DNGFProtos.DNGF dngf : dngfs) {
-            if (dngf.getKind() == KindProtos.Kind.entity) {
-                throw new RuntimeException("aborting unexpected entities on this reducer");
-            }
-            final String relType = dngf.getRel().getRelType().getClassid();
-            if (relType.contains("merges")) {
-                dedupRoot = dngf.getRel().getSource();
-            }
-            relCount++;
+        if (!first.getRel().getRelType().getClassid().equals("merges")) {
+            context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1);
+            return;
         }
+        context.getCounter(COUNTER_GROUP, "Item to fix").increment(1);
 
-        if (dedupRoot != null && relCount > 1) {
-            context.getCounter(COUNTER_GROUP, "Item to fix").increment(1);
-            for (final DNGFProtos.DNGF dngf : dngfs) {
-                handleRels(context, dngf, dedupRoot);
+        final String dedupRoot = first.getRel().getSource();
+        it.forEachRemaining(b -> {
+            try {
+                handleRels(context, DNGFDecoder.decode(b.copyBytes()).getDNGF(), dedupRoot);
+            } catch (Exception e) {
+                throw new RuntimeException(e);
             }
-        } else {
-            context.getCounter(COUNTER_GROUP, "Relation skipped").increment(1);
-        }
+        });
     }
 
     private void handleRels(final Context context, final DNGFProtos.DNGF dngf, final String dedupRoot) throws IOException, InterruptedException {
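
The rewritten reducer exploits the sort order established by DNGFKey: it inspects only the first value of each group, and if that value is not a "merges" relation the whole group is skipped. The dedup root is taken from the first relation's source, and the remaining values are streamed through handleRels, removing the need to buffer the full group in memory as the previous Lists.newArrayList(toDNGF(values)) version did. (Note: the relation-type check here is written as getRelType().getClassid().equals("merges"), matching the mapper; comparing the RelType message itself to a string would never match.)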

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dli/PrepareScholixDataMapper.java

  */
 public class PrepareScholixDataMapper extends TableMapper<DliKey, ImmutableBytesWritable> {
 
+    private final static String SCHOLIX = "Scholix";
+    private final static String DNGF = "DNGF";
+
     @Override
     protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
 
......
 
                 final Scholix.Builder source = DngfToScholixConverter.withSource(entity.getEntity());
                 emit(DliKey.ENTITY, entity.getEntity().getId(), context, source.build().toByteArray());
-                context.getCounter("scholix", "source " + source.getSource().getObjectType()).increment(1);
+                context.getCounter(SCHOLIX, "Source: " + source.getSource().getObjectType()).increment(1);
 
                 rel(value, "isMergedIn", "merges", "isSimilarTo").values().forEach(r -> {
                     if (!deletedByInference(r)) {
                         final Scholix.Builder target = DngfToScholixConverter.withTarget(entity.getEntity(), r.getRel());
                         emit(DliKey.REL, r.getRel().getTarget(), context, target.build().toByteArray());
-                        context.getCounter("scholix", "rel " + target.getTarget().getObjectType()).increment(1);
+                        context.getCounter(SCHOLIX, "Rel type: " + target.getTarget().getObjectType()).increment(1);
                     } else {
-                        context.getCounter("DNGF", type.name() + " rel deletedbyinference").increment(1);
+                        context.getCounter(DNGF, "Rel deletedbyinference: " + type.name()).increment(1);
                     }
                 });
             } else {
-                context.getCounter("DNGF", type.name() + " deletedbyinference").increment(1);
+                context.getCounter(DNGF, "Deletedbyinference: " + type.name()).increment(1);
             }
         } else {
-            context.getCounter("DNGF", type.name() + " invalid").increment(1);
+            context.getCounter(DNGF, "Invalid: " + type.name()).increment(1);
         }
     }
 
......
 
     private void emit(final int keyType, final String id, final Context context, final byte[] data) {
         try {
-            context.getCounter("scholix", keyType == DliKey.ENTITY ? "ENTITY" : "REL").increment(1);
+            context.getCounter(SCHOLIX, keyType == DliKey.ENTITY ? "ENTITY" : "REL").increment(1);
             context.write(DliKey.create(keyType, id), new ImmutableBytesWritable(data));
         } catch (Exception e) {
             e.printStackTrace();

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dli/PrepareSummaryDataMapper.java

 
 public class PrepareSummaryDataMapper extends TableMapper<Text, Text> {
 
+    private final static String SCHOLIX = "Scholix";
+
     private Ontologies ontologies = null;
 
     private Text outKey;
......
                     emit(new String(keyIn.copyBytes()), context, converter.convertAsJson());
                 }
             } else {
-                incrementCounter(context, "deleted by inference", type.toString(), 1);
+                incrementCounter(context, SCHOLIX, "Deletedbyinference: " + type.toString(), 1);
             }
         } else {
-            incrementCounter(context, "missing body (map)", type.toString(), 1);
+            incrementCounter(context, SCHOLIX, "Missing body: " + type.toString(), 1);
         }
     }
 
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dli/PrepareScholixDataReducer.java

 package eu.dnetlib.data.mapreduce.hbase.dli;
 
 import java.io.IOException;
+import java.util.Arrays;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
 import com.google.protobuf.InvalidProtocolBufferException;
 import com.googlecode.protobuf.format.JsonFormat;
 import eu.dnetlib.data.mapreduce.hbase.dli.kv.DliKey;
......
 			}
 		});
 
-		context.getCounter("scholix", "group size " + groupSize.get()).increment(1);
+		groupSizeCounter(context, groupSize.get(),
+				"0,1",
+				"1,10",
+				"10,20",
+				"20,100",
+				"100,200",
+				"200,500",
+				"500,1000",
+				"1000,2000",
+				"2000,5000",
+				"5000,10000",
+				"10000,20000",
+				"20000,*");
 	}
 
+	// each "min,max" group denotes the half-open interval (min, max]; "*" means no upper bound
+	private void groupSizeCounter(final Context context, final int groupSize, final String... groups) {
+		Arrays.asList(groups).forEach(g -> {
+			final LinkedList<String> i = Lists.newLinkedList(Splitter.on(",").split(g));
+			final int min = Integer.parseInt(i.getFirst());
+			final int max = i.getLast().equals("*") ? Integer.MAX_VALUE : Integer.parseInt(i.getLast());
+			groupSizeCounter(context, groupSize, min, max);
+		});
+	}
+
+	private void groupSizeCounter(final Context context, final int groupSize, final Integer min, final Integer max) {
+		if (groupSize > min && groupSize <= max) {
+			context.getCounter("scholix groups", String.format("group size (%s,%s)", min, max)).increment(1);
+		}
+	}
+
 	private Scholix.Builder parse(DliKey key, final ImmutableBytesWritable value) {
 		try {
 			return Scholix.newBuilder(Scholix.parseFrom(value.copyBytes()));
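
The bucketing replaces one counter per distinct group size with a fixed set of range counters. An illustrative call (not in the source), showing that a group size falls into exactly one (min, max] bucket:

// a group of 150 values satisfies 150 > 100 && 150 <= 200, so only the
// "group size (100,200)" counter in the "scholix groups" group is incremented
groupSizeCounter(context, 150, "0,1", "1,10", "10,20", "20,100", "100,200", "200,*");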
