Revision 48145
Added by Claudio Atzori almost 7 years ago
ProjectEventFactory.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.broker; |
2 | 2 |
|
3 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
4 |
|
|
5 | 3 |
import java.io.IOException; |
4 |
import java.util.Map; |
|
5 |
import java.util.Set; |
|
6 | 6 |
|
7 |
import org.apache.hadoop.io.Text; |
|
8 |
import org.apache.hadoop.mapreduce.Reducer.Context; |
|
9 |
import org.dom4j.DocumentException; |
|
10 |
|
|
7 |
import com.google.common.base.Predicate; |
|
11 | 8 |
import com.google.common.collect.Iterables; |
12 |
|
|
9 |
import com.google.common.collect.Lists; |
|
10 |
import com.google.common.collect.Maps; |
|
11 |
import com.google.common.collect.Sets; |
|
12 |
import eu.dnetlib.broker.objects.Instance; |
|
13 | 13 |
import eu.dnetlib.broker.objects.OpenAireEventPayload; |
14 |
import eu.dnetlib.broker.objects.Project; |
|
15 |
import eu.dnetlib.broker.objects.Provenance; |
|
14 | 16 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory; |
15 | 17 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory; |
18 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.ProtoMapping; |
|
16 | 19 |
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage; |
20 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
17 | 21 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
18 | 22 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
19 |
import eu.dnetlib.data.proto.OafProtos.OafRel; |
|
23 |
import org.apache.commons.lang.StringUtils; |
|
24 |
import org.apache.hadoop.io.Text; |
|
25 |
import org.apache.hadoop.mapreduce.Reducer.Context; |
|
26 |
import org.dom4j.DocumentException; |
|
20 | 27 |
|
28 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
29 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue; |
|
30 |
|
|
21 | 31 |
@SuppressWarnings("rawtypes") |
22 |
public class ProjectEventFactory { |
|
32 |
public class ProjectEventFactory extends ProtoMapping {
|
|
23 | 33 |
|
34 |
private static String COUNTER_GROUP = "Broker Enrichment projects"; |
|
35 |
|
|
36 |
private Set<String> inferenceProvenance = Sets.newHashSet("sysimport:mining:repository", "iis"); |
|
37 |
private Set<String> aggregationProvenance = Sets.newHashSet("sysimport:crosswalk:repository"); |
|
38 |
|
|
24 | 39 |
protected Text tKey = new Text(""); |
25 | 40 |
|
26 |
public static void process(final Context context, final Oaf current, final Oaf other, final float trust)
|
|
41 |
public static void process(final Context context, final Oaf current, final Oaf other, final Float trust)
|
|
27 | 42 |
throws IOException, InterruptedException, DocumentException { |
43 |
|
|
44 |
/* |
|
45 |
if (!current.getRel().hasCachedOafTarget() || (other != null && !other.getRel().hasCachedOafTarget())) { |
|
46 |
context.getCounter(COUNTER_GROUP, "events skipped: missing project 2nd step").increment(1); |
|
47 |
return; |
|
48 |
} |
|
49 |
*/ |
|
50 |
|
|
28 | 51 |
new ProjectEventFactory().processProject(context, current, other, trust); |
29 | 52 |
} |
30 | 53 |
|
31 |
private void processProject(final Context context, final Oaf current, final Oaf other, final float trust)
|
|
54 |
public static void process(final Context context, final Oaf current)
|
|
32 | 55 |
throws IOException, InterruptedException, DocumentException { |
56 |
process(context, current, null, null); |
|
57 |
} |
|
33 | 58 |
|
59 |
private void processProject(final Context context, final Oaf current, final Oaf other, final Float trust) |
|
60 |
throws IOException, InterruptedException, DocumentException { |
|
61 |
|
|
34 | 62 |
if (other == null) { |
35 |
for (final OafRel rel : current.getEntity().getCachedRelList()) { |
|
36 |
doProcessProject(context, current, rel.getCachedTarget(), getProvenance(rel), Topic.ENRICH_MISSING_PROJECT, trust); |
|
63 |
for (final Oaf oafRel : current.getEntity().getCachedOafRelList()) { |
|
64 |
|
|
65 |
final String provenance = oafRel.getDataInfo().getProvenanceaction().getClassid(); |
|
66 |
if (inferenceProvenance.contains(provenance)) { |
|
67 |
final OafEntity project = oafRel.getRel().getCachedOafTarget().getEntity(); |
|
68 |
doProcessProject(context, current, current, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, oafRel)); |
|
69 |
} |
|
37 | 70 |
} |
38 | 71 |
} else { |
39 |
for (final OafRel currentRel : current.getEntity().getCachedRelList()) { |
|
40 |
for (final OafRel otherRel : other.getEntity().getCachedRelList()) { |
|
41 |
if (!currentRel.getTarget().equals(otherRel.getTarget())) { |
|
42 |
doProcessProject(context, current, other.getEntity(), getProvenance(otherRel), Topic.ENRICH_MISSING_PROJECT, trust); |
|
72 |
for (final Oaf currentOafRel : current.getEntity().getCachedOafRelList()) { |
|
73 |
for (final Oaf otherOafRel : other.getEntity().getCachedOafRelList()) { |
|
74 |
|
|
75 |
final String currentTarget = currentOafRel.getRel().getTarget(); |
|
76 |
final String otherTarget = otherOafRel.getRel().getTarget(); |
|
77 |
|
|
78 |
if (!currentTarget.equals(otherTarget) && !DedupUtils.isRoot(current.getEntity().getId())) { |
|
79 |
|
|
80 |
final String provenance = otherOafRel.getDataInfo().getProvenanceaction().getClassid(); |
|
81 |
|
|
82 |
final OafEntity project = otherOafRel.getRel().getCachedOafTarget().getEntity(); |
|
83 |
|
|
84 |
boolean currentHasProject = Iterables.tryFind(current.getEntity().getCachedOafRelList(), new Predicate<Oaf>() { |
|
85 |
@Override |
|
86 |
public boolean apply(final Oaf oaf) { |
|
87 |
final String currentProjectId = oaf.getRel().getCachedOafTarget().getEntity().getId(); |
|
88 |
//System.out.println(String.format("%s = %s ? %s", currentProjectId, project.getId(), currentProjectId.equals(project.getId()))); |
|
89 |
return currentProjectId.equals(project.getId()); |
|
90 |
} |
|
91 |
}).isPresent(); |
|
92 |
|
|
93 |
if (!currentHasProject) { |
|
94 |
//System.out.println(String.format("generating event for other = %s\n\nproject = %s", other, project)); |
|
95 |
doProcessProject(context, current, other, project, provenance, Topic.ENRICH_MISSING_PROJECT, trust(trust, currentOafRel)); |
|
96 |
} |
|
43 | 97 |
} |
44 |
|
|
45 | 98 |
} |
46 | 99 |
} |
47 | 100 |
} |
48 | 101 |
} |
49 | 102 |
|
50 |
private String getProvenance(final OafRel rel) { |
|
51 |
return Iterables.getOnlyElement(rel.getCollectedfromList()).getKey(); |
|
52 |
} |
|
53 |
|
|
54 | 103 |
private void doProcessProject(final Context context, |
55 | 104 |
final Oaf current, |
105 |
final Oaf other, |
|
56 | 106 |
final OafEntity project, |
57 | 107 |
final String provenance, |
58 | 108 |
final Topic topic, |
59 |
final float trust)
|
|
109 |
final Float trust)
|
|
60 | 110 |
throws IOException, InterruptedException, DocumentException { |
61 | 111 |
|
62 |
// final OafEntity oaf, final Topic topic, final OpenAireEventPayload payload, final OafEntity source, final float trust |
|
112 |
final OafEntity currentEntity = current.getEntity(); |
|
113 |
final OafEntity otherEntity = other.getEntity(); |
|
63 | 114 |
|
64 |
final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(current.getEntity(), current.getEntity(), trust); |
|
65 |
final EventMessage event = asEvent(current.getEntity(), topic, payload, current.getEntity(), trust); |
|
115 |
//System.out.println("ProjectEventFactory.doProcessProject = " + entity); |
|
66 | 116 |
|
117 |
final Provenance prov = getProvenance(otherEntity, provenance); |
|
118 |
|
|
119 |
final OpenAireEventPayload payload = addProject(OpenAireEventPayloadFactory.fromOAF(currentEntity, trust, prov), project); |
|
120 |
|
|
121 |
final EventMessage event = asEvent(currentEntity, topic, payload, otherEntity, trust); |
|
67 | 122 |
event.setPayload(HighlightFactory.highlightEnrichProject(payload, project, provenance).toJSON()); |
68 | 123 |
|
69 | 124 |
context.write(tKey, new Text(event.toString())); |
70 |
context.getCounter("event", topic.getValue()).increment(1);
|
|
125 |
context.getCounter(COUNTER_GROUP, "Event: " + topic.getValue()).increment(1);
|
|
71 | 126 |
} |
72 | 127 |
|
128 |
private OpenAireEventPayload addProject(final OpenAireEventPayload payload, final OafEntity project) { |
|
129 |
final Map<String, Project> projects = Maps.newHashMap(); |
|
130 |
for(Project prj : payload.getPublication().getProjects()) { |
|
131 |
projects.put(prj.getCode() + prj.getTitle(), prj); |
|
132 |
} |
|
133 |
final Project hlProject = mapRelatedProject(project.getProject()); |
|
134 |
projects.put(hlProject.getCode() + hlProject.getTitle(), hlProject); |
|
135 |
|
|
136 |
payload.getPublication().setProjects(Lists.newArrayList(projects.values())); |
|
137 |
|
|
138 |
return payload; |
|
139 |
} |
|
140 |
|
|
141 |
private Provenance getProvenance(final OafEntity entity, final String provenance) { |
|
142 |
if (inferenceProvenance.contains(provenance)) { |
|
143 |
return new Provenance() |
|
144 |
.setRepositoryName("OpenAIRE") |
|
145 |
.setUrl("https://beta.openaire.eu/search/publication?articleId=" + StringUtils.substringAfter(entity.getId(), "|")) |
|
146 |
.setId(Iterables.getFirst(entity.getOriginalIdList(), "")); |
|
147 |
} |
|
148 |
return new Provenance() |
|
149 |
.setRepositoryName(getValue(entity.getCollectedfromList())) |
|
150 |
.setUrl(Iterables.getFirst(mapInstances(entity.getResult().getInstanceList()), new Instance()).getUrl()) |
|
151 |
.setId(getValue(entity.getOriginalIdList())); |
|
152 |
} |
|
153 |
|
|
154 |
private Float trust(final Float trust, final Oaf oaf) { |
|
155 |
final Float provenanceTrust = Float.valueOf(oaf.getDataInfo().getTrust()); |
|
156 |
return trust != null ? trust * provenanceTrust : provenanceTrust; |
|
157 |
} |
|
158 |
|
|
73 | 159 |
} |
Also available in: Unified diff
integrated latest changes from dnet40