Project

General

Profile

« Previous | Next » 

Revision 48145

integrated latest changes from dnet40

View differences:

ProjectEventFactory.java
1 1
package eu.dnetlib.data.mapreduce.hbase.broker;
2 2

  
3
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
4

  
5 3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Set;
6 6

  
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Reducer.Context;
9
import org.dom4j.DocumentException;
10

  
7
import com.google.common.base.Predicate;
11 8
import com.google.common.collect.Iterables;
12

  
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Maps;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.broker.objects.Instance;
13 13
import eu.dnetlib.broker.objects.OpenAireEventPayload;
14
import eu.dnetlib.broker.objects.Project;
15
import eu.dnetlib.broker.objects.Provenance;
14 16
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
15 17
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
18
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping;
16 19
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
20
import eu.dnetlib.data.mapreduce.util.DedupUtils;
17 21
import eu.dnetlib.data.proto.OafProtos.Oaf;
18 22
import eu.dnetlib.data.proto.OafProtos.OafEntity;
19
import eu.dnetlib.data.proto.OafProtos.OafRel;
23
import org.apache.commons.lang.StringUtils;
24
import org.apache.hadoop.io.Text;
25
import org.apache.hadoop.mapreduce.Reducer.Context;
26
import org.dom4j.DocumentException;
20 27

  
28
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
29
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
30

  
21 31
@SuppressWarnings("rawtypes")
22
public class ProjectEventFactory {
32
public class ProjectEventFactory extends ProtoMapping {
23 33

  
34
	private static String COUNTER_GROUP = "Broker Enrichment projects";
35

  
36
	private Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis");
37
	private Set<String> aggregationProvenance = Sets.newHashSet("sysimport:crosswalk:repository");
38

  
24 39
	protected Text tKey = new Text("");
25 40

  
26
	public static void process(final Context context, final Oaf current, final Oaf other, final float trust)
41
	public static void process(final Context context, final Oaf current, final Oaf other, final Float trust)
27 42
			throws IOException, InterruptedException, DocumentException {
43

  
44
		/*
45
		if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) {
46
			context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1);
47
			return;
48
		}
49
		*/
50

  
28 51
		new ProjectEventFactory().processProject(context, current, other, trust);
29 52
	}
30 53

  
31
	private void processProject(final Context context, final Oaf current, final Oaf other, final float trust)
54
	public static void process(final Context context, final Oaf current)
32 55
			throws IOException, InterruptedException, DocumentException {
56
		process(context, current, null, null);
57
	}
33 58

  
59
	private void processProject(final Context context, final Oaf current, final Oaf other, final Float trust)
60
			throws IOException, InterruptedException, DocumentException {
61

  
34 62
		if (other == null) {
35
			for (final OafRel rel : current.getEntity().getCachedRelList()) {
36
				doProcessProject(context, current, rel.getCachedTarget(), getProvenance(rel), Topic.ENRICH_MISSING_PROJECT, trust);
63
			for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) {
64

  
65
				final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid();
66
				if (inferenceProvenance.contains(provenance)) {
67
					final OafEntity project = oafRel.getRel().getCachedOafTarget().getEntity();
68
					doProcessProject(context, current, current, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, oafRel));
69
				}
37 70
			}
38 71
		} else {
39
			for (final OafRel currentRel : current.getEntity().getCachedRelList()) {
40
				for (final OafRel otherRel : other.getEntity().getCachedRelList()) {
41
					if (!currentRel.getTarget().equals(otherRel.getTarget())) {
42
						doProcessProject(context, current, other.getEntity(), getProvenance(otherRel), Topic.ENRICH_MISSING_PROJECT, trust);
72
			for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) {
73
				for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) {
74

  
75
					final String currentTarget = currentOafRel.getRel().getTarget();
76
					final String otherTarget = otherOafRel.getRel().getTarget();
77

  
78
					if (!currentTarget.equals(otherTarget) && !DedupUtils.isRoot(current.getEntity().getId())) {
79

  
80
						final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid();
81

  
82
						final OafEntity project = otherOafRel.getRel().getCachedOafTarget().getEntity();
83

  
84
						boolean currentHasProject = Iterables.tryFind(current.getEntity().getCachedOafRelList(), new Predicate<Oaf>() {
85
							@Override
86
							public boolean apply(final Oaf oaf) {
87
								final String currentProjectId = oaf.getRel().getCachedOafTarget().getEntity().getId();
88
								//System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(), currentProjectId.equals(project.getId())));
89
								return currentProjectId.equals(project.getId());
90
							}
91
						}).isPresent();
92

  
93
						if (!currentHasProject) {
94
							//System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project));
95
							doProcessProject(context, current, other, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, currentOafRel));
96
						}
43 97
					}
44

  
45 98
				}
46 99
			}
47 100
		}
48 101
	}
49 102

  
50
	private String getProvenance(final OafRel rel) {
51
		return Iterables.getOnlyElement(rel.getCollectedfromList()).getKey();
52
	}
53

  
54 103
	private void doProcessProject(final Context context,
55 104
			final Oaf current,
105
			final Oaf other,
56 106
			final OafEntity project,
57 107
			final String provenance,
58 108
			final Topic topic,
59
			final float trust)
109
			final Float trust)
60 110
			throws IOException, InterruptedException, DocumentException {
61 111

  
62
		// final OafEntity oaf, final Topic topic, final OpenAireEventPayload payload, final OafEntity source, final float trust
112
		final OafEntity currentEntity = current.getEntity();
113
		final OafEntity otherEntity = other.getEntity();
63 114

  
64
		final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(current.getEntity(), current.getEntity(), trust);
65
		final EventMessage event = asEvent(current.getEntity(), topic, payload, current.getEntity(), trust);
115
		//System.out.println("ProjectEventFactory.doProcessProject = " + entity);
66 116

  
117
		final Provenance prov = getProvenance(otherEntity, provenance);
118

  
119
		final OpenAireEventPayload payload = addProject(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), project);
120

  
121
		final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust);
67 122
		event.setPayload(HighlightFactory.highlightEnrichProject(payload, project, provenance).toJSON());
68 123

  
69 124
		context.write(tKey, new Text(event.toString()));
70
		context.getCounter("event", topic.getValue()).increment(1);
125
		context.getCounter(COUNTER_GROUP, "Event:  " + topic.getValue()).increment(1);
71 126
	}
72 127

  
128
	private OpenAireEventPayload addProject(final OpenAireEventPayload payload, final OafEntity project) {
129
		final Map<String, Project> projects = Maps.newHashMap();
130
		for(Project prj : payload.getPublication().getProjects()) {
131
			projects.put(prj.getCode() + prj.getTitle(), prj);
132
		}
133
		final Project hlProject = mapRelatedProject(project.getProject());
134
		projects.put(hlProject.getCode() + hlProject.getTitle(), hlProject);
135

  
136
		payload.getPublication().setProjects(Lists.newArrayList(projects.values()));
137

  
138
		return payload;
139
	}
140

  
141
	private Provenance getProvenance(final OafEntity entity, final String provenance) {
142
		if (inferenceProvenance.contains(provenance)) {
143
			return new Provenance()
144
					.setRepositoryName("OpenAIRE")
145
					.setUrl("https://beta.openaire.eu/search/publication?articleId=" + StringUtils.substringAfter(entity.getId(), "|"))
146
					.setId(Iterables.getFirst(entity.getOriginalIdList(), ""));
147
		}
148
		return new Provenance()
149
				.setRepositoryName(getValue(entity.getCollectedfromList()))
150
				.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl())
151
				.setId(getValue(entity.getOriginalIdList()));
152
	}
153

  
154
	private Float trust(final Float trust, final Oaf oaf) {
155
		final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust());
156
		return trust != null ? trust * provenanceTrust : provenanceTrust;
157
	}
158

  
73 159
}

Also available in: Unified diff