Revision 54764
Added by Claudio Atzori over 5 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/PIDEventFactory.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.mapreduce.hbase.broker; |
2 | 2 |
|
3 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
3 |
import com.google.common.collect.Lists; |
|
4 |
import eu.dnetlib.broker.objects.OpenAireEventPayload; |
|
5 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory; |
|
6 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory; |
|
7 |
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper; |
|
8 |
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty; |
|
9 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
10 |
import org.apache.commons.lang3.StringUtils; |
|
4 | 11 |
|
5 |
import java.io.IOException; |
|
6 | 12 |
import java.util.Collection; |
7 | 13 |
import java.util.List; |
8 | 14 |
import java.util.Objects; |
... | ... | |
10 | 16 |
import java.util.stream.Collectors; |
11 | 17 |
import java.util.stream.Stream; |
12 | 18 |
|
13 |
import org.apache.commons.lang3.StringUtils; |
|
14 |
import org.dom4j.DocumentException; |
|
19 |
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent; |
|
15 | 20 |
|
16 |
import com.google.common.collect.Lists; |
|
17 |
|
|
18 |
import eu.dnetlib.broker.objects.OpenAireEventPayload; |
|
19 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory; |
|
20 |
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory; |
|
21 |
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper; |
|
22 |
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty; |
|
23 |
import eu.dnetlib.data.proto.OafProtos.Oaf; |
|
24 |
|
|
25 | 21 |
/** |
26 | 22 |
* Created by claudio on 26/07/16. |
27 | 23 |
*/ |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/EnrichmentReducer.java | ||
---|---|---|
76 | 76 |
events.addAll(OAVersionEventFactory.process(current, other, trust, untrustedOaDsList)); |
77 | 77 |
events.addAll(AbstractEventFactory.process(current, other, trust)); |
78 | 78 |
events.addAll(PublicationDateEventFactory.process(current, other, trust)); |
79 |
events.addAll(OrcidEventFactory.process(current, other, trust)); |
|
79 | 80 |
} |
80 | 81 |
|
81 | 82 |
events.addAll(SubjectEventFactory.process(context, current, other, trust)); |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/SoftwareEnrichmentMapper.java | ||
---|---|---|
21 | 21 |
public class SoftwareEnrichmentMapper extends AbstractEnrichmentMapper { |
22 | 22 |
|
23 | 23 |
public static final String SOFTWARE = "software"; |
24 |
public static final String RESULT_RESULT_RELATIONSHIP_IS_RELATED_TO = "resultResult_relationship_isRelatedTo"; |
|
25 | 24 |
public static final String PUBLICATION = "publication"; |
26 | 25 |
|
26 |
public static final String RESULT_RESULT_RELATIONSHIP_IS_RELATED_TO = "resultResult_relationship_isRelatedTo"; |
|
27 |
|
|
27 | 28 |
@Override |
28 | 29 |
protected String counterGroup() { |
29 | 30 |
return "Broker Enrichment Software"; |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/mapping/HighlightFactory.java | ||
---|---|---|
5 | 5 |
|
6 | 6 |
import com.google.common.collect.Lists; |
7 | 7 |
import eu.dnetlib.broker.objects.OpenAireEventPayload; |
8 |
import eu.dnetlib.data.proto.FieldTypeProtos; |
|
8 | 9 |
import eu.dnetlib.data.proto.FieldTypeProtos.StringField; |
9 | 10 |
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty; |
10 | 11 |
import eu.dnetlib.data.proto.OafProtos.OafEntity; |
11 | 12 |
import eu.dnetlib.data.proto.ResultProtos.Result.Instance; |
12 | 13 |
|
13 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue; |
|
14 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.listValues; |
|
14 |
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.*; |
|
15 | 15 |
|
16 | 16 |
/** |
17 | 17 |
* Created by alessia on 21/07/16. |
... | ... | |
38 | 38 |
return p; |
39 | 39 |
} |
40 | 40 |
|
41 |
public static OpenAireEventPayload highlightEnrichOrcidAuthor(final OpenAireEventPayload p, final FieldTypeProtos.Author author) { |
|
42 |
p.getHighlight().getCreators().add(mapValue(author)); |
|
43 |
return p; |
|
44 |
} |
|
45 |
|
|
41 | 46 |
public static OpenAireEventPayload highlightEnrichSubject(final OpenAireEventPayload p, final List<StructuredProperty> subjects) { |
42 | 47 |
p.getHighlight().setSubjects(listValues(subjects)); |
43 | 48 |
return p; |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/Topic.java | ||
---|---|---|
18 | 18 |
ENRICH_MISSING_SUBJECT_DDC("ENRICH/MISSING/SUBJECT/DDC"), |
19 | 19 |
ENRICH_MISSING_SUBJECT_ACM("ENRICH/MISSING/SUBJECT/ACM"), |
20 | 20 |
ENRICH_MISSING_SUBJECT_RVK("ENRICH/MISSING/SUBJECT/RVK"), |
21 |
ENRICH_MISSING_AUTHOR_ORCID("ENRICH/MISSING/AUTHOR/ORCID"), |
|
21 | 22 |
|
22 | 23 |
// ENRICHMENT MORE |
23 | 24 |
ENRICH_MORE_PID("ENRICH/MORE/PID"), |
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/OafHbaseUtils.java | ||
---|---|---|
78 | 78 |
return Lists.newArrayList(Iterables.transform(ts, t -> mapValue(t))); |
79 | 79 |
} |
80 | 80 |
|
81 |
public static <T> List<String> listObjects(Iterable<T> ts) { |
|
82 |
return Lists.newArrayList(Iterables.transform(ts, t -> mapObject(t))); |
|
83 |
} |
|
84 |
|
|
81 | 85 |
public static <T> String getKey(Iterable<T> ts) { |
82 | 86 |
return Iterables.getFirst(listKeys(ts), ""); |
83 | 87 |
} |
... | ... | |
98 | 102 |
throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass())); |
99 | 103 |
} |
100 | 104 |
|
101 |
private static <T> String mapValue(final T t) {
|
|
105 |
public static <T> String mapValue(final T t) {
|
|
102 | 106 |
if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue(); |
103 | 107 |
if (t instanceof KeyValue) return ((KeyValue) t).getValue(); |
104 | 108 |
if (t instanceof String) return (String) t; |
105 | 109 |
if (t instanceof StringField) return ((StringField) t).getValue(); |
106 | 110 |
if (t instanceof Qualifier) return ((Qualifier) t).getClassname(); |
107 |
if (t instanceof Author) return ((Author) t).getFullname(); |
|
111 |
if (t instanceof Author) { |
|
112 |
Author a = (Author) t; |
|
113 |
if (a.getPidCount() == 0) { |
|
114 |
return a.getFullname(); |
|
115 |
} else { |
|
116 |
return a.getFullname() + " " + listObjects(a.getPidList()); |
|
117 |
} |
|
118 |
} |
|
108 | 119 |
|
109 | 120 |
throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass())); |
110 | 121 |
} |
111 | 122 |
|
123 |
public static <T> String mapObject(final T t) { |
|
124 |
if (t instanceof KeyValue) { |
|
125 |
KeyValue kv = (KeyValue) t; |
|
126 |
return kv.getKey() + ":" + kv.getValue(); |
|
127 |
} |
|
128 |
if (t instanceof String) return (String) t; |
|
129 |
|
|
130 |
throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass())); |
|
131 |
} |
|
132 |
|
|
112 | 133 |
public static List<String> getPropertyValues(final Reducer.Context context, final String name) { |
113 | 134 |
return doGetPropertyValues(context.getConfiguration().get(name, "")); |
114 | 135 |
} |
modules/dnet-openaireplus-workflows/branches/solr75/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/broker/calculateEnrichmentEvents.meta.xml | ||
---|---|---|
15 | 15 |
<WORKFLOW id="a6901d21-c09d-43dc-8fc9-236c32069293_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate person distribution"> |
16 | 16 |
<WORKFLOW id="b9b6cd8a-8ee2-4a74-9344-f087eeac29ed_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate enrichment events"/> |
17 | 17 |
<WORKFLOW id="6472af0e-f2ed-4ee5-835f-de698d89a7a7_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate enrichment Project events"/> |
18 |
<WORKFLOW id="120a074a-8b1b-485e-9d1c-1d0466b056ac_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate enrichment Software links events"/> |
|
18 | 19 |
</WORKFLOW> |
19 | 20 |
</CONFIGURATION> |
20 | 21 |
<SCHEDULING enabled="false"> |
modules/dnet-openaireplus-workflows/branches/solr75/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/broker/calculateEnrichmentSoftwareLinksEvents.xml | ||
---|---|---|
1 |
<RESOURCE_PROFILE> |
|
2 |
<HEADER> |
|
3 |
<RESOURCE_IDENTIFIER value="120a074a-8b1b-485e-9d1c-1d0466b056ac_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/> |
|
4 |
<RESOURCE_TYPE value="WorkflowDSResourceType"/> |
|
5 |
<RESOURCE_KIND value="WorkflowDSResources"/> |
|
6 |
<RESOURCE_URI value=""/> |
|
7 |
<DATE_OF_CREATION value="2015-10-12T14:10:32+00:00"/> |
|
8 |
</HEADER> |
|
9 |
<BODY> |
|
10 |
<WORKFLOW_NAME>Calculate enrichment Software links events</WORKFLOW_NAME> |
|
11 |
<WORKFLOW_TYPE>Notification Broker</WORKFLOW_TYPE> |
|
12 |
<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY> |
|
13 |
<CONFIGURATION start="manual"> |
|
14 |
|
|
15 |
<NODE name="hadoopConfig" type="SetClusterAndTable" isStart="true"> |
|
16 |
<DESCRIPTION>Set table name</DESCRIPTION> |
|
17 |
<PARAMETERS> |
|
18 |
<PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM> |
|
19 |
<PARAM required="true" type="string" name="tableParam" managedBy="system">tableName</PARAM> |
|
20 |
<PARAM required="true" type="string" name="table" managedBy="user"/> |
|
21 |
</PARAMETERS> |
|
22 |
<ARCS> |
|
23 |
<ARC to="cleanupHDFS"/> |
|
24 |
</ARCS> |
|
25 |
</NODE> |
|
26 |
|
|
27 |
<NODE name="setDedupConfigs" type="SetDedupConfiguration" isStart="true"> |
|
28 |
<DESCRIPTION>Set Dedup conf</DESCRIPTION> |
|
29 |
<PARAMETERS> |
|
30 |
<PARAM function="obtainValues('dedupOrchestrations', {})" required="true" type="string" name="dedupConfigSequence" managedBy="user"></PARAM> |
|
31 |
<PARAM required="true" type="string" name="dedupConfigSequenceParam" managedBy="system">dedup.conf.queue</PARAM> |
|
32 |
</PARAMETERS> |
|
33 |
<ARCS> |
|
34 |
<ARC to="cleanupHDFS"/> |
|
35 |
</ARCS> |
|
36 |
</NODE> |
|
37 |
|
|
38 |
<NODE name="cleanupHDFS" type="DeleteHdfsPathJob" isJoin="true"> |
|
39 |
<DESCRIPTION>hdfs cleanup (xml)</DESCRIPTION> |
|
40 |
<PARAMETERS> |
|
41 |
<PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM> |
|
42 |
<PARAM required="true" type="string" name="envParams" managedBy="system"> |
|
43 |
{ |
|
44 |
'path' : 'brokerEventsPath' |
|
45 |
} |
|
46 |
</PARAM> |
|
47 |
</PARAMETERS> |
|
48 |
<ARCS> |
|
49 |
<ARC to="enrichment"/> |
|
50 |
</ARCS> |
|
51 |
</NODE> |
|
52 |
|
|
53 |
|
|
54 |
<NODE name="enrichment" type="DedupConfigurationAwareJob"> |
|
55 |
<DESCRIPTION>Run M/R Job</DESCRIPTION> |
|
56 |
<PARAMETERS> |
|
57 |
<PARAM managedBy="system" name="cluster" required="true" type="string">DM</PARAM> |
|
58 |
<PARAM managedBy="system" name="hadoopJob" required="true" type="string">brokerEnrichmentSoftwareLinksJob</PARAM> |
|
59 |
<PARAM required="true" type="string" name="dedupConfigSequenceParam" managedBy="system">dedup.conf.queue</PARAM> |
|
60 |
<PARAM managedBy="system" name="envParams" required="false" type="string"> |
|
61 |
{ |
|
62 |
'hbase.mapred.inputtable' : 'tableName', |
|
63 |
'hbase.mapreduce.inputtable' : 'tableName' |
|
64 |
} |
|
65 |
</PARAM> |
|
66 |
</PARAMETERS> |
|
67 |
<ARCS> |
|
68 |
<ARC to="success"/> |
|
69 |
</ARCS> |
|
70 |
</NODE> |
|
71 |
|
|
72 |
</CONFIGURATION> |
|
73 |
<STATUS/> |
|
74 |
</BODY> |
|
75 |
</RESOURCE_PROFILE> |
modules/dnet-openaireplus-profiles/trunk/src/main/resources/eu/dnetlib/test/profiles/HadoopJobConfigurationDSResources/HadoopJobConfigurationDSResourceType/brokerEnrichmentSoftwareLinksJob.xml | ||
---|---|---|
1 |
<RESOURCE_PROFILE> |
|
2 |
<HEADER> |
|
3 |
<RESOURCE_IDENTIFIER value="0631da9f-8a4f-4412-828b-37f7a0aa6d71_SGFkb29wSm9iQ29uZmlndXJhdGlvbkRTUmVzb3VyY2VzL0hhZG9vcEpvYkNvbmZpZ3VyYXRpb25EU1Jlc291cmNlVHlwZQ=="/> |
|
4 |
<RESOURCE_TYPE value="HadoopJobConfigurationDSResourceType"/> |
|
5 |
<RESOURCE_KIND value="HadoopJobConfigurationDSResources"/> |
|
6 |
<RESOURCE_URI value=""/> |
|
7 |
<DATE_OF_CREATION value="2001-12-31T12:00:00"/> |
|
8 |
</HEADER> |
|
9 |
<BODY> |
|
10 |
<HADOOP_JOB name="brokerEnrichmentSoftwareLinksJob" type="mapreduce"> |
|
11 |
<DESCRIPTION>map reduce job that calculates the enrichment events based on the publications dedup results</DESCRIPTION> |
|
12 |
|
|
13 |
<STATIC_CONFIGURATION><!-- I/O FORMAT --> |
|
14 |
<PROPERTY key="mapreduce.inputformat.class" value="org.apache.hadoop.hbase.mapreduce.TableInputFormat"/> |
|
15 |
<PROPERTY key="mapreduce.outputformat.class" value="org.elasticsearch.hadoop.mr.EsOutputFormat"/> |
|
16 |
|
|
17 |
<!-- MAPPER --> |
|
18 |
<PROPERTY key="mapreduce.map.class" value="eu.dnetlib.data.mapreduce.hbase.broker.enrich.SoftwareEnrichmentMapper"/> |
|
19 |
<PROPERTY key="mapred.mapoutput.key.class" value="org.apache.hadoop.hbase.io.ImmutableBytesWritable"/> |
|
20 |
<PROPERTY key="mapred.mapoutput.value.class" value="org.apache.hadoop.hbase.io.ImmutableBytesWritable"/> |
|
21 |
|
|
22 |
<!-- REDUCER --> |
|
23 |
<PROPERTY key="mapreduce.reduce.class" value="eu.dnetlib.data.mapreduce.hbase.broker.enrich.SoftwareEnrichmentReducer"/> |
|
24 |
<PROPERTY key="mapred.output.key.class" value="org.apache.hadoop.io.Text"/> |
|
25 |
<PROPERTY key="mapred.output.value.class" value="org.apache.hadoop.io.Text"/> |
|
26 |
|
|
27 |
<!-- MISC --> |
|
28 |
<PROPERTY key="mapred.reduce.tasks.speculative.execution" value="false"/> |
|
29 |
<PROPERTY key="mapred.map.tasks.speculative.execution" value="false"/> |
|
30 |
<PROPERTY key="mapreduce.map.speculative" value="false"/> |
|
31 |
<PROPERTY key="mapreduce.reduce.speculative" value="false"/> |
|
32 |
<PROPERTY key="dfs.blocksize" value="256M"/> |
|
33 |
<PROPERTY key="mapred.reduce.tasks" value="4"/> |
|
34 |
|
|
35 |
<PROPERTY key="broker.baseurl.publication" value="https://explore.openaire.eu/search/publication?articleId=%s"/> |
|
36 |
<PROPERTY key="broker.baseurl.dataset" value="https://explore.openaire.eu/search/dataset?datasetId=%s"/> |
|
37 |
<PROPERTY key="broker.baseurl.software" value="https://explore.openaire.eu/search/software?softwareId=%s"/> |
|
38 |
<PROPERTY key="broker.baseurl.other" value="https://explore.openaire.eu/search/other?orpId=%s"/> |
|
39 |
|
|
40 |
|
|
41 |
<!-- ES --> |
|
42 |
<PROPERTY key="es.nodes" value="ip-90-147-167-137.ct1.garrservices.it:9200,ip-90-147-167-126.ct1.garrservices.it:9200,ip-90-147-167-13.ct1.garrservices.it:9200,ip-90-147-167-125.ct1.garrservices.it:9200"/> |
|
43 |
<PROPERTY key="es.nodes.resolve.hostname" value="false"/> |
|
44 |
<PROPERTY key="es.batch.write.retry.count " value="10"/> |
|
45 |
<PROPERTY key="es.batch.size.entries " value="500"/> |
|
46 |
<PROPERTY key="es.nodes.wan.only" value="true"/> |
|
47 |
<PROPERTY key="es.resource" value="events_{infra}/event"/> |
|
48 |
<PROPERTY key="es.input.json" value="yes"/> |
|
49 |
<PROPERTY key="es.mapping.id" value="eventId"/> |
|
50 |
|
|
51 |
<!-- BROKER --> |
|
52 |
<PROPERTY key="broker.datasource.id.whitelist" value=""/> |
|
53 |
<PROPERTY key="broker.datasource.id.blacklist" value=""/> |
|
54 |
<PROPERTY key="broker.datasource.untrusted.oa.list" value="opendoar____::8b6dd7db9af49e67306feb59a8bdc52c"/> |
|
55 |
<PROPERTY key="broker.datasource.type.whitelist" value="pubsrepository::unknown,pubsrepository::institutional,pubsrepository::thematic"/> |
|
56 |
|
|
57 |
<!-- <PROPERTY key="user.name" value="dnet" /> --> |
|
58 |
<!-- Uncomment to override the default lib path --> |
|
59 |
<!-- <PROPERTY key="job.lib" value="/user/dnet/dnet-mapreduce-jobs-0.0.2-SNAPSHOT-jar-with-dependencies.jar"/> --> |
|
60 |
</STATIC_CONFIGURATION> |
|
61 |
<JOB_INTERFACE> |
|
62 |
<PARAM description="source hbase table" name="hbase.mapred.inputtable" required="true"/> |
|
63 |
<PARAM description="source hbase table" name="hbase.mapreduce.inputtable" required="true"/> |
|
64 |
</JOB_INTERFACE> |
|
65 |
<SCAN> |
|
66 |
<FILTERS operator="MUST_PASS_ONE"> |
|
67 |
<FILTER type="prefix" value="40" /> |
|
68 |
<FILTER type="prefix" value="50" /> |
|
69 |
</FILTERS> |
|
70 |
<FAMILIES> |
|
71 |
<FAMILY value="result" /> |
|
72 |
<FAMILY value="resultResult_relationship_isRelatedTo" /> |
|
73 |
</FAMILIES> |
|
74 |
</SCAN> |
|
75 |
</HADOOP_JOB> |
|
76 |
<STATUS> |
|
77 |
<LAST_SUBMISSION_DATE value="2001-12-31T12:00:00"/> |
|
78 |
<RUNNING_INSTANCES value="0"/> |
|
79 |
<CUMULATIVE_RUN value="0" /> |
|
80 |
</STATUS> |
|
81 |
<SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS> |
|
82 |
</BODY> |
|
83 |
</RESOURCE_PROFILE> |
Also available in: Unified diff
Implemented the ORCID event generation process and the related configuration profile
Added workflow to orchestrate the event generation for software links