Project

General

Profile

« Previous | Next » 

Revision 54922

Implementation of the procedure to export native software records to HDFS.
Addition of the required workflows and classes.

View differences:

modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportSoftwaresMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

  
3
import com.googlecode.protobuf.format.JsonFormat;
4
import eu.dnetlib.data.mapreduce.util.DedupUtils;
5
import eu.dnetlib.data.mapreduce.util.OafDecoder;
6
import eu.dnetlib.data.proto.OafProtos;
7
import eu.dnetlib.data.proto.ResultProtos;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
14
import org.apache.hadoop.io.Text;
15

  
16
import java.io.IOException;
17

  
18
/**
19
 * Exports the result identifiers as json.
20
 *
21
 * @author claudio
22
 */
23
public class ExportSoftwaresMapper extends TableMapper<Text, Text> {
24

  
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(ExportSoftwaresMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
29

  
30
	private static final String CF = "result";
31

  
32
	private Text keyOut;
33

  
34
	private Text valueOut;
35

  
36
	@Override
37
	protected void setup(final Context context) throws IOException, InterruptedException {
38
		super.setup(context);
39

  
40
		keyOut = new Text("");
41
		valueOut = new Text("");
42
	}
43

  
44
	@Override
45
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
46
		try {
47
			final byte[] body = value.getValue(Bytes.toBytes(CF), DedupUtils.BODY_B);
48

  
49
			if (body == null) {
50
				context.getCounter(CF, "missing body").increment(1);
51
				return;
52
			}
53

  
54
			final OafProtos.Oaf d = OafDecoder.decode(body).getOaf();
55

  
56
			String classId = d.getEntity().getResult().getMetadata().getResulttype().getClassid();
57
			if (!classId.equals("software")){
58
				context.getCounter(CF, classId).increment(1);
59
				return;
60
			}
61

  
62
			String rowId = new String(keyIn.copyBytes());
63
			if (rowId.contains("dedup_wf")){
64
				context.getCounter(CF, "root record").increment(1);
65
				return;
66
			}
67

  
68
			valueOut.set(JsonFormat.printToString(d.getEntity()));
69

  
70
			context.write(keyOut, valueOut);
71

  
72
		} catch (final Throwable e) {
73
			log.error("error exporting the following record from HBase: " + value.toString(), e);
74
			context.getCounter("error", e.getClass().getName()).increment(1);
75
			throw new RuntimeException(e);
76
		}
77
	}
78

  
79
}
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/dm/export.native.softwares.xml
1
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
2
    <HEADER>
3
        <RESOURCE_IDENTIFIER value="66fbd899-a981-4fd7-b27b-e546f8823ac7_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl"/>
4
        <RESOURCE_TYPE value="WorkflowDSResourceType"/>
5
        <RESOURCE_KIND value="WorkflowDSResources"/>
6
        <RESOURCE_URI value=""/>
7
        <DATE_OF_CREATION value="2018-10-31T11:02:13+00:00"/>
8
    </HEADER>
9
    <BODY>
10
        <WORKFLOW_NAME>Softwares Exporter</WORKFLOW_NAME>
11
        <WORKFLOW_TYPE>Export Infospace</WORKFLOW_TYPE>
12
        <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
13
        <CONFIGURATION start="manual">
14
            <NODE isStart="true" name="setOutputPath" type="SetEnvParameter">
15
                <DESCRIPTION>Set the output file path on HDFS</DESCRIPTION>
16
                <PARAMETERS>
17
                    <PARAM managedBy="system" name="parameterName" required="true" type="string">outputPath</PARAM>
18
                    <PARAM managedBy="user" name="parameterValue" required="true" type="string">/tmp/softwares.2018.10.31</PARAM>
19
                </PARAMETERS>
20
                <ARCS>
21
                    <ARC to="cleanupOutput"/>
22
                </ARCS>
23
            </NODE>
24
            <NODE isStart="true" name="hadoopConfig" type="SetClusterAndTable">
25
                <DESCRIPTION>Set table name</DESCRIPTION>
26
                <PARAMETERS>
27
                    <PARAM managedBy="system" name="cluster" required="true" type="string">DM</PARAM>
28
                    <PARAM managedBy="system" name="tableParam" required="true" type="string">tableName</PARAM>
29
                    <PARAM managedBy="user" name="table" required="true" type="string">db_openaireplus_services_beta</PARAM>
30
                </PARAMETERS>
31
                <ARCS>
32
                    <ARC to="cleanupOutput"/>
33
                </ARCS>
34
            </NODE>
35
            <NODE isJoin="true" name="cleanupOutput" type="DeleteHdfsPathJob">
36
                <DESCRIPTION>Cleanup of the HDFS output path</DESCRIPTION>
37
                <PARAMETERS>
38
                    <PARAM managedBy="system" name="envParams" required="true" type="string">
39
                        {
40
                        'path' : 'outputPath',
41
                        'cluster' : 'cluster'
42
                        }
43
                    </PARAM>
44
                </PARAMETERS>
45
                <ARCS>
46
                    <ARC to="export"/>
47
                </ARCS>
48
            </NODE>
49
            <NODE name="export" type="SubmitHadoopJob">
50
                <DESCRIPTION>export the software records from the information space as a text file on HDFS</DESCRIPTION>
51
                <PARAMETERS>
52
                    <PARAM managedBy="system" name="hadoopJob" required="true" type="string">informationSpaceSoftwareExportJob</PARAM>
53
                    <PARAM managedBy="system" name="envParams" required="true" type="string">
54
                        {
55
                        'cluster' : 'cluster',
56
                        'hbase.mapred.inputtable' : 'tableName',
57
                        'hbase.mapreduce.inputtable' : 'tableName',
58
                        'mapred.output.dir' : 'outputPath'
59
                        }
60
                    </PARAM>
61
                </PARAMETERS>
62
                <ARCS>
63
                    <ARC to="success"/>
64
                </ARCS>
65
            </NODE>
66
        </CONFIGURATION>
67
        <STATUS>
68
            <LAST_EXECUTION_ID>wf_20181031_114608_13</LAST_EXECUTION_ID>
69
            <LAST_EXECUTION_DATE>2018-10-31T11:49:33+00:00</LAST_EXECUTION_DATE>
70
            <LAST_EXECUTION_STATUS>SUCCESS</LAST_EXECUTION_STATUS>
71
            <LAST_EXECUTION_ERROR/>
72
        </STATUS>
73
    </BODY>
74
</RESOURCE_PROFILE>
modules/dnet-openaireplus-profiles/trunk/src/main/resources/eu/dnetlib/test/profiles/HadoopJobConfigurationDSResources/HadoopJobConfigurationDSResourceType/informationSpaceSoftwareExportJob.xml
1
<RESOURCE_PROFILE>
2
    <HEADER>
3
        <RESOURCE_IDENTIFIER value="42f42d16-873c-4dec-97ef-56128da012e8_SGFkb29wSm9iQ29uZmlndXJhdGlvbkRTUmVzb3VyY2VzL0hhZG9vcEpvYkNvbmZpZ3VyYXRpb25EU1Jlc291cmNlVHlwZQ=="/>
4
        <RESOURCE_TYPE value="HadoopJobConfigurationDSResourceType"/>
5
        <RESOURCE_KIND value="HadoopJobConfigurationDSResources"/>
6
        <RESOURCE_URI value=""/>
7
        <DATE_OF_CREATION value="2001-12-31T12:00:00"/>
8
    </HEADER>
9
    <BODY>
10
        <HADOOP_JOB name="informationSpaceSoftwareExportJob" type="mapreduce">
11
 			<DESCRIPTION>MapReduce job that exports the software records</DESCRIPTION>
12
        	<STATIC_CONFIGURATION>
13
        	
14
				<!-- I/O FORMAT -->
15
				<PROPERTY key="mapreduce.inputformat.class" value="org.apache.hadoop.hbase.mapreduce.TableInputFormat" />
16
				<PROPERTY key="mapreduce.outputformat.class" value="org.apache.hadoop.mapreduce.lib.output.TextOutputFormat" />
17
        	
18
        		<!-- MAPPER -->
19
				<PROPERTY key="mapreduce.map.class" value="eu.dnetlib.data.mapreduce.hbase.dataexport.ExportSoftwaresMapper" />
20
				<PROPERTY key="mapred.mapoutput.key.class" value="org.apache.hadoop.io.Text" />
21
				<PROPERTY key="mapred.mapoutput.value.class" value="org.apache.hadoop.io.Text" />
22
			
23
			
24
				<PROPERTY key="mapred.output.key.class" value="org.apache.hadoop.io.Text" />				
25
				<PROPERTY key="mapred.output.value.class" value="org.apache.hadoop.io.Text" />
26

  
27

  
28
				<!-- MISC -->
29
				<PROPERTY key="mapred.reduce.tasks.speculative.execution" value="false" />
30
				<PROPERTY key="mapred.map.tasks.speculative.execution" value="false" />	
31
				<PROPERTY key="mapreduce.map.speculative" value="false" />
32
				<PROPERTY key="mapreduce.reduce.speculative" value="false" />	
33

  
34
				<PROPERTY key="dfs.blocksize" value="256M" />
35

  
36
				<PROPERTY key="mapred.reduce.tasks" value="1"/>
37
				<!-- <PROPERTY key="user.name" value="dnet" /> -->
38
				
39
		<!--  	Uncomment to override the default lib path -->			
40
		<!--	<PROPERTY key="job.lib" value="/user/dnet/dnet-mapreduce-jobs-0.0.2-SNAPSHOT-jar-with-dependencies.jar"/> -->
41
        	</STATIC_CONFIGURATION>
42
        	<JOB_INTERFACE>
43
        		<PARAM name="hbase.mapred.inputtable" required="true" description="source hbase table" />
44
        		<PARAM name="hbase.mapreduce.inputtable" required="true" description="source hbase table" />
45
        		
46
        		<PARAM name="mapred.output.dir" required="true" description="target output directory on HDFS" />         		
47
        	</JOB_INTERFACE>
48
        	<SCAN>
49
        		<FILTERS />
50
        		<FAMILIES />
51
        	</SCAN>
52
        </HADOOP_JOB>
53
        <STATUS>
54
            <LAST_SUBMISSION_DATE value="2001-12-31T12:00:00"/>
55
            <RUNNING_INSTANCES value="0"/>
56
            <CUMULATIVE_RUN value="0" />
57
        </STATUS>
58
        <SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
59
    </BODY>
60
</RESOURCE_PROFILE>

Also available in: Unified diff