Project

General

Profile

« Previous | Next » 

Revision 53103

added mapper and hadoop job configuration file for importing Grid.AC organization data

View differences:

modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/GridAcImportMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

  
3
import com.googlecode.protobuf.format.JsonFormat;
4
import eu.dnetlib.actionmanager.actions.ActionFactory;
5
import eu.dnetlib.actionmanager.actions.AtomicAction;
6
import eu.dnetlib.actionmanager.common.Agent;
7
import eu.dnetlib.data.proto.OafProtos;
8
import org.apache.hadoop.io.LongWritable;
9
import org.apache.hadoop.io.Text;
10
import org.apache.hadoop.mapreduce.Mapper;
11

  
12
import java.io.IOException;
13

  
14
public class GridAcImportMapper extends Mapper<LongWritable, Text, Text, Text> {
15

  
16
    private String setName;
17
    private Agent agent;
18
    private Text keyout;
19
    private Text valueOut;
20
    private ActionFactory factory;
21

  
22
    @Override
23
    protected void setup(Context context) throws IOException, InterruptedException {
24
        setName = context.getConfiguration().get("setName");
25
        agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service);
26
        keyout = new Text("");
27
        valueOut = new Text("");
28
        factory = new ActionFactory();
29
    }
30

  
31
    @Override
32
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
33
        final String inputJson = value.toString();
34
        final OafProtos.Oaf.Builder oaf = OafProtos.Oaf.newBuilder();
35
        JsonFormat.merge(value.toString(), oaf);
36

  
37
        try {
38
            final AtomicAction action = GridAcToActions.generateActionsFromDump(oaf.build(), factory, setName, agent);
39

  
40
            keyout.set(action.getRowKey());
41
            valueOut.set(action.toJSON());
42
            context.write(keyout, valueOut);
43

  
44
        } catch (Throwable e) {
45
            System.err.println(inputJson);
46
            throw e;
47
        }
48

  
49

  
50
    }
51

  
52

  
53
}
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/GridAcToActions.java
1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

  
3
import eu.dnetlib.actionmanager.actions.ActionFactory;
4
import eu.dnetlib.actionmanager.actions.AtomicAction;
5
import eu.dnetlib.actionmanager.common.Agent;
6
import eu.dnetlib.data.proto.OafProtos;
7
import org.apache.hadoop.hbase.util.Bytes;
8

  
9
import static eu.dnetlib.data.proto.OafProtos.Oaf;
10

  
11
public class GridAcToActions {
12

  
13
    private final static String columnFamily = "organizationOrganization_dedupSimilarity_isSimilarTo";
14

  
15
    public static AtomicAction generateActionsFromDump(final Oaf oaf, ActionFactory factory, final String setName, final Agent agent) {
16

  
17
        switch (oaf.getKind()) {
18
            case entity:
19
                return factory.createAtomicAction(setName, agent, oaf.getEntity().getId(), "organization", "body", oaf.toByteArray());
20
            case relation:
21
                final OafProtos.OafRel rel = oaf.getRel();
22
                return factory.createAtomicAction(setName, agent, rel.getSource(), columnFamily, rel.getTarget(), Bytes.toBytes(""));
23
        }
24

  
25
        // should not happen
26
        return null;
27
    }
28

  
29
}
modules/dnet-openaireplus-profiles/trunk/src/main/resources/eu/dnetlib/test/profiles/HadoopJobConfigurationDSResources/HadoopJobConfigurationDSResourceType/importGridAcJob.xml
1
<RESOURCE_PROFILE>
2
	<HEADER>
3
		<RESOURCE_IDENTIFIER
4
				value="2b2f25aa-fbc8-4322-8ede-5441c0d09a5a_SGFkb29wSm9iQ29uZmlndXJhdGlvbkRTUmVzb3VyY2VzL0hhZG9vcEpvYkNvbmZpZ3VyYXRpb25EU1Jlc291cmNlVHlwZQ=="/>
5
		<RESOURCE_TYPE value="HadoopJobConfigurationDSResourceType"/>
6
		<RESOURCE_KIND value="HadoopJobConfigurationDSResources"/>
7
		<RESOURCE_URI value=""/>
8
		<DATE_OF_CREATION value="2001-12-31T12:00:00"/>
9
	</HEADER>
10
	<BODY>
11
		<HADOOP_JOB name="importGridAcJob" type="mapreduce">
12
			<DESCRIPTION>map reduce job that imports the Grid.AC organization dump into actions</DESCRIPTION>
13
			<STATIC_CONFIGURATION>
14

  
15
				<!-- I/O FORMAT -->
16
				<PROPERTY key="mapreduce.inputformat.class" value="org.apache.hadoop.mapreduce.lib.input.TextInputFormat"/>
17
				<PROPERTY key="mapreduce.outputformat.class" value="org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"/>
18

  
19
				<!-- MAPPER -->
20
				<PROPERTY key="mapreduce.map.class" value="eu.dnetlib.data.mapreduce.hbase.dataimport.GridAcImportMapper"/>
21
				<PROPERTY key="mapred.mapoutput.key.class" value="org.apache.hadoop.io.Text"/>
22
				<PROPERTY key="mapred.mapoutput.value.class" value="org.apache.hadoop.io.Text"/>
23

  
24
				<PROPERTY key="mapred.output.key.class" value="org.apache.hadoop.io.Text"/>
25
				<PROPERTY key="mapred.output.value.class" value="org.apache.hadoop.io.Text"/>
26

  
27
				<!-- MISC -->
28
				<PROPERTY key="mapred.compress.map.output" value="false"/>
29
				<PROPERTY key="mapred.map.tasks.speculative.execution" value="false"/>
30
				<PROPERTY key="mapreduce.map.speculative" value="false"/>
31

  
32
				<PROPERTY key="mapred.reduce.tasks" value="0"/>
33
				<PROPERTY key="dfs.blocksize" value="256M"/>
34

  
35
                <!-- Grid.AC Mapper Properties -->
36
                <PROPERTY key="setName" value="crossref-dump"/>
37
				<PROPERTY key="agentId" value="dnet"/>
38
				<PROPERTY key="agentName" value="D-Net"/>
39
				<PROPERTY key="invisible" value="true"/>
40

  
41
				<!-- <PROPERTY key="user.name" value="dnet" /> -->
42

  
43
				<!--  	Uncomment to override the default lib path -->
44
				<!--	<PROPERTY key="job.lib" value="/user/dnet/dnet-mapreduce-jobs-0.0.2-SNAPSHOT-jar-with-dependencies.jar"/> -->
45
			</STATIC_CONFIGURATION>
46
			<JOB_INTERFACE>
47
				<PARAM name="mapred.input.dir" required="true" description="source text file on hdfs, one JSON-encoded Oaf per line"/>
48
				<PARAM name="mapred.output.dir" required="true" description="target sequence file on hdfs"/>
49
			</JOB_INTERFACE>
50
			<SCAN>
51
				<FILTERS/>
52
				<FAMILIES/>
53
			</SCAN>
54
		</HADOOP_JOB>
55
		<STATUS>
56
			<LAST_SUBMISSION_DATE value="2001-12-31T12:00:00"/>
57
			<RUNNING_INSTANCES value="0"/>
58
			<CUMULATIVE_RUN value="0"/>
59
		</STATUS>
60
		<SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
61
	</BODY>
62
</RESOURCE_PROFILE>
63

  

Also available in: Unified diff