Project

General

Profile

1 51451 claudio.at
package eu.dnetlib.data.mapreduce.hbase.broker;
2
3 52765 michele.ar
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
4
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
5
6 51451 claudio.at
import java.io.IOException;
7
import java.util.List;
8
import java.util.Map;
9 52765 michele.ar
import java.util.Objects;
10 51451 claudio.at
import java.util.Set;
11 52765 michele.ar
import java.util.stream.Collectors;
12 51451 claudio.at
13 53260 claudio.at
import eu.dnetlib.data.proto.TypeProtos;
14 52765 michele.ar
import org.apache.commons.lang.StringUtils;
15
import org.apache.hadoop.mapreduce.Reducer.Context;
16
import org.dom4j.DocumentException;
17
18 51451 claudio.at
import com.google.common.collect.Iterables;
19
import com.google.common.collect.Lists;
20
import com.google.common.collect.Maps;
21
import com.google.common.collect.Sets;
22 52765 michele.ar
23 51451 claudio.at
import eu.dnetlib.broker.objects.Instance;
24
import eu.dnetlib.broker.objects.OpenAireEventPayload;
25
import eu.dnetlib.broker.objects.Provenance;
26
import eu.dnetlib.broker.objects.Software;
27
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
28
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
29
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
30
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
31
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
32
import eu.dnetlib.data.proto.OafProtos.Oaf;
33
import eu.dnetlib.data.proto.OafProtos.OafEntity;
34
35
public class SoftwareEventFactory extends ProtoMapping {
36
37 52765 michele.ar
	private final Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");
38 51451 claudio.at
39 53260 claudio.at
	private Map<String, String> baseUrlMap;
40
41
	public SoftwareEventFactory(Map<String, String> baseUrlMap) {
42
		this.baseUrlMap = baseUrlMap;
43
	}
44
45
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final Float trust, final Map<String, String> baseUrlMap)
46 51451 claudio.at
			throws IOException, InterruptedException, DocumentException {
47
48
		/*
49 52765 michele.ar
		 * if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) {
50
		 * context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1); return; }
51
		 */
52 51451 claudio.at
53 53260 claudio.at
		return new SoftwareEventFactory(baseUrlMap).processSoftware(context, current, other, trust);
54 51451 claudio.at
	}
55
56 53260 claudio.at
	public static List<EventWrapper> process(final Context context, final Oaf current, final Map<String, String> baseUrlMap)
57 51451 claudio.at
			throws IOException, InterruptedException, DocumentException {
58 53260 claudio.at
		return process(context, current, null, null, baseUrlMap);
59 51451 claudio.at
	}
60
61
	private List<EventWrapper> processSoftware(final Context context, final Oaf current, final Oaf other, final Float trust)
62
			throws IOException, InterruptedException, DocumentException {
63
64
		final List<EventWrapper> events = Lists.newArrayList();
65
		if (other == null) {
66
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
67
68
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
69
				if (inferenceProvenance.contains(provenance)) {
70
					final OafEntity result = oafRel.getRel().getCachedOafTarget().getEntity();
71
					events.add(doProcessSoftware(context, current, current, result, provenance, Topic.ENRICH_MISSING_SOFTWARE, trust(trust, oafRel)));
72
				}
73
			}
74
		} else {
75
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
76
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
77
78
					final String currentTarget = currentOafRel.getRel().getTarget();
79
					final String otherTarget = otherOafRel.getRel().getTarget();
80
81
					if (!currentTarget.equals(otherTarget)) {
82
83
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
84
85
						final OafEntity software = otherOafRel.getRel().getCachedOafTarget().getEntity();
86
87 52765 michele.ar
						final boolean currentHasSw = Iterables.tryFind(current.getEntity().getCachedOafRelList(), oaf -> {
88 51451 claudio.at
							final String currentSwId = oaf.getRel().getCachedOafTarget().getEntity().getId();
89 52765 michele.ar
							// System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(),
90
							// currentProjectId.equals(project.getId())));
91 51451 claudio.at
							return currentSwId.equals(software.getId());
92
						}).isPresent();
93
94
						if (!currentHasSw) {
95 52765 michele.ar
							// System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project));
96
							events.add(doProcessSoftware(context, current, other, software, provenance, Topic.ENRICH_MISSING_PROJECT,
97
									trust(trust, currentOafRel)));
98 51451 claudio.at
						}
99
					}
100
				}
101
			}
102
		}
103
		return events;
104
	}
105
106
	private EventWrapper doProcessSoftware(final Context context,
107
			final Oaf current,
108
			final Oaf other,
109
			final OafEntity software,
110
			final String provenance,
111
			final Topic topic,
112
			final Float trust)
113
			throws IOException, InterruptedException, DocumentException {
114
115
		final OafEntity currentEntity = current.getEntity();
116
		final OafEntity otherEntity = other.getEntity();
117
118
		final Provenance prov = getProvenance(otherEntity, provenance);
119
120
		final OpenAireEventPayload payload = addSoftware(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), software);
121
122
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
123
		event.setPayload(HighlightFactory.highlightEnrichSoftware(payload, software, provenance).toJSON());
124 52765 michele.ar
		return EventWrapper.newInstance(event,
125
				payload.getHighlight().getSoftwares().stream().filter(Objects::nonNull).map(s -> s.getName()).sorted()
126
						.collect(Collectors.joining(", ")),
127
				topic.getValue());
128 51451 claudio.at
	}
129
130
	private OpenAireEventPayload addSoftware(final OpenAireEventPayload payload, final OafEntity software) {
131
		final Map<String, Software> swMap = Maps.newHashMap();
132 52765 michele.ar
		for (final Software s : payload.getPublication().getSoftwares()) {
133 51451 claudio.at
			swMap.put(s.getLandingPage(), s);
134
		}
135
		final Software hlSw = mapRelatedSoftware(software.getResult());
136
		swMap.put(hlSw.getLandingPage(), hlSw);
137
138
		payload.getPublication().setSoftwares(Lists.newArrayList(swMap.values()));
139
140
		return payload;
141
	}
142
143
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
144 52765 michele.ar
		if (inferenceProvenance.contains(provenance)) { return new Provenance()
145
				.setRepositoryName("OpenAIRE")
146 53260 claudio.at
				.setUrl(getUrl(entity))
147 52765 michele.ar
				.setId(Iterables.getFirst(entity.getOriginalIdList(), "")); }
148 51451 claudio.at
		return new Provenance()
149
				.setRepositoryName(getValue(entity.getCollectedfromList()))
150
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
151
				.setId(getValue(entity.getOriginalIdList()));
152
	}
153
154 53260 claudio.at
	private String getUrl(OafEntity entity) {
155
		if (entity.getType().equals(TypeProtos.Type.result)) {
156
			String resulttype = entity.getResult().getMetadata().getResulttype().getClassid();
157
158
			return String.format(baseUrlMap.get(resulttype), StringUtils.substringAfter(entity.getId(), "|"));
159
160
		}
161
		return "";
162
	}
163
164 51451 claudio.at
	private Float trust(final Float trust, final Oaf oaf) {
165
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
166
		return trust != null ? trust * provenanceTrust : provenanceTrust;
167
	}
168
169
}