Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Objects;
7
import java.util.Set;
8
import java.util.stream.Collectors;
9

    
10
import com.google.common.collect.Iterables;
11
import com.google.common.collect.Lists;
12
import com.google.common.collect.Maps;
13
import com.google.common.collect.Sets;
14
import eu.dnetlib.broker.objects.Instance;
15
import eu.dnetlib.broker.objects.OpenAireEventPayload;
16
import eu.dnetlib.broker.objects.Project;
17
import eu.dnetlib.broker.objects.Provenance;
18
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
19
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
20
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
21
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
22
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
23
import eu.dnetlib.data.mapreduce.util.DedupUtils;
24
import eu.dnetlib.data.proto.OafProtos.Oaf;
25
import eu.dnetlib.data.proto.OafProtos.OafEntity;
26
import eu.dnetlib.data.proto.TypeProtos;
27
import org.apache.commons.lang.StringUtils;
28
import org.apache.hadoop.io.Text;
29
import org.apache.hadoop.mapreduce.Reducer.Context;
30
import org.dom4j.DocumentException;
31

    
32
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
33
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
34

    
35
@SuppressWarnings("rawtypes")
36
public class ProjectEventFactory extends ProtoMapping {
37

    
38
	private static String COUNTER_GROUP = "Broker Enrichment projects";
39

    
40
	private final Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");
41
	private final Set<String> aggregationProvenance = Sets.newHashSet("sysimport:crosswalk:repository");
42

    
43
	protected Text tKey = new Text("");
44

    
45
	private Map<String, String> baseUrlMap;
46

    
47
	public ProjectEventFactory(Map<String, String> baseUrlMap) {
48
		this.baseUrlMap = baseUrlMap;
49
	}
50

    
51
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final Float trust, final Map<String, String> baseUrlMap)
52
			throws IOException, InterruptedException, DocumentException {
53

    
54
		/*
55
		 * if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) {
56
		 * context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1); return; }
57
		 */
58

    
59
		return new ProjectEventFactory(baseUrlMap).processProject(context, current, other, trust);
60
	}
61

    
62
	public static List<EventWrapper> process(final Context context, final Oaf current, final Map<String, String> baseUrlMap)
63
			throws IOException, InterruptedException, DocumentException {
64
		return process(context, current, null, null, baseUrlMap);
65
	}
66

    
67
	private List<EventWrapper> processProject(final Context context, final Oaf current, final Oaf other, final Float trust)
68
			throws IOException, InterruptedException, DocumentException {
69

    
70
		final List<EventWrapper> events = Lists.newArrayList();
71
		if (other == null) {
72
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
73

    
74
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
75
				if (inferenceProvenance.contains(provenance)) {
76
					final OafEntity project = oafRel.getRel().getCachedOafTarget().getEntity();
77
					events.add(doProcessProject(context, current, current, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, oafRel)));
78
				}
79
			}
80
		} else {
81
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
82
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
83

    
84
					final String currentTarget = currentOafRel.getRel().getTarget();
85
					final String otherTarget = otherOafRel.getRel().getTarget();
86

    
87
					if (!currentTarget.equals(otherTarget) && !DedupUtils.isRoot(current.getEntity().getId())) {
88

    
89
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
90

    
91
						final OafEntity project = otherOafRel.getRel().getCachedOafTarget().getEntity();
92

    
93
						final boolean currentHasProject = Iterables.tryFind(current.getEntity().getCachedOafRelList(), oaf -> {
94
							final String currentProjectId = oaf.getRel().getCachedOafTarget().getEntity().getId();
95
							// System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(),
96
							// currentProjectId.equals(project.getId())));
97
							return currentProjectId.equals(project.getId());
98
						}).isPresent();
99

    
100
						if (!currentHasProject) {
101
							// System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project));
102
							events.add(
103
									doProcessProject(context, current, other, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, currentOafRel)));
104
						}
105
					}
106
				}
107
			}
108
		}
109
		return events;
110
	}
111

    
112
	private EventWrapper doProcessProject(final Context context,
113
			final Oaf current,
114
			final Oaf other,
115
			final OafEntity project,
116
			final String provenance,
117
			final Topic topic,
118
			final Float trust)
119
			throws IOException, InterruptedException, DocumentException {
120

    
121
		final OafEntity currentEntity = current.getEntity();
122
		final OafEntity otherEntity = other.getEntity();
123

    
124
		// System.out.println("ProjectEventFactory.doProcessProject = " + entity);
125

    
126
		final Provenance prov = getProvenance(otherEntity, provenance);
127

    
128
		final OpenAireEventPayload payload = addProject(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), project);
129

    
130
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
131
		event.setPayload(HighlightFactory.highlightEnrichProject(payload, project, provenance).toJSON());
132
		return EventWrapper.newInstance(event,
133
				payload.getHighlight().getProjects()
134
						.stream()
135
						.filter(Objects::nonNull)
136
						.map(p -> p.getFunder() + ":" + p.getCode())
137
						.sorted()
138
						.collect(Collectors.joining(", ")),
139
				topic.getValue());
140
	}
141

    
142
	private OpenAireEventPayload addProject(final OpenAireEventPayload payload, final OafEntity project) {
143
		final Map<String, Project> projects = Maps.newHashMap();
144
		for (final Project prj : payload.getPublication().getProjects()) {
145
			projects.put(prj.getCode() + prj.getTitle(), prj);
146
		}
147
		final Project hlProject = mapRelatedProject(project.getProject());
148
		projects.put(hlProject.getCode() + hlProject.getTitle(), hlProject);
149

    
150
		payload.getPublication().setProjects(Lists.newArrayList(projects.values()));
151

    
152
		return payload;
153
	}
154

    
155
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
156
		if (inferenceProvenance.contains(provenance)) { return new Provenance()
157
				.setRepositoryName("OpenAIRE")
158
				.setUrl(getUrl(entity))
159
				.setId(Iterables.getFirst(entity.getOriginalIdList(), "")); }
160
		return new Provenance()
161
				.setRepositoryName(getValue(entity.getCollectedfromList()))
162
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
163
				.setId(getValue(entity.getOriginalIdList()));
164
	}
165

    
166
	private String getUrl(OafEntity entity) {
167
		if (entity.getType().equals(TypeProtos.Type.result)) {
168
			String resulttype = entity.getResult().getMetadata().getResulttype().getClassid();
169

    
170
			return String.format(baseUrlMap.get(resulttype), StringUtils.substringAfter(entity.getId(), "|"));
171

    
172
		}
173
		return "";
174
	}
175

    
176
	private Float trust(final Float trust, final Oaf oaf) {
177
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
178
		return trust != null ? trust * provenanceTrust : provenanceTrust;
179
	}
180

    
181
}
(6-6/10)