Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
4
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
5

    
6
import java.io.IOException;
7
import java.util.List;
8
import java.util.Map;
9
import java.util.Objects;
10
import java.util.Set;
11
import java.util.stream.Collectors;
12

    
13
import eu.dnetlib.data.proto.TypeProtos;
14
import org.apache.commons.lang.StringUtils;
15
import org.apache.hadoop.mapreduce.Reducer.Context;
16
import org.dom4j.DocumentException;
17

    
18
import com.google.common.collect.Iterables;
19
import com.google.common.collect.Lists;
20
import com.google.common.collect.Maps;
21
import com.google.common.collect.Sets;
22

    
23
import eu.dnetlib.broker.objects.Instance;
24
import eu.dnetlib.broker.objects.OpenAireEventPayload;
25
import eu.dnetlib.broker.objects.Provenance;
26
import eu.dnetlib.broker.objects.Software;
27
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
28
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
29
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
30
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
31
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
32
import eu.dnetlib.data.proto.OafProtos.Oaf;
33
import eu.dnetlib.data.proto.OafProtos.OafEntity;
34

    
35
public class SoftwareEventFactory extends ProtoMapping {
36

    
37
	private final Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");
38

    
39
	private Map<String, String> baseUrlMap;
40

    
41
	public SoftwareEventFactory(Map<String, String> baseUrlMap) {
42
		this.baseUrlMap = baseUrlMap;
43
	}
44

    
45
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final Float trust, final Map<String, String> baseUrlMap)
46
			throws IOException, InterruptedException, DocumentException {
47

    
48
		/*
49
		 * if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) {
50
		 * context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1); return; }
51
		 */
52

    
53
		return new SoftwareEventFactory(baseUrlMap).processSoftware(context, current, other, trust);
54
	}
55

    
56
	public static List<EventWrapper> process(final Context context, final Oaf current, final Map<String, String> baseUrlMap)
57
			throws IOException, InterruptedException, DocumentException {
58
		return process(context, current, null, null, baseUrlMap);
59
	}
60

    
61
	private List<EventWrapper> processSoftware(final Context context, final Oaf current, final Oaf other, final Float trust)
62
			throws IOException, InterruptedException, DocumentException {
63

    
64
		final List<EventWrapper> events = Lists.newArrayList();
65
		if (other == null) {
66
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
67

    
68
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
69
				if (inferenceProvenance.contains(provenance)) {
70
					final OafEntity result = oafRel.getRel().getCachedOafTarget().getEntity();
71
					events.add(doProcessSoftware(context, current, current, result, provenance, Topic.ENRICH_MISSING_SOFTWARE, trust(trust, oafRel)));
72
				}
73
			}
74
		} else {
75
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
76
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
77

    
78
					final String currentTarget = currentOafRel.getRel().getTarget();
79
					final String otherTarget = otherOafRel.getRel().getTarget();
80

    
81
					if (!currentTarget.equals(otherTarget)) {
82

    
83
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
84

    
85
						final OafEntity software = otherOafRel.getRel().getCachedOafTarget().getEntity();
86

    
87
						final boolean currentHasSw = Iterables.tryFind(current.getEntity().getCachedOafRelList(), oaf -> {
88
							final String currentSwId = oaf.getRel().getCachedOafTarget().getEntity().getId();
89
							// System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(),
90
							// currentProjectId.equals(project.getId())));
91
							return currentSwId.equals(software.getId());
92
						}).isPresent();
93

    
94
						if (!currentHasSw) {
95
							// System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project));
96
							events.add(doProcessSoftware(context, current, other, software, provenance, Topic.ENRICH_MISSING_PROJECT,
97
									trust(trust, currentOafRel)));
98
						}
99
					}
100
				}
101
			}
102
		}
103
		return events;
104
	}
105

    
106
	private EventWrapper doProcessSoftware(final Context context,
107
			final Oaf current,
108
			final Oaf other,
109
			final OafEntity software,
110
			final String provenance,
111
			final Topic topic,
112
			final Float trust)
113
			throws IOException, InterruptedException, DocumentException {
114

    
115
		final OafEntity currentEntity = current.getEntity();
116
		final OafEntity otherEntity = other.getEntity();
117

    
118
		final Provenance prov = getProvenance(otherEntity, provenance);
119

    
120
		final OpenAireEventPayload payload = addSoftware(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), software);
121

    
122
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
123
		event.setPayload(HighlightFactory.highlightEnrichSoftware(payload, software, provenance).toJSON());
124
		return EventWrapper.newInstance(event,
125
				payload.getHighlight().getSoftwares().stream().filter(Objects::nonNull).map(s -> s.getName()).sorted()
126
						.collect(Collectors.joining(", ")),
127
				topic.getValue());
128
	}
129

    
130
	private OpenAireEventPayload addSoftware(final OpenAireEventPayload payload, final OafEntity software) {
131
		final Map<String, Software> swMap = Maps.newHashMap();
132
		for (final Software s : payload.getPublication().getSoftwares()) {
133
			swMap.put(s.getLandingPage(), s);
134
		}
135
		final Software hlSw = mapRelatedSoftware(software.getResult());
136
		swMap.put(hlSw.getLandingPage(), hlSw);
137

    
138
		payload.getPublication().setSoftwares(Lists.newArrayList(swMap.values()));
139

    
140
		return payload;
141
	}
142

    
143
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
144
		if (inferenceProvenance.contains(provenance)) { return new Provenance()
145
				.setRepositoryName("OpenAIRE")
146
				.setUrl(getUrl(entity))
147
				.setId(Iterables.getFirst(entity.getOriginalIdList(), "")); }
148
		return new Provenance()
149
				.setRepositoryName(getValue(entity.getCollectedfromList()))
150
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
151
				.setId(getValue(entity.getOriginalIdList()));
152
	}
153

    
154
	private String getUrl(OafEntity entity) {
155
		if (entity.getType().equals(TypeProtos.Type.result)) {
156
			String resulttype = entity.getResult().getMetadata().getResulttype().getClassid();
157

    
158
			return String.format(baseUrlMap.get(resulttype), StringUtils.substringAfter(entity.getId(), "|"));
159

    
160
		}
161
		return "";
162
	}
163

    
164
	private Float trust(final Float trust, final Oaf oaf) {
165
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
166
		return trust != null ? trust * provenanceTrust : provenanceTrust;
167
	}
168

    
169
}
(7-7/9)