Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.stream.Collectors;
6

    
7
import com.google.common.collect.Lists;
8
import com.google.common.collect.Streams;
9
import eu.dnetlib.data.mapreduce.hbase.broker.*;
10
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
11
import eu.dnetlib.data.mapreduce.util.DedupUtils;
12
import eu.dnetlib.data.proto.OafProtos.Oaf;
13
import org.apache.commons.lang.StringUtils;
14
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15
import org.dom4j.DocumentException;
16

    
17
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey;
18

    
19
/**
20
 * Created by claudio on 08/07/16.
21
 */
22
public class EnrichmentReducer extends AbstractEnrichmentReducer {
23

    
24
	@Override
25
	protected String counterGroup() {
26
		return "Broker Enrichment";
27
	}
28

    
29
	@Override
30
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
31
			InterruptedException {
32

    
33
		try {
34
			generateEvents(Streams.stream(values)
35
					.limit(LIMIT)
36
					.map(EnrichmentReducer::toOaf)
37
					.collect(Collectors.toList()), context);
38
		} catch (final DocumentException e) {
39
			throw new RuntimeException(e);
40
		}
41
	}
42

    
43
	private void generateEvents(final List<Oaf> oafList, final Context context) throws IOException, InterruptedException, DocumentException {
44

    
45
		for (final Oaf current : oafList) {
46

    
47
			context.getCounter(counterGroup(), "entity type: " + current.getEntity().getResult().getMetadata().getResulttype().getClassid()).increment(1);
48

    
49
			final String currentId = current.getEntity().getId();
50

    
51
			final String currentDsId = StringUtils.substringAfter(getKey(current.getEntity().getCollectedfromList()), "|");
52
			final String currentDsType = dsTypeMap.get(currentDsId);
53

    
54
			// System.out.println(String.format("'%s' -> '%s'", currentDsId, currentDsType));
55

    
56
			if (StringUtils.isBlank(currentDsType) && !dsWhitelist.contains(currentDsId)) {
57
				context.getCounter("events skipped", "datasource type excluded").increment(1);
58
			} else {
59
				if (dsBlacklist.contains(currentDsId)) {
60
					context.getCounter("events skipped", "datasource blacklisted").increment(1);
61
				} else {
62

    
63
					final List<EventWrapper> events = Lists.newArrayList();
64
					for (final Oaf other : oafList) {
65

    
66
						final String otherId = other.getEntity().getId();
67
						if (!currentId.equals(otherId)) {
68

    
69
							final float trust = similarity(current, other);
70

    
71
							if (!DedupUtils.isRoot(current.getEntity().getId())) {
72
								events.addAll(PIDEventFactory.process(current, other, trust));
73
								events.addAll(OAVersionEventFactory.process(current, other, trust, untrustedOaDsList));
74
								events.addAll(AbstractEventFactory.process(current, other, trust));
75
								events.addAll(PublicationDateEventFactory.process(current, other, trust));
76
							}
77
							events.addAll(SubjectEventFactory.process(context, current, other, trust));
78

    
79
						} else if (oafList.size() == 1) {
80
							events.addAll(SubjectEventFactory.process(context, current));
81
						}
82
					}
83
					emit(events, context);
84
				}
85
			}
86
		}
87
	}
88

    
89
}
(4-4/8)