Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import com.google.common.collect.Lists;
4
import eu.dnetlib.broker.objects.OpenAireEventPayload;
5
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
6
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
7
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
8
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
9
import eu.dnetlib.data.proto.OafProtos.Oaf;
10
import org.apache.commons.lang3.StringUtils;
11

    
12
import java.util.Collection;
13
import java.util.List;
14
import java.util.Objects;
15
import java.util.function.Function;
16
import java.util.stream.Collectors;
17
import java.util.stream.Stream;
18

    
19
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
20

    
21
/**
22
 * Created by claudio on 26/07/16.
23
 */
24
public class PIDEventFactory {
25

    
26
	public static List<EventWrapper> process(final Oaf current, final Oaf other, final float trust) {
27
		return new PIDEventFactory().processPID(current, other, trust);
28
	}
29

    
30
	public List<EventWrapper> processPID(final Oaf current, final Oaf other, final float trust) {
31

    
32
		final Collection<StructuredProperty> pids = other.getEntity().getPidList()
33
				.stream()
34
				.collect(Collectors.toMap(
35
						p -> StringUtils.lowerCase(p.getQualifier().getClassid()) + ":" +StringUtils.lowerCase(p.getValue()),
36
						Function.identity(),
37
						(p1, p2) -> p1))
38
				.values();
39

    
40
		final Stream<EventWrapper> more = pids.stream()
41
				.filter(p -> !hasPidValue(current, p.getValue()))
42
				.map(p -> doProcessPID(current, other, p, Topic.ENRICH_MORE_PID, trust));
43

    
44
		if (current.getEntity().getPidList().isEmpty()) { return Stream.concat(
45
				more,
46
				pids.stream().map(p -> doProcessPID(current, other, p, Topic.ENRICH_MISSING_PID, trust)))
47
				.collect(Collectors.toList()); }
48

    
49
		return more.collect(Collectors.toList());
50
	}
51

    
52
	private EventWrapper doProcessPID(
53
			final Oaf current,
54
			final Oaf other,
55
			final StructuredProperty pidOther,
56
			final Topic topic,
57
			final float trust) {
58

    
59
		final Oaf.Builder prototype = Oaf.newBuilder(current);
60
		prototype.getEntityBuilder().addPid(pidOther);
61
		final Oaf oaf = prototype.build();
62

    
63
		final OpenAireEventPayload payload =
64
				HighlightFactory.highlightEnrichPid(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), other.getEntity(), trust),
65
						Lists.newArrayList(pidOther));
66
		return EventWrapper.newInstance(
67
				asEvent(oaf.getEntity(), topic, payload, other.getEntity(), trust),
68
				payload.getHighlight().getPids().stream().filter(Objects::nonNull).map(p -> p.getType() + ":" + p.getValue()).sorted()
69
						.collect(Collectors.joining(", ")),
70
				topic.getValue());
71
	}
72

    
73
	private boolean hasPidValue(final Oaf oaf, final String value) {
74
		return oaf.getEntity().getPidList()
75
				.stream()
76
				.anyMatch(pid -> pid.getValue().equalsIgnoreCase(value));
77
	}
78
}
(4-4/10)