Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Objects;
7
import java.util.Set;
8
import java.util.stream.Collectors;
9

    
10
import com.google.common.collect.Iterables;
11
import com.google.common.collect.Lists;
12
import com.google.common.collect.Maps;
13
import com.google.common.collect.Sets;
14
import eu.dnetlib.broker.objects.Instance;
15
import eu.dnetlib.broker.objects.OpenAireEventPayload;
16
import eu.dnetlib.broker.objects.Project;
17
import eu.dnetlib.broker.objects.Provenance;
18
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
19
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
20
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
21
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
22
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
23
import eu.dnetlib.data.mapreduce.util.DedupUtils;
24
import eu.dnetlib.data.proto.OafProtos.Oaf;
25
import eu.dnetlib.data.proto.OafProtos.OafEntity;
26
import org.apache.commons.lang.StringUtils;
27
import org.apache.hadoop.io.Text;
28
import org.apache.hadoop.mapreduce.Reducer.Context;
29
import org.dom4j.DocumentException;
30

    
31
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
32
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
33

    
34
@SuppressWarnings("rawtypes")
35
public class ProjectEventFactory extends ProtoMapping {
36

    
37
	private static String COUNTER_GROUP = "Broker Enrichment projects";
38

    
39
	private final Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");
40
	private final Set<String> aggregationProvenance = Sets.newHashSet("sysimport:crosswalk:repository");
41

    
42
	protected Text tKey = new Text("");
43

    
44
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final Float trust)
45
			throws IOException, InterruptedException, DocumentException {
46

    
47
		/*
48
		 * if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) {
49
		 * context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1); return; }
50
		 */
51

    
52
		return new ProjectEventFactory().processProject(context, current, other, trust);
53
	}
54

    
55
	public static List<EventWrapper> process(final Context context, final Oaf current)
56
			throws IOException, InterruptedException, DocumentException {
57
		return process(context, current, null, null);
58
	}
59

    
60
	private List<EventWrapper> processProject(final Context context, final Oaf current, final Oaf other, final Float trust)
61
			throws IOException, InterruptedException, DocumentException {
62

    
63
		final List<EventWrapper> events = Lists.newArrayList();
64
		if (other == null) {
65
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
66

    
67
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
68
				if (inferenceProvenance.contains(provenance)) {
69
					final OafEntity project = oafRel.getRel().getCachedOafTarget().getEntity();
70
					events.add(doProcessProject(context, current, current, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, oafRel)));
71
				}
72
			}
73
		} else {
74
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
75
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
76

    
77
					final String currentTarget = currentOafRel.getRel().getTarget();
78
					final String otherTarget = otherOafRel.getRel().getTarget();
79

    
80
					if (!currentTarget.equals(otherTarget) && !DedupUtils.isRoot(current.getEntity().getId())) {
81

    
82
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
83

    
84
						final OafEntity project = otherOafRel.getRel().getCachedOafTarget().getEntity();
85

    
86
						final boolean currentHasProject = Iterables.tryFind(current.getEntity().getCachedOafRelList(), oaf -> {
87
							final String currentProjectId = oaf.getRel().getCachedOafTarget().getEntity().getId();
88
							// System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(),
89
							// currentProjectId.equals(project.getId())));
90
							return currentProjectId.equals(project.getId());
91
						}).isPresent();
92

    
93
						if (!currentHasProject) {
94
							// System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project));
95
							events.add(
96
									doProcessProject(context, current, other, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, currentOafRel)));
97
						}
98
					}
99
				}
100
			}
101
		}
102
		return events;
103
	}
104

    
105
	private EventWrapper doProcessProject(final Context context,
106
			final Oaf current,
107
			final Oaf other,
108
			final OafEntity project,
109
			final String provenance,
110
			final Topic topic,
111
			final Float trust)
112
			throws IOException, InterruptedException, DocumentException {
113

    
114
		final OafEntity currentEntity = current.getEntity();
115
		final OafEntity otherEntity = other.getEntity();
116

    
117
		// System.out.println("ProjectEventFactory.doProcessProject = " + entity);
118

    
119
		final Provenance prov = getProvenance(otherEntity, provenance);
120

    
121
		final OpenAireEventPayload payload = addProject(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), project);
122

    
123
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
124
		event.setPayload(HighlightFactory.highlightEnrichProject(payload, project, provenance).toJSON());
125
		return EventWrapper.newInstance(event,
126
				payload.getHighlight().getProjects()
127
						.stream()
128
						.filter(Objects::nonNull)
129
						.map(p -> p.getFunder() + ":" + p.getCode())
130
						.sorted()
131
						.collect(Collectors.joining(", ")),
132
				topic.getValue());
133
	}
134

    
135
	private OpenAireEventPayload addProject(final OpenAireEventPayload payload, final OafEntity project) {
136
		final Map<String, Project> projects = Maps.newHashMap();
137
		for (final Project prj : payload.getPublication().getProjects()) {
138
			projects.put(prj.getCode() + prj.getTitle(), prj);
139
		}
140
		final Project hlProject = mapRelatedProject(project.getProject());
141
		projects.put(hlProject.getCode() + hlProject.getTitle(), hlProject);
142

    
143
		payload.getPublication().setProjects(Lists.newArrayList(projects.values()));
144

    
145
		return payload;
146
	}
147

    
148
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
149
		if (inferenceProvenance.contains(provenance)) { return new Provenance()
150
				.setRepositoryName("OpenAIRE")
151
				.setUrl("https://beta.openaire.eu/search/publication?articleId=" + StringUtils.substringAfter(entity.getId(), "|"))
152
				.setId(Iterables.getFirst(entity.getOriginalIdList(), "")); }
153
		return new Provenance()
154
				.setRepositoryName(getValue(entity.getCollectedfromList()))
155
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
156
				.setId(getValue(entity.getOriginalIdList()));
157
	}
158

    
159
	private Float trust(final Float trust, final Oaf oaf) {
160
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
161
		return trust != null ? trust * provenanceTrust : provenanceTrust;
162
	}
163

    
164
}
(5-5/9)