Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

    
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.Map;
6
import java.util.stream.Stream;
7

    
8
import com.google.common.collect.Lists;
9
import eu.dnetlib.data.mapreduce.util.OafDecoder;
10
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
11
import eu.dnetlib.data.proto.OafProtos.Oaf;
12
import eu.dnetlib.data.proto.TypeProtos.Type;
13
import org.apache.commons.collections.MapUtils;
14
import org.apache.hadoop.hbase.client.Result;
15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
16
import org.apache.hadoop.hbase.util.Bytes;
17

    
18
/**
19
 * Created by claudio on 08/07/16.
20
 */
21
public class SoftwareEnrichmentMapper extends AbstractEnrichmentMapper {
22

    
23
	public static final String SOFTWARE = "software";
24
	public static final String PUBLICATION = "publication";
25

    
26
	public static final String RESULT_RESULT_RELATIONSHIP_IS_RELATED_TO = "resultResult_relationship_isRelatedTo";
27

    
28
	@Override
29
	protected String counterGroup() {
30
		return "Broker Enrichment Software";
31
	}
32

    
33
	@Override
34
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
35

    
36
		final Type type = getEntityType(key);
37

    
38
		final byte[] body = value.getValue(Bytes.toBytes(type.toString()), Bytes.toBytes("body"));
39

    
40
		if (body == null) {
41
			context.getCounter(counterGroup(), "missing metadata").increment(1);
42
			return;
43
		}
44

    
45
		switch (type) {
46
		case result:
47

    
48
			final Oaf oaf = OafDecoder.decode(body).getOaf();
49
			if (oaf.getDataInfo().getDeletedbyinference()) {
50
				context.getCounter(counterGroup(), "result deletedbyinference").increment(1);
51
				return;
52
			}
53
			final Oaf.Builder oafBuilder = Oaf.newBuilder(oaf);
54
			final String resulttype = oafBuilder.getEntity().getResult().getMetadata().getResulttype().getClassid();
55

    
56
			if (SOFTWARE.equalsIgnoreCase(resulttype)) {
57
				for (final byte[] publicationId : listRelatedIds(value, RESULT_RESULT_RELATIONSHIP_IS_RELATED_TO)) {
58
					emit(context, publicationId, body, SOFTWARE);
59
				}
60
				break;
61
			} else if (PUBLICATION.equalsIgnoreCase(resulttype)) {
62
				for (final String relName : Arrays.asList(RESULT_RESULT_RELATIONSHIP_IS_RELATED_TO)) {
63
					listRelations(value, relName).forEach(rel -> {
64
						oafBuilder.getEntityBuilder().addCachedOafRel(rel);
65
						context.getCounter(counterGroup(), "rel: " + relName).increment(1);
66
					});
67
				}
68

    
69
				emit(context, key.copyBytes(), oafBuilder.build().toByteArray(), PUBLICATION);
70
				break;
71
			}
72
			break;
73
		default:
74
			throw new IllegalArgumentException("invalid type: " + type);
75
		}
76
	}
77

    
78
	private Stream<Oaf> listRelations(final Result value, final String relType) {
79

    
80
		//TODO consider only relationshipt not deletedbyinference
81

    
82
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(relType));
83
		return MapUtils.isEmpty(map) ? Stream.empty() : map.values().stream()
84
				.map(input -> OafDecoder.decode(input).getOaf())
85
				.filter(rel -> !rel.getRel().getTarget().contains("unidentified"));
86
	}
87

    
88
	private Iterable<byte[]> listRelatedIds(final Result value, final String relType) {
89

    
90
		//TODO consider only relationshipt not deletedbyinference
91

    
92
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(relType));
93
		return MapUtils.isNotEmpty(map) ? map.keySet() : Lists.newArrayList();
94
	}
95

    
96
	private Type getEntityType(final ImmutableBytesWritable key) {
97
		return OafRowKeyDecoder.decode(key.copyBytes()).getType();
98
	}
99

    
100

    
101
}
(7-7/8)