Project

General

Profile

« Previous | Next » 

Revision 54764

Implemented ORCID event generation process and relative configuration profile
Added workflow to orchestrate the event generation for software links

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/PIDEventFactory.java
1 1
package eu.dnetlib.data.mapreduce.hbase.broker;
2 2

  
3
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
3
import com.google.common.collect.Lists;
4
import eu.dnetlib.broker.objects.OpenAireEventPayload;
5
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
6
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
7
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
8
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
9
import eu.dnetlib.data.proto.OafProtos.Oaf;
10
import org.apache.commons.lang3.StringUtils;
4 11

  
5
import java.io.IOException;
6 12
import java.util.Collection;
7 13
import java.util.List;
8 14
import java.util.Objects;
......
10 16
import java.util.stream.Collectors;
11 17
import java.util.stream.Stream;
12 18

  
13
import org.apache.commons.lang3.StringUtils;
14
import org.dom4j.DocumentException;
19
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
15 20

  
16
import com.google.common.collect.Lists;
17

  
18
import eu.dnetlib.broker.objects.OpenAireEventPayload;
19
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
20
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
21
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
22
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
23
import eu.dnetlib.data.proto.OafProtos.Oaf;
24

  
25 21
/**
26 22
 * Created by claudio on 26/07/16.
27 23
 */
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/EnrichmentReducer.java
76 76
									events.addAll(OAVersionEventFactory.process(current, other, trust, untrustedOaDsList));
77 77
									events.addAll(AbstractEventFactory.process(current, other, trust));
78 78
									events.addAll(PublicationDateEventFactory.process(current, other, trust));
79
									events.addAll(OrcidEventFactory.process(current, other, trust));
79 80
								}
80 81

  
81 82
								events.addAll(SubjectEventFactory.process(context, current, other, trust));
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/SoftwareEnrichmentMapper.java
21 21
public class SoftwareEnrichmentMapper extends AbstractEnrichmentMapper {
22 22

  
23 23
	public static final String SOFTWARE = "software";
24
	public static final String RESULT_RESULT_RELATIONSHIP_IS_RELATED_TO = "resultResult_relationship_isRelatedTo";
25 24
	public static final String PUBLICATION = "publication";
26 25

  
26
	public static final String RESULT_RESULT_RELATIONSHIP_IS_RELATED_TO = "resultResult_relationship_isRelatedTo";
27

  
27 28
	@Override
28 29
	protected String counterGroup() {
29 30
		return "Broker Enrichment Software";
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/mapping/HighlightFactory.java
5 5

  
6 6
import com.google.common.collect.Lists;
7 7
import eu.dnetlib.broker.objects.OpenAireEventPayload;
8
import eu.dnetlib.data.proto.FieldTypeProtos;
8 9
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
9 10
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
10 11
import eu.dnetlib.data.proto.OafProtos.OafEntity;
11 12
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
12 13

  
13
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getValue;
14
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.listValues;
14
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.*;
15 15

  
16 16
/**
17 17
 * Created by alessia on 21/07/16.
......
38 38
		return p;
39 39
	}
40 40

  
41
	public static OpenAireEventPayload highlightEnrichOrcidAuthor(final OpenAireEventPayload p, final FieldTypeProtos.Author author) {
42
		p.getHighlight().getCreators().add(mapValue(author));
43
		return p;
44
	}
45

  
41 46
	public static OpenAireEventPayload highlightEnrichSubject(final OpenAireEventPayload p, final List<StructuredProperty> subjects) {
42 47
		p.getHighlight().setSubjects(listValues(subjects));
43 48
		return p;
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/Topic.java
18 18
	ENRICH_MISSING_SUBJECT_DDC("ENRICH/MISSING/SUBJECT/DDC"),
19 19
	ENRICH_MISSING_SUBJECT_ACM("ENRICH/MISSING/SUBJECT/ACM"),
20 20
	ENRICH_MISSING_SUBJECT_RVK("ENRICH/MISSING/SUBJECT/RVK"),
21
	ENRICH_MISSING_AUTHOR_ORCID("ENRICH/MISSING/AUTHOR/ORCID"),
21 22

  
22 23
	// ENRICHMENT MORE
23 24
	ENRICH_MORE_PID("ENRICH/MORE/PID"),
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/util/OafHbaseUtils.java
78 78
		return Lists.newArrayList(Iterables.transform(ts, t -> mapValue(t)));
79 79
	}
80 80

  
81
	public static <T> List<String> listObjects(Iterable<T> ts) {
82
		return Lists.newArrayList(Iterables.transform(ts, t -> mapObject(t)));
83
	}
84

  
81 85
	public static <T> String getKey(Iterable<T> ts) {
82 86
		return Iterables.getFirst(listKeys(ts), "");
83 87
	}
......
98 102
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
99 103
	}
100 104

  
101
	private static <T> String mapValue(final T t) {
105
	public static <T> String mapValue(final T t) {
102 106
		if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue();
103 107
		if (t instanceof KeyValue) return ((KeyValue) t).getValue();
104 108
		if (t instanceof String) return (String) t;
105 109
		if (t instanceof StringField) return ((StringField) t).getValue();
106 110
		if (t instanceof Qualifier) return ((Qualifier) t).getClassname();
107
		if (t instanceof Author) return ((Author) t).getFullname();
111
		if (t instanceof Author) {
112
			Author a = (Author) t;
113
			if (a.getPidCount() == 0) {
114
				return a.getFullname();
115
			} else {
116
				return a.getFullname() + " " + listObjects(a.getPidList());
117
			}
118
		}
108 119

  
109 120
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
110 121
	}
111 122

  
123
	public static <T> String mapObject(final T t) {
124
		if (t instanceof KeyValue) {
125
			KeyValue kv = (KeyValue) t;
126
			return kv.getKey() + ":" + kv.getValue();
127
		}
128
		if (t instanceof String) return (String) t;
129

  
130
		throw new IllegalArgumentException(String.format("type %s not mapped", t.getClass()));
131
	}
132

  
112 133
	public static List<String> getPropertyValues(final Reducer.Context context, final String name) {
113 134
		return doGetPropertyValues(context.getConfiguration().get(name, ""));
114 135
	}
modules/dnet-openaireplus-workflows/branches/solr75/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/broker/calculateEnrichmentEvents.meta.xml
15 15
			<WORKFLOW id="a6901d21-c09d-43dc-8fc9-236c32069293_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate person distribution">
16 16
				<WORKFLOW id="b9b6cd8a-8ee2-4a74-9344-f087eeac29ed_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate enrichment events"/>
17 17
				<WORKFLOW id="6472af0e-f2ed-4ee5-835f-de698d89a7a7_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate enrichment Project events"/>
18
				<WORKFLOW id="120a074a-8b1b-485e-9d1c-1d0466b056ac_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="Calculate enrichment Software links events"/>
18 19
			</WORKFLOW>
19 20
		</CONFIGURATION>
20 21
		<SCHEDULING enabled="false">
modules/dnet-openaireplus-workflows/branches/solr75/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/broker/calculateEnrichmentSoftwareLinksEvents.xml
1
<RESOURCE_PROFILE>
2
	<HEADER>
3
		<RESOURCE_IDENTIFIER value="120a074a-8b1b-485e-9d1c-1d0466b056ac_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
4
		<RESOURCE_TYPE value="WorkflowDSResourceType"/>
5
		<RESOURCE_KIND value="WorkflowDSResources"/>
6
		<RESOURCE_URI value=""/>
7
		<DATE_OF_CREATION value="2015-10-12T14:10:32+00:00"/>
8
	</HEADER>
9
	<BODY>
10
		<WORKFLOW_NAME>Calculate enrichment Software links events</WORKFLOW_NAME>
11
		<WORKFLOW_TYPE>Notification Broker</WORKFLOW_TYPE>
12
		<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
13
		<CONFIGURATION start="manual">
14

  
15
			<NODE name="hadoopConfig" type="SetClusterAndTable" isStart="true">
16
				<DESCRIPTION>Set table name</DESCRIPTION>
17
				<PARAMETERS>
18
					<PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM>
19
					<PARAM required="true" type="string" name="tableParam" managedBy="system">tableName</PARAM>
20
					<PARAM required="true" type="string" name="table" managedBy="user"/>
21
				</PARAMETERS>
22
				<ARCS>
23
					<ARC to="cleanupHDFS"/>
24
				</ARCS>
25
			</NODE>
26

  
27
			<NODE name="setDedupConfigs" type="SetDedupConfiguration" isStart="true">
28
				<DESCRIPTION>Set Dedup conf</DESCRIPTION>
29
				<PARAMETERS>
30
					<PARAM function="obtainValues('dedupOrchestrations', {})" required="true" type="string" name="dedupConfigSequence" managedBy="user"></PARAM>
31
					<PARAM required="true" type="string" name="dedupConfigSequenceParam" managedBy="system">dedup.conf.queue</PARAM>
32
				</PARAMETERS>
33
				<ARCS>
34
					<ARC to="cleanupHDFS"/>
35
				</ARCS>
36
			</NODE>
37

  
38
			<NODE name="cleanupHDFS" type="DeleteHdfsPathJob" isJoin="true">
39
				<DESCRIPTION>hdfs cleanup (xml)</DESCRIPTION>
40
				<PARAMETERS>
41
					<PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM>
42
					<PARAM required="true" type="string" name="envParams" managedBy="system">
43
						{
44
						'path' : 'brokerEventsPath'
45
						}
46
					</PARAM>
47
				</PARAMETERS>
48
				<ARCS>
49
					<ARC to="enrichment"/>
50
				</ARCS>
51
			</NODE>
52

  
53

  
54
			<NODE name="enrichment" type="DedupConfigurationAwareJob">
55
				<DESCRIPTION>Run M/R Job</DESCRIPTION>
56
				<PARAMETERS>
57
					<PARAM managedBy="system" name="cluster" required="true" type="string">DM</PARAM>
58
					<PARAM managedBy="system" name="hadoopJob" required="true" type="string">brokerEnrichmentSoftwareLinksJob</PARAM>
59
					<PARAM required="true" type="string" name="dedupConfigSequenceParam" managedBy="system">dedup.conf.queue</PARAM>
60
					<PARAM managedBy="system" name="envParams" required="false" type="string">
61
						{
62
						'hbase.mapred.inputtable' : 'tableName',
63
						'hbase.mapreduce.inputtable' : 'tableName'
64
						}
65
					</PARAM>
66
				</PARAMETERS>
67
				<ARCS>
68
					<ARC to="success"/>
69
				</ARCS>
70
			</NODE>
71

  
72
		</CONFIGURATION>
73
		<STATUS/>
74
	</BODY>
75
</RESOURCE_PROFILE>
modules/dnet-openaireplus-profiles/trunk/src/main/resources/eu/dnetlib/test/profiles/HadoopJobConfigurationDSResources/HadoopJobConfigurationDSResourceType/brokerEnrichmentSoftwareLinksJob.xml
1
<RESOURCE_PROFILE>
2
    <HEADER>
3
        <RESOURCE_IDENTIFIER value="0631da9f-8a4f-4412-828b-37f7a0aa6d71_SGFkb29wSm9iQ29uZmlndXJhdGlvbkRTUmVzb3VyY2VzL0hhZG9vcEpvYkNvbmZpZ3VyYXRpb25EU1Jlc291cmNlVHlwZQ=="/>
4
        <RESOURCE_TYPE value="HadoopJobConfigurationDSResourceType"/>
5
        <RESOURCE_KIND value="HadoopJobConfigurationDSResources"/>
6
        <RESOURCE_URI value=""/>
7
        <DATE_OF_CREATION value="2001-12-31T12:00:00"/>
8
    </HEADER>
9
    <BODY>
10
        <HADOOP_JOB name="brokerEnrichmentSoftwareLinksJob" type="mapreduce">
11
 			<DESCRIPTION>map reduce job that calculates the enrichment events based on the publications dedup results</DESCRIPTION>
12

  
13
	        <STATIC_CONFIGURATION><!-- I/O FORMAT -->
14
		        <PROPERTY key="mapreduce.inputformat.class" value="org.apache.hadoop.hbase.mapreduce.TableInputFormat"/>
15
		        <PROPERTY key="mapreduce.outputformat.class" value="org.elasticsearch.hadoop.mr.EsOutputFormat"/>
16

  
17
		        <!-- MAPPER -->
18
		        <PROPERTY key="mapreduce.map.class" value="eu.dnetlib.data.mapreduce.hbase.broker.enrich.SoftwareEnrichmentMapper"/>
19
		        <PROPERTY key="mapred.mapoutput.key.class" value="org.apache.hadoop.hbase.io.ImmutableBytesWritable"/>
20
		        <PROPERTY key="mapred.mapoutput.value.class" value="org.apache.hadoop.hbase.io.ImmutableBytesWritable"/>
21

  
22
		        <!-- REDUCER -->
23
		        <PROPERTY key="mapreduce.reduce.class" value="eu.dnetlib.data.mapreduce.hbase.broker.enrich.SoftwareEnrichmentReducer"/>
24
		        <PROPERTY key="mapred.output.key.class" value="org.apache.hadoop.io.Text"/>
25
		        <PROPERTY key="mapred.output.value.class" value="org.apache.hadoop.io.Text"/>
26

  
27
		        <!-- MISC -->
28
		        <PROPERTY key="mapred.reduce.tasks.speculative.execution" value="false"/>
29
		        <PROPERTY key="mapred.map.tasks.speculative.execution" value="false"/>
30
		        <PROPERTY key="mapreduce.map.speculative" value="false"/>
31
		        <PROPERTY key="mapreduce.reduce.speculative" value="false"/>
32
		        <PROPERTY key="dfs.blocksize" value="256M"/>
33
		        <PROPERTY key="mapred.reduce.tasks" value="4"/>
34

  
35
				<PROPERTY key="broker.baseurl.publication" value="https://explore.openaire.eu/search/publication?articleId=%s"/>
36
				<PROPERTY key="broker.baseurl.dataset" value="https://explore.openaire.eu/search/dataset?datasetId=%s"/>
37
				<PROPERTY key="broker.baseurl.software" value="https://explore.openaire.eu/search/software?softwareId=%s"/>
38
				<PROPERTY key="broker.baseurl.other" value="https://explore.openaire.eu/search/other?orpId=%s"/>
39

  
40

  
41
		        <!-- ES -->
42
		        <PROPERTY key="es.nodes" value="ip-90-147-167-137.ct1.garrservices.it:9200,ip-90-147-167-126.ct1.garrservices.it:9200,ip-90-147-167-13.ct1.garrservices.it:9200,ip-90-147-167-125.ct1.garrservices.it:9200"/>
43
		        <PROPERTY key="es.nodes.resolve.hostname" value="false"/>
44
		        <PROPERTY key="es.batch.write.retry.count " value="10"/>
45
		        <PROPERTY key="es.batch.size.entries " value="500"/>
46
		        <PROPERTY key="es.nodes.wan.only" value="true"/>
47
		        <PROPERTY key="es.resource" value="events_{infra}/event"/>
48
		        <PROPERTY key="es.input.json" value="yes"/>
49
		        <PROPERTY key="es.mapping.id" value="eventId"/>
50

  
51
		        <!-- BROKER -->
52
		        <PROPERTY key="broker.datasource.id.whitelist" value=""/>
53
		        <PROPERTY key="broker.datasource.id.blacklist" value=""/>
54
		        <PROPERTY key="broker.datasource.untrusted.oa.list" value="opendoar____::8b6dd7db9af49e67306feb59a8bdc52c"/>
55
		        <PROPERTY key="broker.datasource.type.whitelist" value="pubsrepository::unknown,pubsrepository::institutional,pubsrepository::thematic"/>
56

  
57
		        <!-- <PROPERTY key="user.name" value="dnet" /> -->
58
		        <!--  	Uncomment to override the default lib path -->
59
		        <!--	<PROPERTY key="job.lib" value="/user/dnet/dnet-mapreduce-jobs-0.0.2-SNAPSHOT-jar-with-dependencies.jar"/> -->
60
	        </STATIC_CONFIGURATION>
61
	        <JOB_INTERFACE>
62
		        <PARAM description="source hbase table" name="hbase.mapred.inputtable" required="true"/>
63
		        <PARAM description="source hbase table" name="hbase.mapreduce.inputtable" required="true"/>
64
	        </JOB_INTERFACE>
65
        	<SCAN>
66
        		<FILTERS operator="MUST_PASS_ONE">
67
			        <FILTER type="prefix" value="40" />
68
        			<FILTER type="prefix" value="50" />
69
        		</FILTERS>
70
        		<FAMILIES>
71
	       			<FAMILY value="result" />
72
        			<FAMILY value="resultResult_relationship_isRelatedTo" />
73
        		</FAMILIES>
74
        	</SCAN>
75
        </HADOOP_JOB>
76
        <STATUS>
77
            <LAST_SUBMISSION_DATE value="2001-12-31T12:00:00"/>
78
            <RUNNING_INSTANCES value="0"/>
79
            <CUMULATIVE_RUN value="0" />
80
        </STATUS>
81
        <SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
82
    </BODY>
83
</RESOURCE_PROFILE>

Also available in: Unified diff