Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.openorgs;
2

    
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.List;
6

    
7
import org.apache.commons.lang3.StringUtils;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.io.NullWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12

    
13
import com.google.protobuf.InvalidProtocolBufferException;
14

    
15
import eu.dnetlib.data.proto.OafProtos.Oaf;
16
import eu.dnetlib.data.proto.OafProtos.OafEntity;
17

    
18
public class GenerateSimilaritiesReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, Text> {
19

    
20
	private final Text valueOut = new Text();
21

    
22
	@Override
23
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context)
24
			throws IOException, InterruptedException {
25

    
26
		try {
27
			final List<OafEntity> list = new ArrayList<>();
28

    
29
			for (final ImmutableBytesWritable ibw : values) {
30
				list.add(Oaf.parseFrom(ibw.get()).getEntity());
31
			}
32

    
33
			if (list.size() < 2) { return; }
34

    
35
			if (reduceUsingId(OpenOrgsCommon.OPENORGS_MAIN_PREFIX, list, context)
36
					|| reduceUsingId(OpenOrgsCommon.OPENORGS_CORDA_FP7_PREFIX, list, context)
37
					|| reduceUsingId(OpenOrgsCommon.OPENORGS_CORDA_H2020_PREFIX, list, context)
38
					|| reduceUsingId("20|", list, context)) {
39
				// NOHING TODO
40
			}
41
		} catch (final InvalidProtocolBufferException e) {
42
			e.printStackTrace();
43
			throw new RuntimeException(e);
44
		}
45
	}
46

    
47
	private boolean reduceUsingId(final String idPrefix, final List<OafEntity> list, final Context context) {
48
		final String mainId = findMainId(idPrefix, list);
49

    
50
		if (StringUtils.isNotBlank(mainId)) {
51
			for (final OafEntity o : list) {
52
				if (!o.getId().equals(mainId)) {
53
					context.getCounter("organization", "relations to " + idPrefix + "*").increment(1);
54
					emit(newSimilarity(mainId, o), context);
55
				}
56
			}
57

    
58
			return true;
59
		}
60
		return false;
61
	}
62

    
63
	private String findMainId(final String idPrefix, final List<OafEntity> list) {
64
		final List<String> valids = new ArrayList<>();
65

    
66
		for (final OafEntity e : list) {
67
			if (e.getId().startsWith(idPrefix)) {
68
				valids.add(e.getId());
69
			}
70
		}
71
		if (valids.isEmpty()) { return null; }
72

    
73
		valids.sort(String::compareTo);
74

    
75
		return valids.get(0);
76
	}
77

    
78
	private void emit(final Similarity simrel, final Context context) {
79
		try {
80
			valueOut.set(simrel.toJsonBytes());
81
			context.getCounter("organization", "relations (total)").increment(1);
82
			context.write(NullWritable.get(), valueOut);
83
		} catch (IOException | InterruptedException e) {
84
			throw new RuntimeException(e);
85
		}
86
	}
87

    
88
	private Similarity newSimilarity(final String openOrgsId, final OafEntity oafEntity) {
89
		final Similarity s = new Similarity();
90
		s.setOpenOrgID(openOrgsId);
91
		s.setOpenaireId(oafEntity.getId());
92
		s.setOpenaireOriginalId(oafEntity.getOriginalId(0));
93
		s.setName(oafEntity.getOrganization().getMetadata().getLegalname().getValue());
94
		s.setAcronym(oafEntity.getOrganization().getMetadata().getLegalshortname().getValue());
95
		s.setCountry(oafEntity.getOrganization().getMetadata().getCountry().getClassid());
96
		s.setUrl(oafEntity.getOrganization().getMetadata().getWebsiteurl().getValue());
97
		s.setCollectedFrom(oafEntity.getCollectedfrom(0).getValue());
98
		return s;
99
	}
100

    
101
}
(5-5/7)