Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.openorgs;
2

    
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.List;
6

    
7
import org.apache.commons.lang3.StringUtils;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.io.NullWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12

    
13
import com.google.protobuf.InvalidProtocolBufferException;
14
import com.googlecode.protobuf.format.JsonFormat;
15

    
16
import eu.dnetlib.data.proto.OafProtos.Oaf;
17
import eu.dnetlib.data.proto.OafProtos.OafEntity;
18

    
19
public class GenerateOrganizationsReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, NullWritable, Text> {
20

    
21
	private final Text valueOut = new Text();
22

    
23
	@Override
24
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context)
25
			throws IOException, InterruptedException {
26

    
27
		try {
28
			final List<OafEntity> list = new ArrayList<>();
29
			for (final ImmutableBytesWritable ibw : values) {
30
				list.add(Oaf.parseFrom(ibw.get()).getEntity());
31
			}
32

    
33
			final OafEntity e1 = findOrgToEmit(OpenOrgsCommon.OPENORGS_MAIN_PREFIX, list);
34
			if (e1 != null) {
35
				context.getCounter("organization", "already present in openOrgs").increment(1);;
36
				return;
37
			}
38
			final OafEntity e2 = findOrgToEmit(OpenOrgsCommon.OPENORGS_CORDA_FP7_PREFIX, list);
39
			if (e2 != null) {
40
				context.getCounter("organization", "new (from corda FP7)").increment(1);
41
				emit(e2, context);
42
				return;
43
			}
44
			final OafEntity e3 = findOrgToEmit(OpenOrgsCommon.OPENORGS_CORDA_H2020_PREFIX, list);
45
			if (e3 != null) {
46
				context.getCounter("organization", "new (from corda H2020)").increment(1);
47
				emit(e3, context);
48
				return;
49
			}
50
			final OafEntity e4 = findOrgToEmit("20|", list);
51
			if (e4 != null) {
52
				context.getCounter("organization", "new (from other sources)").increment(1);
53
				emit(e4, context);
54
				return;
55
			}
56
		} catch (final InvalidProtocolBufferException e) {
57
			e.printStackTrace();
58
			throw new RuntimeException(e);
59
		}
60
	}
61

    
62
	private OafEntity findOrgToEmit(final String idPrefix, final List<OafEntity> list) {
63
		final List<OafEntity> valids = new ArrayList<>();
64

    
65
		for (final OafEntity e : list) {
66
			if (e.getId().startsWith(idPrefix)) {
67
				valids.add(e);
68
			}
69
		}
70
		if (valids.isEmpty()) { return null; }
71

    
72
		valids.sort((o1, o2) -> StringUtils.compare(o1.getId(), o2.getId()));
73

    
74
		return valids.get(0);
75
	}
76

    
77
	private void emit(final OafEntity entity, final Context context) {
78
		try {
79
			context.getCounter("organization", "new (total)").increment(1);
80
			valueOut.set(JsonFormat.printToString(entity));
81
			context.write(NullWritable.get(), valueOut);
82
		} catch (IOException | InterruptedException e) {
83
			throw new RuntimeException(e);
84
		}
85
	}
86

    
87
}
(3-3/7)