Project

General

Profile

« Previous | Next » 

Revision 54182

fixRelations must work on main entities in a single scan pass

View differences:

modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationReducer.java
36 36
    protected void setup(final Context context) throws IOException, InterruptedException {
37 37
        super.setup(context);
38 38

  
39
        simulation = context.getConfiguration().getBoolean("simulation", false);
39
        simulation = context.getConfiguration().getBoolean("fixrel.simulation", false);
40 40
        ontologies = new Gson().fromJson(context.getConfiguration().get(JobParams.ONTOLOGIES), Ontologies.class);
41 41

  
42 42
        System.out.println("ontologies: " + ontologies.toJson(true));
......
85 85
        write(context, sourceKey, builder);
86 86

  
87 87
        String relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString();
88
        context.getCounter(COUNTER_GROUP, String.format("Relation '%s' set deleted", relGroup)).increment(1);
88
        context.getCounter(COUNTER_GROUP, String.format("%s - Relation set deleted", relGroup)).increment(1);
89 89

  
90 90
        // Create Relation from Root Entity to its deduplicated Entity
91 91
        builder = OafProtos.Oaf.newBuilder(oaf);
......
108 108
        write(context, dedupRoot, builder);
109 109

  
110 110
        relGroup = oaf.getRel().getRelType().toString() + oaf.getRel().getSubRelType().toString();
111
        context.getCounter(COUNTER_GROUP, String.format("Relation '%s' fixed", relGroup)).increment(2);
111
        context.getCounter(COUNTER_GROUP, String.format("%s - Relation fixed", relGroup)).increment(2);
112 112
    }
113 113

  
114 114
    private void write(Context context, String dedupRoot, OafProtos.Oaf.Builder builder) throws IOException, InterruptedException {
......
117 117
        }
118 118
    }
119 119

  
120
    private Iterable<OafProtos.Oaf> toOaf(final Iterable<ImmutableBytesWritable> values) {
121
        return Iterables.transform(values, ibw -> OafDecoder.decode(ibw.copyBytes()).getOaf());
122
    }
123

  
124 120
}
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/fixrelation/DedupFixRelationMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.dedup.fixrelation;
2 2

  
3
import eu.dnetlib.data.mapreduce.JobParams;
4 3
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
5 4
import eu.dnetlib.data.proto.OafProtos;
6 5
import eu.dnetlib.data.proto.TypeProtos;
7
import eu.dnetlib.pace.config.DedupConfig;
8
import org.apache.commons.lang3.StringUtils;
9 6
import org.apache.hadoop.hbase.client.Result;
10 7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11 8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12 9

  
13 10
import java.io.IOException;
14
import java.util.List;
15 11

  
16 12
import static eu.dnetlib.data.mapreduce.util.DedupUtils.isRoot;
17 13
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getBody;
......
26 22

  
27 23
    private ImmutableBytesWritable ibw;
28 24

  
29
    private String subEntityType;
30

  
31 25
    private TypeProtos.Type expectedType;
32 26

  
33 27
    @Override
34 28
    protected void setup(final Context context) {
35
        subEntityType = context.getConfiguration().get(JobParams.SUBENTITYTYPE);
36
        if (StringUtils.isBlank(subEntityType)) {
37
            throw new RuntimeException(String.format("workflow must provide parameter '%s', got '%s'", JobParams.SUBENTITYTYPE, subEntityType));
38
        }
39 29

  
40
        switch (subEntityType) {
41
            case "organization" :
42
                expectedType = TypeProtos.Type.organization;
43
                break;
44
            case "publication" :
45
            case "dataset" :
46
            case "software" :
47
            case "other" :
48
                expectedType = TypeProtos.Type.result;
49
                break;
50
            default:
51
                throw new RuntimeException("wrong subType: " + subEntityType);
52
        }
30
        expectedType = TypeProtos.Type.valueOf(context.getConfiguration().get("type"));
53 31

  
54 32
        ibw = new ImmutableBytesWritable();
55 33
    }
56

  
57 34
    @Override
58 35
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
59 36
        final String rowKey = new String(key.copyBytes());
......
65 42
        }
66 43

  
67 44
        final OafProtos.Oaf oaf = getBody(value, type);
68

  
69
        // we need to check the result subtypes
70
        if (type.equals(TypeProtos.Type.result) && !subEntityType.equalsIgnoreCase(oaf.getEntity().getResult().getMetadata().getResulttype().getClassid())) {
45
        if (oaf == null) {
46
            context.getCounter(COUNTER_GROUP, String.format("%s - missing body", type.toString())).increment(1);
71 47
            return;
72 48
        }
73 49

  
74
        rel(value).values().forEach(rel -> {
50
        rel(value).forEach(rel -> {
75 51
            final String targetId = rel.getRel().getTarget();
76 52
            if (!isRoot(targetId)) {
77 53
                if (rel.getRel().getRelClass().equals("merges")) {
......
87 63
        ibw.set(value);
88 64
        switch (key.getKeyType().get()) {
89 65
            case Key.MERGES_REL:
90
                context.getCounter(COUNTER_GROUP, "Merges Relationship").increment(1);
66
                context.getCounter(COUNTER_GROUP, String.format("%s - Merge Relationship", expectedType)).increment(1);
91 67
                break;
92 68
            case Key.OTHER_REL:
93
                context.getCounter(COUNTER_GROUP, "Other Relationship").increment(1);
69
                context.getCounter(COUNTER_GROUP, String.format("%s - Other Relationship", expectedType)).increment(1);
94 70
                break;
95 71
        }
96 72
        try {
modules/dnet-mapreduce-jobs/branches/master/src/main/java/eu/dnetlib/data/mapreduce/util/OafHbaseUtils.java
121 121
		return Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(s));
122 122
	}
123 123

  
124
	public static Map<String, Oaf> rel(final Result value) {
124
	public static List<Oaf> rel(final Result value) {
125 125
		return value.list().stream()
126 126
				.filter(kv -> {
127 127
					final String q = new String(kv.getQualifier());
128 128
					return q.matches(OafRowKeyDecoder.ID_REGEX);
129
				}).collect(Collectors.toMap(
130
					kv -> new String(kv.getQualifier()),
131
					kv -> parseProto(kv.getValue())
132
		));
129
				})
130
				.filter(kv -> kv.getValue() != null && kv.getValue().length > 0)
131
				.map(kv -> parseProto(kv.getValue()))
132
				.collect(Collectors.toList());
133 133
	}
134 134

  
135 135
	public static Oaf parseProto(final byte[] value) {

Also available in: Unified diff