Revision 53103
Added by Claudio Atzori over 5 years ago
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/GridAcImportMapper.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dataimport; |
|
2 |
|
|
3 |
import com.googlecode.protobuf.format.JsonFormat; |
|
4 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
|
5 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
|
6 |
import eu.dnetlib.actionmanager.common.Agent; |
|
7 |
import eu.dnetlib.data.proto.OafProtos; |
|
8 |
import org.apache.hadoop.io.LongWritable; |
|
9 |
import org.apache.hadoop.io.Text; |
|
10 |
import org.apache.hadoop.mapreduce.Mapper; |
|
11 |
|
|
12 |
import java.io.IOException; |
|
13 |
|
|
14 |
public class GridAcImportMapper extends Mapper<LongWritable, Text, Text, Text> { |
|
15 |
|
|
16 |
private String setName; |
|
17 |
private Agent agent; |
|
18 |
private Text keyout; |
|
19 |
private Text valueOut; |
|
20 |
private ActionFactory factory; |
|
21 |
|
|
22 |
@Override |
|
23 |
protected void setup(Context context) throws IOException, InterruptedException { |
|
24 |
setName = context.getConfiguration().get("setName"); |
|
25 |
agent = new Agent(context.getConfiguration().get("agentId"), context.getConfiguration().get("agentName"), Agent.AGENT_TYPE.service); |
|
26 |
keyout = new Text(""); |
|
27 |
valueOut = new Text(""); |
|
28 |
factory = new ActionFactory(); |
|
29 |
} |
|
30 |
|
|
31 |
@Override |
|
32 |
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { |
|
33 |
final String inputJson = value.toString(); |
|
34 |
final OafProtos.Oaf.Builder oaf = OafProtos.Oaf.newBuilder(); |
|
35 |
JsonFormat.merge(value.toString(), oaf); |
|
36 |
|
|
37 |
try { |
|
38 |
final AtomicAction action = GridAcToActions.generateActionsFromDump(oaf.build(), factory, setName, agent); |
|
39 |
|
|
40 |
keyout.set(action.getRowKey()); |
|
41 |
valueOut.set(action.toJSON()); |
|
42 |
context.write(keyout, valueOut); |
|
43 |
|
|
44 |
} catch (Throwable e) { |
|
45 |
System.err.println(inputJson); |
|
46 |
throw e; |
|
47 |
} |
|
48 |
|
|
49 |
|
|
50 |
} |
|
51 |
|
|
52 |
|
|
53 |
} |
modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/GridAcToActions.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mapreduce.hbase.dataimport; |
|
2 |
|
|
3 |
import eu.dnetlib.actionmanager.actions.ActionFactory; |
|
4 |
import eu.dnetlib.actionmanager.actions.AtomicAction; |
|
5 |
import eu.dnetlib.actionmanager.common.Agent; |
|
6 |
import eu.dnetlib.data.proto.OafProtos; |
|
7 |
import org.apache.hadoop.hbase.util.Bytes; |
|
8 |
|
|
9 |
import static eu.dnetlib.data.proto.OafProtos.Oaf; |
|
10 |
|
|
11 |
public class GridAcToActions { |
|
12 |
|
|
13 |
private final static String columnFamily = "organizationOrganization_dedupSimilarity_isSimilarTo"; |
|
14 |
|
|
15 |
public static AtomicAction generateActionsFromDump(final Oaf oaf, ActionFactory factory, final String setName, final Agent agent) { |
|
16 |
|
|
17 |
switch (oaf.getKind()) { |
|
18 |
case entity: |
|
19 |
return factory.createAtomicAction(setName, agent, oaf.getEntity().getId(), "organization", "body", oaf.toByteArray()); |
|
20 |
case relation: |
|
21 |
final OafProtos.OafRel rel = oaf.getRel(); |
|
22 |
return factory.createAtomicAction(setName, agent, rel.getSource(), columnFamily, rel.getTarget(), Bytes.toBytes("")); |
|
23 |
} |
|
24 |
|
|
25 |
// should not happen |
|
26 |
return null; |
|
27 |
} |
|
28 |
|
|
29 |
} |
modules/dnet-openaireplus-profiles/trunk/src/main/resources/eu/dnetlib/test/profiles/HadoopJobConfigurationDSResources/HadoopJobConfigurationDSResourceType/importGridAcJob.xml | ||
---|---|---|
1 |
<RESOURCE_PROFILE> |
|
2 |
<HEADER> |
|
3 |
<RESOURCE_IDENTIFIER |
|
4 |
value="2b2f25aa-fbc8-4322-8ede-5441c0d09a5a_SGFkb29wSm9iQ29uZmlndXJhdGlvbkRTUmVzb3VyY2VzL0hhZG9vcEpvYkNvbmZpZ3VyYXRpb25EU1Jlc291cmNlVHlwZQ=="/> |
|
5 |
<RESOURCE_TYPE value="HadoopJobConfigurationDSResourceType"/> |
|
6 |
<RESOURCE_KIND value="HadoopJobConfigurationDSResources"/> |
|
7 |
<RESOURCE_URI value=""/> |
|
8 |
<DATE_OF_CREATION value="2001-12-31T12:00:00"/> |
|
9 |
</HEADER> |
|
10 |
<BODY> |
|
11 |
<HADOOP_JOB name="importGridAcJob" type="mapreduce"> |
|
12 |
<DESCRIPTION>map reduce job that imports the Grid.AC organization dump into actions</DESCRIPTION> |
|
13 |
<STATIC_CONFIGURATION> |
|
14 |
|
|
15 |
<!-- I/O FORMAT --> |
|
16 |
<PROPERTY key="mapreduce.inputformat.class" value="org.apache.hadoop.mapreduce.lib.input.TextInputFormat"/> |
|
17 |
<PROPERTY key="mapreduce.outputformat.class" value="org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat"/> |
|
18 |
|
|
19 |
<!-- MAPPER --> |
|
20 |
<PROPERTY key="mapreduce.map.class" value="eu.dnetlib.data.mapreduce.hbase.dataimport.GridAcImportMapper"/> |
|
21 |
<PROPERTY key="mapred.mapoutput.key.class" value="org.apache.hadoop.io.Text"/> |
|
22 |
<PROPERTY key="mapred.mapoutput.value.class" value="org.apache.hadoop.io.Text"/> |
|
23 |
|
|
24 |
<PROPERTY key="mapred.output.key.class" value="org.apache.hadoop.io.Text"/> |
|
25 |
<PROPERTY key="mapred.output.value.class" value="org.apache.hadoop.io.Text"/> |
|
26 |
|
|
27 |
<!-- MISC --> |
|
28 |
<PROPERTY key="mapred.compress.map.output" value="false"/> |
|
29 |
<PROPERTY key="mapred.map.tasks.speculative.execution" value="false"/> |
|
30 |
<PROPERTY key="mapreduce.map.speculative" value="false"/> |
|
31 |
|
|
32 |
<PROPERTY key="mapred.reduce.tasks" value="0"/> |
|
33 |
<PROPERTY key="dfs.blocksize" value="256M"/> |
|
34 |
|
|
35 |
<!-- Grid.AC Mapper Properties --> |
|
36 |
<PROPERTY key="setName" value="crossref-dump"/> |
|
37 |
<PROPERTY key="agentId" value="dnet"/> |
|
38 |
<PROPERTY key="agentName" value="D-Net"/> |
|
39 |
<PROPERTY key="invisible" value="true"/> |
|
40 |
|
|
41 |
<!-- <PROPERTY key="user.name" value="dnet" /> --> |
|
42 |
|
|
43 |
<!-- Uncomment to override the default lib path --> |
|
44 |
<!-- <PROPERTY key="job.lib" value="/user/dnet/dnet-mapreduce-jobs-0.0.2-SNAPSHOT-jar-with-dependencies.jar"/> --> |
|
45 |
</STATIC_CONFIGURATION> |
|
46 |
<JOB_INTERFACE> |
|
47 |
<PARAM name="mapred.input.dir" required="true" description="source text file on hdfs"/> |
|
48 |
<PARAM name="mapred.output.dir" required="true" description="target sequence file on hdfs"/> |
|
49 |
</JOB_INTERFACE> |
|
50 |
<SCAN> |
|
51 |
<FILTERS/> |
|
52 |
<FAMILIES/> |
|
53 |
</SCAN> |
|
54 |
</HADOOP_JOB> |
|
55 |
<STATUS> |
|
56 |
<LAST_SUBMISSION_DATE value="2001-12-31T12:00:00"/> |
|
57 |
<RUNNING_INSTANCES value="0"/> |
|
58 |
<CUMULATIVE_RUN value="0"/> |
|
59 |
</STATUS> |
|
60 |
<SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS> |
|
61 |
</BODY> |
|
62 |
</RESOURCE_PROFILE> |
|
63 |
|
Also available in: Unified diff
added mapper and hadoop job configuration file for importing Grid.AC organization data