Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.openorgs;
2

    
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.List;
6

    
7
import org.apache.commons.lang3.StringUtils;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.io.NullWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12

    
13
import com.google.protobuf.InvalidProtocolBufferException;
14

    
15
import eu.dnetlib.data.proto.OafProtos.Oaf;
16
import eu.dnetlib.data.proto.OafProtos.OafEntity;
17

    
18
public class GenerateSimilaritiesReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, Text> {
19

    
20
	private final Text valueOut = new Text();
21

    
22
	@Override
23
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context)
24
			throws IOException, InterruptedException {
25

    
26
		try {
27
			final List<OafEntity> list = new ArrayList<>();
28

    
29
			for (final ImmutableBytesWritable ibw : values) {
30
				list.add(Oaf.parseFrom(ibw.get()).getEntity());
31
			}
32

    
33
			if (list.size() < 2) { return; }
34

    
35
			final String mainId = findMainId(OpenOrgsCommon.OPENORGS_MAIN_PREFIX, list);
36

    
37
			if (StringUtils.isNotBlank(mainId)) {
38
				for (final OafEntity o : list) {
39
					if (!o.getOriginalIdList().contains(mainId)) {
40
						context.getCounter("organization", "relations to " + OpenOrgsCommon.OPENORGS_MAIN_PREFIX + "*").increment(1);
41
						emit(newSimilarity(mainId, o), context);
42
					}
43
				}
44
			}
45
		} catch (final InvalidProtocolBufferException e) {
46
			e.printStackTrace();
47
			throw new RuntimeException(e);
48
		}
49
	}
50

    
51
	private String findMainId(final String idPrefix, final List<OafEntity> list) {
52
		final List<String> valids = new ArrayList<>();
53

    
54
		for (final OafEntity e : list) {
55
			for (final String id : e.getOriginalIdList()) {
56
				if (id.startsWith(idPrefix)) {
57
					valids.add(id);
58
				}
59
			}
60
		}
61
		if (valids.isEmpty()) { return null; }
62

    
63
		valids.sort(String::compareTo);
64

    
65
		return valids.get(0);
66
	}
67

    
68
	private void emit(final Similarity simrel, final Context context) {
69
		try {
70
			valueOut.set(simrel.toTsv());
71
			context.getCounter("organization", "relations (total)").increment(1);
72
			context.write(NullWritable.get(), valueOut);
73
		} catch (IOException | InterruptedException e) {
74
			throw new RuntimeException(e);
75
		}
76
	}
77

    
78
	private Similarity newSimilarity(final String openOrgsId, final OafEntity oafEntity) {
79
		final Similarity s = new Similarity();
80
		s.setOpenOrgID(openOrgsId);
81
		s.setOpenaireOriginalId(oafEntity.getOriginalId(0));
82
		s.setName(oafEntity.getOrganization().getMetadata().getLegalname().getValue());
83
		s.setAcronym(oafEntity.getOrganization().getMetadata().getLegalshortname().getValue());
84
		s.setCountry(oafEntity.getOrganization().getMetadata().getCountry().getClassid());
85
		s.setUrl(oafEntity.getOrganization().getMetadata().getWebsiteurl().getValue());
86
		s.setCollectedFrom(oafEntity.getCollectedfrom(0).getValue());
87
		return s;
88
	}
89

    
90
}
(5-5/7)