Revision 37751
Added by Claudio Atzori over 9 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/gt/RootExportMapper.java
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;

import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Counter;

import com.google.protobuf.InvalidProtocolBufferException;
import com.googlecode.protobuf.format.JsonFormat;

import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.proto.KindProtos.Kind;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.TypeProtos.Type;

public class RootExportMapper extends TableMapper<Text, Text> {

    private Text outKey;

    private Text outValue;

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        outKey = new Text("");
        outValue = new Text();
    }

    @Override
    protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {

        final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());

        final Type type = keyDecoder.getType();
        if (!Type.result.equals(type) || !DedupUtils.isRoot(keyDecoder.getId())) return;

        final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes(type.toString()));
        final byte[] bodyB = map.get(DedupUtils.BODY_B);
        if (bodyB == null) {
            incrementCounter(context, "missing body (map)", type.toString(), 1);
            return;
        }

        final Oaf oaf = decodeProto(context, bodyB);

        if (!isValid(oaf)) {
            incrementCounter(context, "missing body (map)", type.toString(), 1);
            return;
        }

        emit(new String(keyIn.copyBytes()), context, oaf);
        incrementCounter(context, Kind.entity.toString(), getEntityType(oaf, type), 1);
    }

    private boolean isValid(final Oaf oaf) {
        return (oaf != null) && oaf.isInitialized();
    }

    private Oaf decodeProto(final Context context, final byte[] body) {
        try {
            return Oaf.parseFrom(body);
        } catch (final InvalidProtocolBufferException e) {
            e.printStackTrace(System.err);
            context.getCounter("decodeProto", e.getClass().getName()).increment(1);
        }
        return null;
    }

    private void emit(final String key, final Context context, final Oaf oaf) throws IOException, InterruptedException {
        // outKey.set(key);
        outValue.set(JsonFormat.printToString(oaf));

        context.write(outKey, outValue);
    }

    private void incrementCounter(final Context context, final String k, final String t, final int n) {
        getCounter(context, k, t).increment(n);
    }

    private Counter getCounter(final Context context, final String k, final String t) {
        return context.getCounter(k, t);
    }

    private String getEntityType(final Oaf oaf, final Type type) {
        switch (type) {
        case result:
            return oaf.getEntity().getResult().getMetadata().getResulttype().getClassid();
        default:
            return type.toString();
        }
    }

}
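For context, a minimal driver sketch follows, showing how a TableMapper like this one is typically wired into a map-only HBase export job. It is not part of the commit: in production the job is configured and submitted through the dedupRootsExportJob profile below, and the driver class name, command-line arguments, hard-coded column family and scan prefix here are illustrative assumptions.

// Illustrative sketch (not part of the commit): stand-alone wiring of RootExportMapper into a map-only job.
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class RootExportDriver {

    // args: <hbase table> <row key prefix, e.g. the result type id> <hdfs output dir>
    public static void main(final String[] args) throws Exception {
        final Configuration conf = HBaseConfiguration.create();
        final Job job = Job.getInstance(conf, "dedupRootsExportJob");
        job.setJarByClass(RootExportDriver.class);

        // mirror the <SCAN> section of the profile below: row key prefix filter + a single column family
        final Scan scan = new Scan();
        scan.setFilter(new PrefixFilter(Bytes.toBytes(args[1])));
        scan.addFamily(Bytes.toBytes("result")); // assumption: the entityType param resolves to "result"
        scan.setCaching(500);
        scan.setCacheBlocks(false); // recommended for MapReduce scans

        TableMapReduceUtil.initTableMapperJob(args[0], scan, RootExportMapper.class, Text.class, Text.class, job);

        // map-only: the JSON lines are written straight to HDFS as text
        job.setNumReduceTasks(0);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        FileOutputFormat.setOutputPath(job, new Path(args[2]));

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}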
modules/dnet-openaireplus-profiles/trunk/src/main/resources/eu/dnetlib/test/profiles/HadoopJobConfigurationDSResources/HadoopJobConfigurationDSResourceType/dedupRootsExportJob.xml
<RESOURCE_PROFILE>
  <HEADER>
    <RESOURCE_IDENTIFIER value="53f2a9b4-adf3-4ceb-9308-d88b53dc44c5_SGFkb29wSm9iQ29uZmlndXJhdGlvbkRTUmVzb3VyY2VzL0hhZG9vcEpvYkNvbmZpZ3VyYXRpb25EU1Jlc291cmNlVHlwZQ=="/>
    <RESOURCE_TYPE value="HadoopJobConfigurationDSResourceType"/>
    <RESOURCE_KIND value="HadoopJobConfigurationDSResources"/>
    <RESOURCE_URI value=""/>
    <DATE_OF_CREATION value="2001-12-31T12:00:00"/>
  </HEADER>
  <BODY>
    <HADOOP_JOB name="dedupRootsExportJob" type="mapreduce">
      <DESCRIPTION>map only job that exports the representative publications as json</DESCRIPTION>
      <STATIC_CONFIGURATION>

        <!-- I/O FORMAT -->
        <PROPERTY key="mapreduce.inputformat.class" value="org.apache.hadoop.hbase.mapreduce.TableInputFormat" />
        <PROPERTY key="mapreduce.outputformat.class" value="org.apache.hadoop.mapreduce.lib.output.TextOutputFormat" />

        <!-- MAPPER -->
        <PROPERTY key="mapreduce.map.class" value="eu.dnetlib.data.mapreduce.hbase.dedup.gt.RootExportMapper" />
        <PROPERTY key="mapred.mapoutput.key.class" value="org.apache.hadoop.io.Text" />
        <PROPERTY key="mapred.mapoutput.value.class" value="org.apache.hadoop.io.Text" />

        <!-- MISC -->
        <PROPERTY key="mapred.reduce.tasks.speculative.execution" value="false" />
        <PROPERTY key="mapred.map.tasks.speculative.execution" value="false" />
        <PROPERTY key="mapreduce.map.speculative" value="false" />
        <PROPERTY key="mapreduce.reduce.speculative" value="false" />

        <PROPERTY key="dfs.blocksize" value="256M" />

        <PROPERTY key="mapred.reduce.tasks" value="1" />
        <!-- <PROPERTY key="user.name" value="dnet" /> -->

        <!-- Uncomment to override the default lib path -->
        <!-- <PROPERTY key="job.lib" value="/user/dnet/dnet-mapreduce-jobs-0.0.2-SNAPSHOT-jar-with-dependencies.jar"/> -->
      </STATIC_CONFIGURATION>
      <JOB_INTERFACE>
        <PARAM name="hbase.mapred.inputtable" required="true" description="source hbase table" />
        <PARAM name="hbase.mapreduce.inputtable" required="true" description="source hbase table" />

        <PARAM name="mapred.output.dir" required="true" description="target sequence file on hdfs" />
      </JOB_INTERFACE>
      <SCAN>
        <FILTERS operator="MUST_PASS_ALL">
          <FILTER type="prefix" param="entityTypeId" />
        </FILTERS>
        <FAMILIES>
          <FAMILY param="entityType" />
        </FAMILIES>
      </SCAN>
    </HADOOP_JOB>
    <STATUS>
      <LAST_SUBMISSION_DATE value="2001-12-31T12:00:00"/>
      <RUNNING_INSTANCES value="0"/>
      <CUMULATIVE_RUN value="0" />
    </STATUS>
    <SECURITY_PARAMETERS>SECURITY_PARAMETERS</SECURITY_PARAMETERS>
  </BODY>
</RESOURCE_PROFILE>
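A note on the resulting output: with TextOutputFormat each record is written as the map output key, a tab, and the value; since the mapper emits an empty key, every line starts with a tab followed by the JSON-serialized Oaf. The sketch below, not part of the commit, shows one way to read such an export back. The file name is hypothetical, and it assumes the protobuf-java-format version in use exposes the static JsonFormat.merge(CharSequence, Message.Builder) overload.

// Illustrative sketch (not part of the commit): parse exported JSON lines back into Oaf messages.
package eu.dnetlib.data.mapreduce.hbase.dedup.gt;

import java.io.BufferedReader;
import java.io.FileReader;

import com.googlecode.protobuf.format.JsonFormat;

import eu.dnetlib.data.proto.OafProtos.Oaf;

public class RootExportReader {

    public static void main(final String[] args) throws Exception {
        // args[0]: a part file copied locally, e.g. via "hadoop fs -get <csvPath>/part-m-00000 ."
        try (final BufferedReader in = new BufferedReader(new FileReader(args[0]))) {
            String line;
            while ((line = in.readLine()) != null) {
                // TextOutputFormat wrote "<empty key>\t<json>", so drop the leading tab
                final String json = line.replaceFirst("^\\t", "");
                final Oaf.Builder builder = Oaf.newBuilder();
                JsonFormat.merge(json, builder); // assumption: this static merge overload is available
                final Oaf oaf = builder.build();
                // same accessor chain used by the mapper to classify results
                System.out.println(oaf.getEntity().getResult().getMetadata().getResulttype().getClassid());
            }
        }
    }
}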
modules/dnet-deduplication/trunk/src/main/resources/eu/dnetlib/test/profiles/meta/offline.export.meta.xml
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE>
  <HEADER>
    <RESOURCE_IDENTIFIER value="cac21a4c-9207-424b-8d9a-01c53248f9e9_TWV0YVdvcmtmbG93RFNSZXNvdXJjZXMvTWV0YVdvcmtmbG93RFNSZXNvdXJjZVR5cGU=" />
    <RESOURCE_TYPE value="MetaWorkflowDSResourceType" />
    <RESOURCE_KIND value="MetaWorkflowDSResources" />
    <RESOURCE_URI value="" />
    <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z" />
  </HEADER>
  <BODY>
    <METAWORKFLOW_NAME family="Export">Export</METAWORKFLOW_NAME>
    <METAWORKFLOW_DESCRIPTION></METAWORKFLOW_DESCRIPTION>
    <METAWORKFLOW_SECTION>InfoSpace Deduplication</METAWORKFLOW_SECTION>
    <ADMIN_EMAIL />
    <CONFIGURATION status="EXECUTABLE">
      <WORKFLOW id="7d183ac5-5aae-4022-84b6-37a9e4f69ab0_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" name="export roots"/>
    </CONFIGURATION>
    <SCHEDULING enabled="false">
      <CRON>29 5 22 ? * *</CRON>
      <MININTERVAL>10080</MININTERVAL>
    </SCHEDULING>
  </BODY>
</RESOURCE_PROFILE>
modules/dnet-deduplication/trunk/src/main/resources/eu/dnetlib/test/profiles/meta/workflows/dedup.roots.export.xml
<?xml version="1.0" encoding="UTF-8"?>
<RESOURCE_PROFILE xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
  <HEADER>
    <RESOURCE_IDENTIFIER value="7d183ac5-5aae-4022-84b6-37a9e4f69ab0_V29ya2Zsb3dEU1Jlc291cmNlcy9Xb3JrZmxvd0RTUmVzb3VyY2VUeXBl" />
    <RESOURCE_TYPE value="WorkflowDSResourceType" />
    <RESOURCE_KIND value="WorkflowDSResources" />
    <RESOURCE_URI value="" />
    <DATE_OF_CREATION value="2006-05-04T18:13:51.0Z" />
  </HEADER>
  <BODY>
    <WORKFLOW_NAME>Deduplication export</WORKFLOW_NAME>
    <WORKFLOW_TYPE>Deduplication</WORKFLOW_TYPE>
    <WORKFLOW_PRIORITY>30</WORKFLOW_PRIORITY>
    <CONFIGURATION start="manual">
      <NODE name="setCsvPath" type="SetEnvParameter" isStart="true">
        <DESCRIPTION>Set the CSV file path on HDFS</DESCRIPTION>
        <PARAMETERS>
          <PARAM managedBy="system" name="parameterName" required="true" type="string">csvPath</PARAM>
          <PARAM managedBy="user" name="parameterValue" required="true" type="string"></PARAM>
        </PARAMETERS>
        <ARCS>
          <ARC to="cleanupCsv" />
        </ARCS>
      </NODE>

      <NODE name="setDedupConfigs" type="SetDedupConfiguration" isStart="true">
        <DESCRIPTION>Set Dedup conf</DESCRIPTION>
        <PARAMETERS>
          <PARAM function="obtainValues('dedupOrchestrations', {})" required="true" type="string" name="dedupConfigSequence" managedBy="user"></PARAM>
          <PARAM required="true" type="string" name="dedupConfigSequenceParam" managedBy="system">dedup.conf.queue</PARAM>
        </PARAMETERS>
        <ARCS>
          <ARC to="cleanupCsv" />
        </ARCS>
      </NODE>

      <NODE name="hadoopConfig" type="SetClusterAndTable" isStart="true">
        <DESCRIPTION>Set table name</DESCRIPTION>
        <PARAMETERS>
          <PARAM required="true" type="string" name="cluster" managedBy="system">DM</PARAM>
          <PARAM required="true" type="string" name="tableParam" managedBy="system">tableName</PARAM>
        </PARAMETERS>
        <ARCS>
          <ARC to="cleanupCsv" />
        </ARCS>
      </NODE>

      <NODE name="cleanupCsv" type="DeleteHdfsPathJob" isJoin="true">
        <DESCRIPTION>CSV files cleanup</DESCRIPTION>
        <PARAMETERS>
          <PARAM required="true" type="string" name="envParams" managedBy="system">
            {
              'path' : 'csvPath',
              'cluster' : 'cluster'
            }
          </PARAM>
        </PARAMETERS>
        <ARCS>
          <ARC to="export" />
        </ARCS>
      </NODE>

      <NODE name="export" type="SubmitHadoopJob">
        <DESCRIPTION>export the representative publications</DESCRIPTION>
        <PARAMETERS>
          <PARAM required="true" type="string" name="hadoopJob" managedBy="system">dedupRootsExportJob</PARAM>
          <PARAM required="true" type="string" name="sysParams" managedBy="system">
            {
              'hbase.mapred.inputtable' : 'hbase.mapred.datatable',
              'hbase.mapreduce.inputtable' : 'hbase.mapred.datatable'
            }
          </PARAM>
          <PARAM required="true" type="string" name="envParams" managedBy="system">
            {
              'cluster' : 'cluster',
              'entityTypeId' : 'entityTypeId',
              'entityType' : 'entityType',
              'hbase.mapred.inputtable' : 'tableName',
              'hbase.mapreduce.inputtable' : 'tableName',
              'mapred.output.dir' : 'csvPath'
            }
          </PARAM>
        </PARAMETERS>
        <ARCS>
          <ARC to="success" />
        </ARCS>
      </NODE>
    </CONFIGURATION>
    <STATUS />
  </BODY>
</RESOURCE_PROFILE>
Commit message: added workflow to export the representative publications as json on hdfs