Revision 60316
Added by Claudio Atzori over 3 years ago

updated hadoop-specific aggregation workflows
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvTransformHadoopJobNode.java

@@ -16,45 +16,29 @@
 
     private static final Log log = LogFactory.getLog(PrepareEnvTransformHadoopJobNode.class);
 
-    private String hdfsBasePath;
-
     private String ruleId;
 
     @Autowired
     private UniqueServiceLocator serviceLocator;
 
+    private String mdstoreInput;
+
+    private String mdstoreOutput;
+
     @Override
     protected String execute(final NodeToken token) throws Exception {
 
-        final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
-        final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
-
-        final String writing_mdId = token.getEnv().getAttribute("mdId");
-        final String writing_versionId = token.getEnv().getAttribute("versionId");
-
-        final String hdfsNativePath = String.format("%s/%s/%s/store", hdfsBasePath, reading_mdId, reading_versionId);
-        final String hdfsTransformedPath = String.format("%s/%s/%s/store", hdfsBasePath, writing_mdId, writing_versionId);
-
-
         final String trRule = serviceLocator.getService(ISLookUpService.class).getResourceProfile(getRuleId());
 
         token.getEnv().setAttribute("timestamp", "" + System.currentTimeMillis());
         token.getEnv().setAttribute("workflowId", token.getProcess().getEnv().getAttribute("system:processId"));
-        token.getEnv().setAttribute("mdstoreInputPath", hdfsNativePath);
-        token.getEnv().setAttribute("mdstoreOutputPath", hdfsTransformedPath);
+        token.getEnv().setAttribute("mdstoreInput", getMdstoreInput());
+        token.getEnv().setAttribute("mdstoreOutput", getMdstoreOutput());
         token.getEnv().setAttribute("transformationRule", DHPUtils.compressString(trRule));
         return Arc.DEFAULT_ARC;
 
     }
 
-    public String getHdfsBasePath() {
-        return hdfsBasePath;
-    }
-
-    public void setHdfsBasePath(final String hdfsBasePath) {
-        this.hdfsBasePath = hdfsBasePath;
-    }
-
     public String getRuleId() {
         return ruleId;
     }
@@ -70,4 +54,20 @@
     public void setServiceLocator(UniqueServiceLocator serviceLocator) {
         this.serviceLocator = serviceLocator;
     }
+
+    public String getMdstoreInput() {
+        return mdstoreInput;
+    }
+
+    public void setMdstoreInput(String mdstoreInput) {
+        this.mdstoreInput = mdstoreInput;
+    }
+
+    public String getMdstoreOutput() {
+        return mdstoreOutput;
+    }
+
+    public void setMdstoreOutput(String mdstoreOutput) {
+        this.mdstoreOutput = mdstoreOutput;
+    }
 }
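
Note: the node no longer derives HDFS paths from a user-supplied hdfsBasePath; it forwards the input/output MDStore identifiers and leaves path resolution to the Hadoop-side job. The transformation rule profile still travels through the env, compressed via DHPUtils.compressString. A minimal sketch of such a helper, assuming it gzips and Base64-encodes the string (the actual dnet implementation may differ):

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.zip.GZIPOutputStream;

    public final class CompressStringSketch {

        // Gzip the profile XML and Base64-encode it so it can travel safely
        // inside a workflow env attribute (assumption: mirrors DHPUtils.compressString).
        public static String compressString(final String s) throws IOException {
            final ByteArrayOutputStream baos = new ByteArrayOutputStream();
            try (GZIPOutputStream gzip = new GZIPOutputStream(baos)) {
                gzip.write(s.getBytes(StandardCharsets.UTF_8));
            }
            return Base64.getEncoder().encodeToString(baos.toByteArray());
        }
    }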
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/PrepareEnvCollectHadoopJobNode.java

@@ -5,6 +5,7 @@
 
 import eu.dnetlib.dhp.model.mdstore.Provenance;
 import eu.dnetlib.enabling.datasources.common.Datasource;
+import eu.dnetlib.enabling.datasources.common.DsmNotFoundException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -22,12 +23,21 @@
 public class PrepareEnvCollectHadoopJobNode extends SimpleJobNode {
 
     private static final Log log = LogFactory.getLog(PrepareEnvCollectHadoopJobNode.class);
+    public static final String METADATA_IDENTIFIER_PATH = "metadata_identifier_path";
 
     @Autowired
     private LocalDatasourceManager<?, ?> dsManager;
 
-    private String hdfsBasePath;
+    /**
+     * MDStore identifier
+     */
+    private String mdId;
 
+    /**
+     * REFRESH | INCREMENTAL
+     */
+    private String collectionMode;
+
     @Override
     protected String execute(final NodeToken token) throws Exception {
 
@@ -36,9 +46,9 @@
         // param 3 : nameNode
 
         final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
+        log.info("dsId: " + dsId);
         final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
-        final String mdId = token.getEnv().getAttribute("mdId");
-        final String versionId = token.getEnv().getAttribute("versionId");
+        log.info("apiId: " + apiId);
 
         final Optional<ApiDescriptor> opt = dsManager.getApis(dsId)
             .stream()
@@ -48,7 +58,7 @@
                 res.setBaseUrl(a.getBaseurl());
                 res.setId(a.getId());
                 res.setProtocol(a.getProtocol());
-                res.getParams().put("metadata_identifier_path", a.getMetadataIdentifierPath());
+                res.getParams().put(METADATA_IDENTIFIER_PATH, a.getMetadataIdentifierPath());
                 res.getParams().putAll(a.getApiParams()
                     .stream()
                     .map(o -> (ApiParam) o)
@@ -58,20 +68,23 @@
             .findFirst();
 
         if (opt.isPresent()) {
+            token.getEnv().setAttribute("mdId", getMdId());
+            token.getEnv().setAttribute("collectionMode", getCollectionMode());
+
             final ApiDescriptor api = opt.get();
-            final String hdfsPath = String.format("%s/%s/%s/store", hdfsBasePath, mdId, versionId);
-            final String seqFilePath = String.format("%s/%s/%s/seqFile", hdfsBasePath, mdId, versionId);
             token.getEnv().setAttribute("apiDescription", new Gson().toJson(api));
-            token.getEnv().setAttribute("mdStorePath", hdfsPath);
-            token.getEnv().setAttribute("sequenceFilePath", seqFilePath);
+
             final Provenance provenance = new Provenance();
             provenance.setDatasourceId(dsId);
             final Datasource<?, ?> ds = dsManager.getDs(dsId);
             provenance.setDatasourceName(ds.getOfficialname());
             provenance.setNsPrefix(ds.getNamespaceprefix());
-            token.getEnv().setAttribute("dataSourceInfo", new Gson().toJson(provenance));
+            final String dsProvenance = new Gson().toJson(provenance);
+            log.info("datasource provenance: " + dsProvenance);
+
+            token.getEnv().setAttribute("dataSourceInfo", dsProvenance);
             token.getEnv().setAttribute("timestamp", ""+System.currentTimeMillis());
-            token.getEnv().setAttribute("identifierPath",api.getParams().get("metadata_identifier_path"));
+            token.getEnv().setAttribute("identifierPath",api.getParams().get(METADATA_IDENTIFIER_PATH));
             token.getEnv().setAttribute("workflowId",token.getProcess().getEnv().getAttribute("system:processId"));
 
             token.getEnv().setAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL, api.getBaseUrl());
@@ -85,16 +98,23 @@
 
             return Arc.DEFAULT_ARC;
         } else {
-            return "abort";
+            throw new DsmNotFoundException("cannot find ds interface: " + apiId);
         }
+    }
 
+    public String getMdId() {
+        return mdId;
     }
 
-    public String getHdfsBasePath() {
-        return hdfsBasePath;
+    public void setMdId(String mdId) {
+        this.mdId = mdId;
     }
 
-    public void setHdfsBasePath(String hdfsBasePath) {
-        this.hdfsBasePath = hdfsBasePath;
+    public String getCollectionMode() {
+        return collectionMode;
     }
+
+    public void setCollectionMode(String collectionMode) {
+        this.collectionMode = collectionMode;
+    }
 }
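
Note: the collection node now publishes mdId and collectionMode from its bean properties, serializes both the ApiDescriptor and the Provenance with Gson into env attributes, and fails fast with a DsmNotFoundException instead of routing to an "abort" arc. A small illustration of the Gson round-trip used for the "dataSourceInfo" attribute; ProvenanceSketch is a hypothetical, simplified stand-in for eu.dnetlib.dhp.model.mdstore.Provenance:

    import com.google.gson.Gson;

    // Hypothetical stand-in for the real Provenance bean, for illustration only.
    class ProvenanceSketch {
        String datasourceId;
        String datasourceName;
        String nsPrefix;
    }

    public class GsonRoundTrip {
        public static void main(String[] args) {
            final ProvenanceSketch p = new ProvenanceSketch();
            p.datasourceId = "ds-1";
            p.datasourceName = "Example Repository";
            p.nsPrefix = "example_____";

            final Gson gson = new Gson();
            final String json = gson.toJson(p);  // what the node stores in the env
            final ProvenanceSketch back = gson.fromJson(json, ProvenanceSketch.class);  // what the Hadoop job reads back
            System.out.println(json + " -> " + back.datasourceName);
        }
    }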
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/collection.wf.st

@@ -1,4 +1,4 @@
-<NODE name="SET_INFO" isStart="true" type="SetProviderInfo">
+<NODE name="SET_INFO" isStart="true" type="SetProviderInfo">
     <DESCRIPTION>Set information about current provider</DESCRIPTION>
     <PARAMETERS>
         <PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
@@ -16,38 +16,33 @@
         <PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
     </PARAMETERS>
     <ARCS>
-        <ARC to="PREPARE_STORE_VERSION"/>
+        <ARC to="PREPARE_ENV_COLLECTION"/>
     </ARCS>
 </NODE>
-<NODE name="PREPARE_STORE_VERSION" type="PrepareMDStoreVersion">
-    <DESCRIPTION>Prepare a new MdStore Version</DESCRIPTION>
-    <PARAMETERS>
-        <PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
-    </PARAMETERS>
-    <ARCS>
-        <ARC to="PREPARE_ENV_COLLECTION"/>
-    </ARCS>
-</NODE>
+
 <NODE name="PREPARE_ENV_COLLECTION" type="PrepareEnvCollectHadoopJobNode">
-    <DESCRIPTION>Put in the environment all the variable needed to the collection oozie job </DESCRIPTION>
+    <DESCRIPTION>Set in the environment all the variable needed to the collection oozie job</DESCRIPTION>
     <PARAMETERS>
-        <PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"></PARAM>
+        <PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
+        <PARAM required="true" type="string" name="collectionMode" managedBy="user" function="validValues(['REFRESH','INCREMENTAL'])"></PARAM>
     </PARAMETERS>
     <ARCS>
         <ARC to="COLLECT_HADOOP"/>
     </ARCS>
 </NODE>
-<NODE name="COLLECT_HADOOP" type="SubmitDnetHadoopJobNode">
+
+<NODE name="COLLECT_HADOOP" type="SubmitHadoopJob">
     <DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
     <PARAMETERS>
-        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopCollection</PARAM>
+        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
         <PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
         <PARAM managedBy="system" name="envParams" required="true" type="string">
         {
+            "mdId":"mdId",
+            "collectionMode":"collectionMode",
             "apiDescription":"apiDescription",
-            "mdStorePath":"mdStorePath",
             "sequenceFilePath":"sequenceFilePath",
-            "dataSourceInfo":"dataSourceInfo" ,
+            "dataSourceInfo":"dataSourceInfo",
             "timestamp":"timestamp",
             "identifierPath":"identifierPath",
            "workflowId":"workflowId"
@@ -55,27 +50,10 @@
         </PARAM>
     </PARAMETERS>
     <ARCS>
-        <ARC to="COMMIT_VERSION"/>
-        <ARC name="abort" to="ABORT_VERSION"/>
+        <ARC to="UPDATE_INFO"/>
     </ARCS>
 </NODE>
 
-<NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
-    <DESCRIPTION>Commit the mdstore version</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="UPDATE_INFO"/>
-    </ARCS>
-</NODE>
-
-<NODE name="ABORT_VERSION" type="AbortMDStoreVersion">
-    <DESCRIPTION>Abort the mdstore version</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="failure"/>
-    </ARCS>
-</NODE>
-
 <NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop">
     <DESCRIPTION>Update datasouce API extra fields</DESCRIPTION>
     <PARAMETERS>
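
Note: the envParams JSON maps each Hadoop/Oozie job parameter (left) to the name of a workflow env attribute (right); the collection job thus receives, e.g., mdId bound to whatever PrepareEnvCollectHadoopJobNode stored under that attribute. A hypothetical sketch of how such a mapping could be resolved at submit time (the resolve helper and its names are assumptions, not the SubmitHadoopJob implementation):

    import java.util.HashMap;
    import java.util.Map;
    import java.util.function.Function;

    public class EnvParamsSketch {

        // envParams: jobParam -> envAttributeName, as declared in the PARAM above;
        // envLookup: reads an attribute from the workflow token env.
        public static Map<String, String> resolve(
                final Map<String, String> envParams,
                final Function<String, String> envLookup) {
            final Map<String, String> jobProps = new HashMap<>();
            envParams.forEach((jobParam, envAttr) ->
                jobProps.put(jobParam, envLookup.apply(envAttr)));
            return jobProps;
        }
    }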
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/pubsRepositoryHadoop/transform.wf.st

@@ -16,48 +16,31 @@
     <PARAM required="true" type="string" name="providerId" managedBy="system">$params.("dataprovider:id")$</PARAM>
     </PARAMETERS>
     <ARCS>
-        <ARC to="START_READING"/>
+        <ARC to="PREPARE_ENV_TRANSFORMATION"/>
     </ARCS>
 </NODE>
 
-<NODE name="START_READING" type="StartReadingMDStore">
-    <DESCRIPTION>Start reading Hadoop MD Store</DESCRIPTION>
-    <PARAMETERS>
-        <PARAM required="true" type="string" name="mdId" managedBy="system">$params.("harv_id")$</PARAM>
-    </PARAMETERS>
-    <ARCS>
-        <ARC to="PREPARE_STORE_VERSION"/>
-    </ARCS>
-</NODE>
-
-<NODE name="PREPARE_STORE_VERSION" type="PrepareMDStoreVersion">
-    <DESCRIPTION>Prepare a new MdStore Version</DESCRIPTION>
-    <PARAMETERS>
-        <PARAM required="true" type="string" name="mdId" managedBy="system">$params.("tran_id")$</PARAM>
-    </PARAMETERS>
-    <ARCS>
-        <ARC to="PREPARE_ENV_TRANSFORMATION"/>
-    </ARCS>
-</NODE>
 <NODE name="PREPARE_ENV_TRANSFORMATION" type="PrepareEnvTransformHadoopJobNode">
     <DESCRIPTION>Retrieve all the parameters needed to run the transformation workflow</DESCRIPTION>
     <PARAMETERS>
         <PARAM category="TRANSFORMATION_RULE_ID" function="listProfiles('TransformationRuleDSResourceType', '//TITLE')" managedBy="user" name="ruleId" required="true" type="string"></PARAM>
-        <PARAM managedBy="user" name="hdfsBasePath" required="true" type="string"></PARAM>
+        <PARAM required="true" type="string" name="mdstoreInput" managedBy="system">$params.("harv_id")$</PARAM>
+        <PARAM required="true" type="string" name="mdstoreOutput" managedBy="system">$params.("tran_id")$</PARAM>
     </PARAMETERS>
     <ARCS>
         <ARC to="TRANSFORM_HADOOP"/>
     </ARCS>
 </NODE>
-<NODE name="TRANSFORM_HADOOP" type="SubmitDnetHadoopJobNode">
+
+<NODE name="TRANSFORM_HADOOP" type="SubmitHadoopJob">
     <DESCRIPTION>Start the Hadoop Job</DESCRIPTION>
     <PARAMETERS>
-        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">dnetHadoopTrasnformation</PARAM>
+        <PARAM managedBy="system" name="hadoopJob" required="true" type="string">executeOozieJob</PARAM>
         <PARAM managedBy="user" name="cluster" required="true" type="string">DHP</PARAM>
         <PARAM managedBy="system" name="envParams" required="true" type="string">
         {
-            "mdstoreInputPath":"mdstoreInputPath",
-            "mdstoreOutputPath":"mdstoreOutputPath",
+            "mdstoreInput":"mdstoreInput",
+            "mdstoreOutput":"mdstoreOutput",
             "transformationRule":"transformationRule",
             "timestamp":"timestamp",
             "workflowId":"workflowId"
@@ -65,38 +48,9 @@
         </PARAM>
     </PARAMETERS>
     <ARCS>
-        <ARC to="COMMIT_VERSION"/>
-        <ARC name="abort" to="ABORT_VERSION"/>
-    </ARCS>
-</NODE>
-<NODE name="COMMIT_VERSION" type="CommitMDStoreVersion">
-    <DESCRIPTION>Commit the mdstore version</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="END_READING"/>
-    </ARCS>
-</NODE>
-<NODE name="END_READING" type="EndReadingMDStore">
-    <DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
         <ARC to="UPDATE_INFO"/>
     </ARCS>
 </NODE>
-<NODE name="ABORT_VERSION" type="AbortMDStoreVersion">
-    <DESCRIPTION>Abort the mdstore version</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="END_READING_ABORT"/>
-    </ARCS>
-</NODE>
-<NODE name="END_READING_ABORT" type="EndReadingMDStore">
-    <DESCRIPTION>End reading Hadoop MD Store</DESCRIPTION>
-    <PARAMETERS/>
-    <ARCS>
-        <ARC to="failure"/>
-    </ARCS>
-</NODE>
 
 <NODE name="UPDATE_INFO" type="MDStoreToApiExtraFieldHadoop">
     <DESCRIPTION>Update datasouce API extra fields</DESCRIPTION>
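
Note: with the read/commit/abort lifecycle nodes removed from the D-Net workflow, MDStore versioning is no longer managed here. On the consuming side, the "transformationRule" attribute set by PrepareEnvTransformHadoopJobNode has to be expanded again before use; a sketch of the counterpart to the compression helper above, under the same gzip+Base64 assumption (not the actual dnet decompression code):

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.charset.StandardCharsets;
    import java.util.Base64;
    import java.util.zip.GZIPInputStream;

    public final class DecompressStringSketch {

        // Reverse of compressString: Base64-decode, then gunzip back to the profile XML.
        public static String decompressString(final String encoded) throws IOException {
            final byte[] compressed = Base64.getDecoder().decode(encoded);
            final ByteArrayOutputStream out = new ByteArrayOutputStream();
            try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
                final byte[] buffer = new byte[4096];
                int n;
                while ((n = gzip.read(buffer)) != -1) {
                    out.write(buffer, 0, n);
                }
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }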