Revision 37751

Added workflow to export the representative publications as JSON on HDFS.

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/RootExportMapper.java
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;

import com.google.protobuf.InvalidProtocolBufferException;
import com.googlecode.protobuf.format.JsonFormat;

import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.TypeProtos.Type;

public class RootExportMapper extends TableMapper<Text, Text> {

	private Text outKey;

	private Text outValue;

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		outKey = new Text("");
		outValue = new Text();
	}

	@Override
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {

		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());

		final Type type = keyDecoder.getType();
		if (!Type.result.equals(type) || !DedupUtils.isRoot(keyDecoder.getId())) return;

		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
		final byte[] bodyB = map.get(DedupUtils.BODY_B);
		if (bodyB == null) {
			incrementCounter(context, "missing body (map)", type.toString(), 1);
			return;
		}

		final Oaf oaf = decodeProto(context, bodyB);

		if (!isValid(oaf)) {
			incrementCounter(context, "missing body (map)", type.toString(), 1);
			return;
		}

		emit(new String(keyIn.copyBytes()), context, oaf);
		incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1);
	}

	private boolean isValid(final Oaf oaf) {
		return (oaf != null) && oaf.isInitialized();
	}

	private Oaf decodeProto(final Context context, final byte[] body) {
		try {
			return Oaf.parseFrom(body);
		} catch (final InvalidProtocolBufferException e) {
			e.printStackTrace(System.err);
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
		}
		return null;
	}

	private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
		// outKey.set(key);
		outValue.set(JsonFormat.printToString(oaf));

		context.write(outKey, outValue);
	}

	private void incrementCounter(final Context context, final String k, final String t, final int n) {
		getCounter(context, k, t).increment(n);
	}

	private Counter getCounter(final Context context, final String k, final String t) {
		return context.getCounter(k, t);
	}

	private String getEntityType(final Oaf oaf, final Type type) {
		switch (type) {
		case result:
			return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
		default:
			return type.toString();
		}
	}

}
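Each value emitted by RootExportMapper is a single-line JSON rendering of an Oaf entity, written under an empty key; with TextOutputFormat's defaults each output line should therefore be the empty key, a tab, then the JSON. A minimal sketch of reading such a line back into an Oaf message, assuming the same protobuf-java-format library used above (with its static JsonFormat.merge); the class and method names here are illustrative, not part of the commit:

import com.googlecode.protobuf.format.JsonFormat;

import eu.dnetlib.data.proto.OafProtos.Oaf;

public class RootExportLineParser {

	// Parse one exported line ("<empty key>\t<json>") back into an Oaf message.
	public static Oaf parseLine(final String line) throws JsonFormat.ParseException {
		// TextOutputFormat writes the (empty) key, a tab separator, then the JSON value.
		final int tab = line.indexOf('\t');
		final String json = (tab >= 0) ? line.substring(tab + 1) : line;

		final Oaf.Builder builder = Oaf.newBuilder();
		JsonFormat.merge(json, builder);
		return builder.build();
	}
}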
modules/dnet-openaireplus-profiles/trunk/src/main/resources/eu/dnetlib/test/profiles/HadoopJobConfigurationDSResources/HadoopJobConfigurationDSResourceType/dedupRootsExportJob.xml
<RESOURCE_PROFILE>
	<HEADER>
		<RESOURCE_IDENTIFIER value="53f2a9b4-adf3-4ceb-9308-d88b53dc44c5_SGFkb29wSm9iQ29uZmlndXJhdGlvbkRTUmVzb3VyY2VzL0hhZG9vcEpvYkNvbmZpZ3VyYXRpb25EU1Jlc291cmNlVHlwZQ=="/>
		<RESOURCE_TYPE value="HadoopJobConfigurationDSResourceType"/>
		<RESOURCE_KIND value="HadoopJobConfigurationDSResources"/>
		<RESOURCE_URI value=""/>
		<DATE_OF_CREATION value="2001-12-31T12:00:00"/>
	</HEADER>
	<BODY>
		<HADOOP_JOB name="dedupRootsExportJob" type="mapreduce">
			<DESCRIPTION>map only job that exports the representative publications as json</DESCRIPTION>
			<STATIC_CONFIGURATION>

				<!-- I/O FORMAT -->
				<PROPERTY key="mapreduce.inputformat.class" value="org.apache.hadoop.hbase.mapreduce.TableInputFormat" />
				<PROPERTY key="mapreduce.outputformat.class" value="org.apache.hadoop.mapreduce.lib.output.TextOutputFormat" />

				<!-- MAPPER -->
				<PROPERTY key="mapreduce.map.class" value="eu.dnetlib.data.mapreduce.hbase.dedup.gt.RootExportMapper" />
				<PROPERTY key="mapred.mapoutput.key.class" value="org.apache.hadoop.io.Text" />
				<PROPERTY key="mapred.mapoutput.value.class" value="org.apache.hadoop.io.Text" />

				<!-- MISC -->
				<PROPERTY key="mapred.reduce.tasks.speculative.execution" value="false" />
				<PROPERTY key="mapred.map.tasks.speculative.execution" value="false" />
				<PROPERTY key="mapreduce.map.speculative" value="false" />
				<PROPERTY key="mapreduce.reduce.speculative" value="false" />

				<PROPERTY key="dfs.blocksize" value="256M" />

				<PROPERTY key="mapred.reduce.tasks" value="1" />
				<!-- <PROPERTY key="user.name" value="dnet" /> -->

				<!-- Uncomment to override the default lib path -->
				<!-- <PROPERTY key="job.lib" value="/user/dnet/dnet-mapreduce-jobs-0.0.2-SNAPSHOT-jar-with-dependencies.jar"/> -->
			</STATIC_CONFIGURATION>
			<JOB_INTERFACE>
				<PARAM name="hbase.mapred.inputtable" required="true" description="source hbase table" />
				<PARAM name="hbase.mapreduce.inputtable" required="true" description="source hbase table" />

				<PARAM name="mapred.output.dir" required="true" description="target sequence file on hdfs" />
			</JOB_INTERFACE>
			<SCAN>
				<FILTERS operator="MUST_PASS_ALL">
					<FILTER type="prefix" param="entityTypeId" />
				</FILTERS>
				<FAMILIES>
					<FAMILY param="entityType" />
				</FAMILIES>
			</SCAN>
		</HADOOP_JOB>
		<STATUS>
			<LAST_SUBMISSION_DATE value="2001-12-31T12:00:00"/>
			<RUNNING_INSTANCES value="0"/>
			<CUMULATIVE_RUN value="0" />
		</STATUS>
		<SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
	</BODY>
</RESOURCE_PROFILE>
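The profile above is what the D-Net workflow engine submits to the cluster; expressed as a plain MapReduce driver it corresponds roughly to the sketch below. The driver class name, the argument layout, and the scan prefix/family values passed in through args are illustrative assumptions, not part of the commit:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

import eu.dnetlib.data.mapreduce.hbase.dedup.gt.RootExportMapper;

public class DedupRootsExportDriver {

	// args: [0] source hbase table, [1] entityTypeId row-key prefix, [2] entityType column family, [3] output dir on hdfs
	public static void main(final String[] args) throws Exception {
		final Configuration conf = HBaseConfiguration.create();
		final Job job = Job.getInstance(conf, "dedupRootsExportJob");
		job.setJarByClass(DedupRootsExportDriver.class);

		// Equivalent of the <SCAN> section: row-key prefix filter plus a single column family.
		final Scan scan = new Scan();
		scan.setCaching(500);
		scan.setCacheBlocks(false);
		scan.setFilter(new PrefixFilter(Bytes.toBytes(args[1])));
		scan.addFamily(Bytes.toBytes(args[2]));

		// RootExportMapper emits (empty Text key, JSON Text value).
		TableMapReduceUtil.initTableMapperJob(args[0], scan, RootExportMapper.class, Text.class, Text.class, job);

		// The profile keeps a single reduce task, so the exported JSON lands in one output file.
		job.setNumReduceTasks(1);
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(Text.class);
		job.setOutputFormatClass(TextOutputFormat.class);
		FileOutputFormat.setOutputPath(job, new Path(args[3]));

		System.exit(job.waitForCompletion(true) ? 0 : 1);
	}
}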
modules/dnet-deduplication/trunk/src/main/resources/eu/dnetlib/test/profiles/meta/offline.export.meta.xml
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
	<HEADER>
		<RESOURCE_IDENTIFIER value="cac21a4c-9207-424b-8d9a-01c53248f9e9_TWV0YVdvcmtmbG93RFNSZXNvdXJjZXMvTWV0YVdvcmtmbG93RFNSZXNvdXJjZVR5cGU=" />
		<RESOURCE_TYPE value="MetaWorkflowDSResourceType" />
		<RESOURCE_KIND value="MetaWorkflowDSResources" />
		<RESOURCE_URI value="" />
		<DATE_OF_CREATION value="2006-05-04T18:13:51.0Z" />
	</HEADER>
	<BODY>
		<METAWORKFLOW_NAME family="Export">Export</METAWORKFLOW_NAME>
		<METAWORKFLOW_DESCRIPTION></METAWORKFLOW_DESCRIPTION>
		<METAWORKFLOW_SECTION>InfoSpace Deduplication</METAWORKFLOW_SECTION>
		<ADMIN_EMAIL />
		<CONFIGURATION status="EXECUTABLE">
			<WORKFLOW id="7d183ac5-5aae-4022-84b6-37a9e4f69ab0_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="export roots"/>
		</CONFIGURATION>
		<SCHEDULING enabled="false">
			<CRON>29 5 22 ? * *</CRON>
			<MININTERVAL>10080</MININTERVAL>
		</SCHEDULING>
	</BODY>
</RESOURCE_PROFILE>
modules/dnet-deduplication/trunk/src/main/resources/eu/dnetlib/test/profiles/meta/workflows/dedup.roots.export.xml
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
	<HEADER>
		<RESOURCE_IDENTIFIER value="7d183ac5-5aae-4022-84b6-37a9e4f69ab0_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" />
		<RESOURCE_TYPE value="WorkflowDSResourceType" />
		<RESOURCE_KIND value="WorkflowDSResources" />
		<RESOURCE_URI value="" />
		<DATE_OF_CREATION value="2006-05-04T18:13:51.0Z" />
	</HEADER>
	<BODY>
		<WORKFLOW_NAME>Deduplication export</WORKFLOW_NAME>
		<WORKFLOW_TYPE>Deduplication</WORKFLOW_TYPE>
		<WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
		<CONFIGURATION start="manual">
			<NODE name="setCsvPath" type="SetEnvParameter" isStart="true">
				<DESCRIPTION>Set the CSV file path on HDFS</DESCRIPTION>
				<PARAMETERS>
					<PARAM managedBy="system" name="parameterName" required="true" type="string">csvPath</PARAM>
					<PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
				</PARAMETERS>
				<ARCS>
					<ARC to="cleanupCsv" />
				</ARCS>
			</NODE>

			<NODE name="setDedupConfigs" type="SetDedupConfiguration" isStart="true">
				<DESCRIPTION>Set Dedup conf</DESCRIPTION>
				<PARAMETERS>
					<PARAM function="obtainValues('dedupOrchestrations', {})" required="true" type="string" name="dedupConfigSequence" managedBy="user"></PARAM>
					<PARAM required="true" type="string" name="dedupConfigSequenceParam" managedBy="system">dedup.conf.queue</PARAM>
				</PARAMETERS>
				<ARCS>
					<ARC to="cleanupCsv" />
				</ARCS>
			</NODE>

			<NODE name="hadoopConfig" type="SetClusterAndTable" isStart="true">
				<DESCRIPTION>Set table name</DESCRIPTION>
				<PARAMETERS>
					<PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM>
					<PARAM required="true" type="string" name="tableParam" managedBy="system">tableName</PARAM>
				</PARAMETERS>
				<ARCS>
					<ARC to="cleanupCsv" />
				</ARCS>
			</NODE>

			<NODE name="cleanupCsv" type="DeleteHdfsPathJob" isJoin="true">
				<DESCRIPTION>CSV files cleanup</DESCRIPTION>
				<PARAMETERS>
					<PARAM required="true" type="string" name="envParams" managedBy="system">
						{
							'path' : 'csvPath',
							'cluster' : 'cluster'
						}
					</PARAM>
				</PARAMETERS>
				<ARCS>
					<ARC to="export" />
				</ARCS>
			</NODE>

			<NODE name="export" type="SubmitHadoopJob">
				<DESCRIPTION>export the representative publications</DESCRIPTION>
				<PARAMETERS>
					<PARAM required="true" type="string" name="hadoopJob" managedBy="system">dedupRootsExportJob</PARAM>
					<PARAM required="true" type="string" name="sysParams" managedBy="system">
						{
							'hbase.mapred.inputtable' : 'hbase.mapred.datatable',
							'hbase.mapreduce.inputtable' : 'hbase.mapred.datatable'
						}
					</PARAM>
					<PARAM required="true" type="string" name="envParams" managedBy="system">
						{
							'cluster' : 'cluster',
							'entityTypeId' : 'entityTypeId',
							'entityType' : 'entityType',
							'hbase.mapred.inputtable' : 'tableName',
							'hbase.mapreduce.inputtable' : 'tableName',
							'mapred.output.dir' : 'csvPath'
						}
					</PARAM>
				</PARAMETERS>
				<ARCS>
					<ARC to="success" />
				</ARCS>
			</NODE>
		</CONFIGURATION>
		<STATUS />
	</BODY>
</RESOURCE_PROFILE>
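In the workflow above, the cleanupCsv node (type DeleteHdfsPathJob) clears the target path before the export runs, since TextOutputFormat refuses to write into an existing output directory. A minimal sketch of that step, assuming the node amounts to a recursive HDFS delete; the class and method below are illustrative, not the actual D-Net node implementation:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsPathCleanup {

	// Recursively remove the csvPath directory so the export job can recreate it.
	public static void deletePath(final Configuration conf, final String csvPath) throws IOException {
		final FileSystem fs = FileSystem.get(conf);
		final Path path = new Path(csvPath);
		if (fs.exists(path)) {
			fs.delete(path, true); // true = recursive
		}
	}
}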