Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Set;
6

    
7
import com.google.common.base.Predicate;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Maps;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.broker.objects.Instance;
13
import eu.dnetlib.broker.objects.OpenAireEventPayload;
14
import eu.dnetlib.broker.objects.Project;
15
import eu.dnetlib.broker.objects.Provenance;
16
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
17
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
18
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
19
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
20
import eu.dnetlib.data.mapreduce.util.DedupUtils;
21
import eu.dnetlib.data.proto.OafProtos.Oaf;
22
import eu.dnetlib.data.proto.OafProtos.OafEntity;
23
import org.apache.commons.lang.StringUtils;
24
import org.apache.hadoop.io.Text;
25
import org.apache.hadoop.mapreduce.Reducer.Context;
26
import org.dom4j.DocumentException;
27

    
28
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
29
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
30

    
31
@SuppressWarnings("rawtypes")
32
public class ProjectEventFactory extends ProtoMapping {
33

    
34
	private static String COUNTER_GROUP = "Broker Enrichment projects";
35

    
36
	private Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");
37
	private Set<String> aggregationProvenance = Sets.newHashSet("sysimport:crosswalk:repository");
38

    
39
	protected Text tKey = new Text("");
40

    
41
	public static void process(final Context context, final Oaf current, final Oaf other, final Float trust)
42
			throws IOException, InterruptedException, DocumentException {
43

    
44
		/*
45
		if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) {
46
			context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1);
47
			return;
48
		}
49
		*/
50

    
51
		new ProjectEventFactory().processProject(context, current, other, trust);
52
	}
53

    
54
	public static void process(final Context context, final Oaf current)
55
			throws IOException, InterruptedException, DocumentException {
56
		process(context, current, null, null);
57
	}
58

    
59
	private void processProject(final Context context, final Oaf current, final Oaf other, final Float trust)
60
			throws IOException, InterruptedException, DocumentException {
61

    
62
		if (other == null) {
63
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
64

    
65
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
66
				if (inferenceProvenance.contains(provenance)) {
67
					final OafEntity project = oafRel.getRel().getCachedOafTarget().getEntity();
68
					doProcessProject(context, current, current, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, oafRel));
69
				}
70
			}
71
		} else {
72
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
73
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
74

    
75
					final String currentTarget = currentOafRel.getRel().getTarget();
76
					final String otherTarget = otherOafRel.getRel().getTarget();
77

    
78
					if (!currentTarget.equals(otherTarget) && !DedupUtils.isRoot(current.getEntity().getId())) {
79

    
80
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
81

    
82
						final OafEntity project = otherOafRel.getRel().getCachedOafTarget().getEntity();
83

    
84
						boolean currentHasProject = Iterables.tryFind(current.getEntity().getCachedOafRelList(), new Predicate<Oaf>() {
85
							@Override
86
							public boolean apply(final Oaf oaf) {
87
								final String currentProjectId = oaf.getRel().getCachedOafTarget().getEntity().getId();
88
								//System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(), currentProjectId.equals(project.getId())));
89
								return currentProjectId.equals(project.getId());
90
							}
91
						}).isPresent();
92

    
93
						if (!currentHasProject) {
94
							//System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project));
95
							doProcessProject(context, current, other, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, currentOafRel));
96
						}
97
					}
98
				}
99
			}
100
		}
101
	}
102

    
103
	private void doProcessProject(final Context context,
104
			final Oaf current,
105
			final Oaf other,
106
			final OafEntity project,
107
			final String provenance,
108
			final Topic topic,
109
			final Float trust)
110
			throws IOException, InterruptedException, DocumentException {
111

    
112
		final OafEntity currentEntity = current.getEntity();
113
		final OafEntity otherEntity = other.getEntity();
114

    
115
		//System.out.println("ProjectEventFactory.doProcessProject = " + entity);
116

    
117
		final Provenance prov = getProvenance(otherEntity, provenance);
118

    
119
		final OpenAireEventPayload payload = addProject(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), project);
120

    
121
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
122
		event.setPayload(HighlightFactory.highlightEnrichProject(payload, project, provenance).toJSON());
123

    
124
		context.write(tKey, new Text(event.toString()));
125
		context.getCounter(COUNTER_GROUP, "Event:  " + topic.getValue()).increment(1);
126
	}
127

    
128
	private OpenAireEventPayload addProject(final OpenAireEventPayload payload, final OafEntity project) {
129
		final Map<String, Project> projects = Maps.newHashMap();
130
		for(Project prj : payload.getPublication().getProjects()) {
131
			projects.put(prj.getCode() + prj.getTitle(), prj);
132
		}
133
		final Project hlProject = mapRelatedProject(project.getProject());
134
		projects.put(hlProject.getCode() + hlProject.getTitle(), hlProject);
135

    
136
		payload.getPublication().setProjects(Lists.newArrayList(projects.values()));
137

    
138
		return payload;
139
	}
140

    
141
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
142
		if (inferenceProvenance.contains(provenance)) {
143
			return new Provenance()
144
					.setRepositoryName("OpenAIRE")
145
					.setUrl("https://beta.openaire.eu/search/publication?articleId=" + StringUtils.substringAfter(entity.getId(), "|"))
146
					.setId(Iterables.getFirst(entity.getOriginalIdList(), ""));
147
		}
148
		return new Provenance()
149
				.setRepositoryName(getValue(entity.getCollectedfromList()))
150
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
151
				.setId(getValue(entity.getOriginalIdList()));
152
	}
153

    
154
	private Float trust(final Float trust, final Oaf oaf) {
155
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
156
		return trust != null ? trust * provenanceTrust : provenanceTrust;
157
	}
158

    
159
}
(10-10/13)