Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
4
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
5

    
6
import java.io.IOException;
7
import java.util.List;
8
import java.util.Map;
9
import java.util.Objects;
10
import java.util.Set;
11
import java.util.stream.Collectors;
12

    
13
import eu.dnetlib.data.proto.TypeProtos;
14
import org.apache.commons.lang.StringUtils;
15
import org.apache.hadoop.mapreduce.Reducer.Context;
16
import org.dom4j.DocumentException;
17

    
18
import com.google.common.collect.Iterables;
19
import com.google.common.collect.Lists;
20
import com.google.common.collect.Maps;
21
import com.google.common.collect.Sets;
22

    
23
import eu.dnetlib.broker.objects.Instance;
24
import eu.dnetlib.broker.objects.OpenAireEventPayload;
25
import eu.dnetlib.broker.objects.Provenance;
26
import eu.dnetlib.broker.objects.Software;
27
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
28
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
29
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
30
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
31
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
32
import eu.dnetlib.data.proto.OafProtos.Oaf;
33
import eu.dnetlib.data.proto.OafProtos.OafEntity;
34

    
35
public class SoftwareEventFactory extends ProtoMapping {
36

    
37
	/**
	 * Provenance action class ids treated as inference provenance: relations carrying one of these ids
	 * are attributed to the "OpenAIRE" repository by {@link #getProvenance(OafEntity, String)}.
	 */
	private final Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");

	/**
	 * URL templates keyed by result type class id. Each value is used as a {@link String#format} pattern
	 * whose single argument is the part of the entity id following the first {@code |}.
	 */
	private final Map<String, String> baseUrlMap;

	/**
	 * Creates a factory bound to the given URL templates.
	 *
	 * @param baseUrlMap
	 *            format templates keyed by result type class id; the reference is stored as-is (no copy is made)
	 */
	public SoftwareEventFactory(final Map<String, String> baseUrlMap) {
		this.baseUrlMap = baseUrlMap;
	}
44

    
45
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final Float trust, final Map<String, String> baseUrlMap)
46
			throws IOException, InterruptedException, DocumentException {
47
		return new SoftwareEventFactory(baseUrlMap).processSoftware(context, current, other, trust);
48
	}
49

    
50
	public static List<EventWrapper> process(final Context context, final Oaf current, final Map<String, String> baseUrlMap)
51
			throws IOException, InterruptedException, DocumentException {
52
		return process(context, current, null, null, baseUrlMap);
53
	}
54

    
55
	private List<EventWrapper> processSoftware(final Context context, final Oaf current, final Oaf other, final Float trust)
56
			throws IOException, InterruptedException, DocumentException {
57

    
58
		final List<EventWrapper> events = Lists.newArrayList();
59
		if (other == null) {
60
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
61

    
62
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
63
				if (inferenceProvenance.contains(provenance)) {
64
					final Software software = mapRelatedSoftware(oafRel.getRel().getCachedOafTarget().getEntity().getResult());
65
					events.add(doProcessSoftware(context, current, current, software, provenance, Topic.ENRICH_MISSING_SOFTWARE, trust(trust, oafRel)));
66
				}
67
			}
68
		} else {
69
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
70
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
71

    
72
					final String currentTarget = currentOafRel.getRel().getTarget();
73
					final String otherTarget = otherOafRel.getRel().getTarget();
74

    
75
					if (!currentTarget.equals(otherTarget)) {
76

    
77
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
78

    
79
						final OafEntity swEntity = otherOafRel.getRel().getCachedOafTarget().getEntity();
80
						final Software software = mapRelatedSoftware(swEntity.getResult());
81

    
82
						final boolean currentHasSw = Iterables.tryFind(current.getEntity().getCachedOafRelList(), oaf -> {
83
							final String currentSwId = oaf.getRel().getCachedOafTarget().getEntity().getId();
84
							return currentSwId.equals(swEntity.getId());
85
						}).isPresent();
86

    
87
						if (!currentHasSw) {
88
							events.add(doProcessSoftware(context, current, other, software, provenance, Topic.ENRICH_MISSING_SOFTWARE,
89
									trust(trust, currentOafRel)));
90
						}
91
					}
92
				}
93
			}
94
		}
95
		return events;
96
	}
97

    
98
	private EventWrapper doProcessSoftware(final Context context,
99
			final Oaf current,
100
			final Oaf other,
101
			final Software software,
102
			final String provenance,
103
			final Topic topic,
104
			final Float trust)
105
			throws IOException, InterruptedException, DocumentException {
106

    
107
		final OafEntity currentEntity = current.getEntity();
108
		final OafEntity otherEntity = other.getEntity();
109

    
110
		final Provenance prov = getProvenance(otherEntity, provenance);
111

    
112
		final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov);
113

    
114
		final Map<String, Software> swMap = Maps.newHashMap();
115
		for (final Software s : payload.getPublication().getSoftwares()) {
116
			swMap.put(s.getLandingPage(), s);
117
		}
118

    
119
		swMap.put(software.getLandingPage(), software);
120

    
121
		payload.getPublication().setSoftwares(Lists.newArrayList(swMap.values()));
122
		payload.getHighlight().setSoftwares(Lists.newArrayList(software));
123

    
124
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
125

    
126
		event.setPayload(payload.toJSON());
127
		return EventWrapper.newInstance(event,
128
				payload.getHighlight().getSoftwares().stream().filter(Objects::nonNull).map(s -> s.getName()).sorted()
129
						.collect(Collectors.joining(", ")),
130
				topic.getValue());
131
	}
132

    
133
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
134
		if (inferenceProvenance.contains(provenance)) { return new Provenance()
135
				.setRepositoryName("OpenAIRE")
136
				.setUrl(getUrl(entity))
137
				.setId(Iterables.getFirst(entity.getOriginalIdList(), "")); }
138
		return new Provenance()
139
				.setRepositoryName(getValue(entity.getCollectedfromList()))
140
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
141
				.setId(getValue(entity.getOriginalIdList()));
142
	}
143

    
144
	private String getUrl(OafEntity entity) {
145
		if (entity.getType().equals(TypeProtos.Type.result)) {
146
			String resulttype = entity.getResult().getMetadata().getResulttype().getClassid();
147

    
148
			return String.format(baseUrlMap.get(resulttype), StringUtils.substringAfter(entity.getId(), "|"));
149

    
150
		}
151
		return "";
152
	}
153

    
154
	private Float trust(final Float trust, final Oaf oaf) {
155
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
156
		return trust != null ? trust * provenanceTrust : provenanceTrust;
157
	}
158

    
159
}
(8-8/10)