Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
4
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
5

    
6
import java.io.IOException;
7
import java.util.List;
8
import java.util.Map;
9
import java.util.Objects;
10
import java.util.Set;
11
import java.util.stream.Collectors;
12

    
13
import org.apache.commons.lang.StringUtils;
14
import org.apache.hadoop.mapreduce.Reducer.Context;
15
import org.dom4j.DocumentException;
16

    
17
import com.google.common.collect.Iterables;
18
import com.google.common.collect.Lists;
19
import com.google.common.collect.Maps;
20
import com.google.common.collect.Sets;
21

    
22
import eu.dnetlib.broker.objects.Instance;
23
import eu.dnetlib.broker.objects.OpenAireEventPayload;
24
import eu.dnetlib.broker.objects.Provenance;
25
import eu.dnetlib.broker.objects.Software;
26
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
27
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
28
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
29
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
30
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
31
import eu.dnetlib.data.proto.OafProtos.Oaf;
32
import eu.dnetlib.data.proto.OafProtos.OafEntity;
33

    
34
public class SoftwareEventFactory extends ProtoMapping {
35

    
36
	private final Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");
37

    
38
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final Float trust)
39
			throws IOException, InterruptedException, DocumentException {
40

    
41
		/*
42
		 * if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) {
43
		 * context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1); return; }
44
		 */
45

    
46
		return new SoftwareEventFactory().processSoftware(context, current, other, trust);
47
	}
48

    
49
	public static List<EventWrapper> process(final Context context, final Oaf current)
50
			throws IOException, InterruptedException, DocumentException {
51
		return process(context, current, null, null);
52
	}
53

    
54
	private List<EventWrapper> processSoftware(final Context context, final Oaf current, final Oaf other, final Float trust)
55
			throws IOException, InterruptedException, DocumentException {
56

    
57
		final List<EventWrapper> events = Lists.newArrayList();
58
		if (other == null) {
59
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
60

    
61
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
62
				if (inferenceProvenance.contains(provenance)) {
63
					final OafEntity result = oafRel.getRel().getCachedOafTarget().getEntity();
64
					events.add(doProcessSoftware(context, current, current, result, provenance, Topic.ENRICH_MISSING_SOFTWARE, trust(trust, oafRel)));
65
				}
66
			}
67
		} else {
68
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
69
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
70

    
71
					final String currentTarget = currentOafRel.getRel().getTarget();
72
					final String otherTarget = otherOafRel.getRel().getTarget();
73

    
74
					if (!currentTarget.equals(otherTarget)) {
75

    
76
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
77

    
78
						final OafEntity software = otherOafRel.getRel().getCachedOafTarget().getEntity();
79

    
80
						final boolean currentHasSw = Iterables.tryFind(current.getEntity().getCachedOafRelList(), oaf -> {
81
							final String currentSwId = oaf.getRel().getCachedOafTarget().getEntity().getId();
82
							// System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(),
83
							// currentProjectId.equals(project.getId())));
84
							return currentSwId.equals(software.getId());
85
						}).isPresent();
86

    
87
						if (!currentHasSw) {
88
							// System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project));
89
							events.add(doProcessSoftware(context, current, other, software, provenance, Topic.ENRICH_MISSING_PROJECT,
90
									trust(trust, currentOafRel)));
91
						}
92
					}
93
				}
94
			}
95
		}
96
		return events;
97
	}
98

    
99
	private EventWrapper doProcessSoftware(final Context context,
100
			final Oaf current,
101
			final Oaf other,
102
			final OafEntity software,
103
			final String provenance,
104
			final Topic topic,
105
			final Float trust)
106
			throws IOException, InterruptedException, DocumentException {
107

    
108
		final OafEntity currentEntity = current.getEntity();
109
		final OafEntity otherEntity = other.getEntity();
110

    
111
		final Provenance prov = getProvenance(otherEntity, provenance);
112

    
113
		final OpenAireEventPayload payload = addSoftware(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), software);
114

    
115
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
116
		event.setPayload(HighlightFactory.highlightEnrichSoftware(payload, software, provenance).toJSON());
117
		return EventWrapper.newInstance(event,
118
				payload.getHighlight().getSoftwares().stream().filter(Objects::nonNull).map(s -> s.getName()).sorted()
119
						.collect(Collectors.joining(", ")),
120
				topic.getValue());
121
	}
122

    
123
	private OpenAireEventPayload addSoftware(final OpenAireEventPayload payload, final OafEntity software) {
124
		final Map<String, Software> swMap = Maps.newHashMap();
125
		for (final Software s : payload.getPublication().getSoftwares()) {
126
			swMap.put(s.getLandingPage(), s);
127
		}
128
		final Software hlSw = mapRelatedSoftware(software.getResult());
129
		swMap.put(hlSw.getLandingPage(), hlSw);
130

    
131
		payload.getPublication().setSoftwares(Lists.newArrayList(swMap.values()));
132

    
133
		return payload;
134
	}
135

    
136
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
137
		if (inferenceProvenance.contains(provenance)) { return new Provenance()
138
				.setRepositoryName("OpenAIRE")
139
				.setUrl("https://beta.openaire.eu/search/publication?articleId=" + StringUtils.substringAfter(entity.getId(), "|"))
140
				.setId(Iterables.getFirst(entity.getOriginalIdList(), "")); }
141
		return new Provenance()
142
				.setRepositoryName(getValue(entity.getCollectedfromList()))
143
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
144
				.setId(getValue(entity.getOriginalIdList()));
145
	}
146

    
147
	private Float trust(final Float trust, final Oaf oaf) {
148
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
149
		return trust != null ? trust * provenanceTrust : provenanceTrust;
150
	}
151

    
152
}
(7-7/9)