Revision 32891
Added by Michele Artini almost 10 years ago
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/SubmitHadoopJobNode.java | ||
---|---|---|
12 | 12 |
|
13 | 13 |
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions; |
14 | 14 |
import eu.dnetlib.data.hadoop.rmi.HadoopJobType; |
15 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
15 | 16 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
16 | 17 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
17 | 18 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
... | ... | |
35 | 36 |
private boolean simulation = false; |
36 | 37 |
|
37 | 38 |
@Override |
38 |
protected String getXqueryForServiceId(final NodeToken token) {
|
|
39 |
return "collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()";
|
|
39 |
protected String obtainServiceId(final NodeToken token) {
|
|
40 |
return getServiceLocator().getServiceId(HadoopService.class);
|
|
40 | 41 |
} |
41 | 42 |
|
42 | 43 |
@Override |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/StoreHBaseRecordsJobNode.java | ||
---|---|---|
13 | 13 |
import com.googlecode.sarasvati.env.Env; |
14 | 14 |
|
15 | 15 |
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions; |
16 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
16 | 17 |
import eu.dnetlib.enabling.resultset.rmi.ResultSetException; |
17 | 18 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
18 | 19 |
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions; |
... | ... | |
38 | 39 |
private ProcessCountingResultSetFactory processCountingResultSetFactory; |
39 | 40 |
|
40 | 41 |
@Override |
41 |
protected String getXqueryForServiceId(final NodeToken token) {
|
|
42 |
return "collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()";
|
|
42 |
protected String obtainServiceId(final NodeToken token) {
|
|
43 |
return getServiceLocator().getServiceId(HadoopService.class);
|
|
43 | 44 |
} |
44 | 45 |
|
45 | 46 |
@Override |
... | ... | |
54 | 55 |
} |
55 | 56 |
|
56 | 57 |
@Override |
57 |
protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
|
|
58 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
|
|
58 | 59 |
return new BlackboardWorkflowJobListener(engine, token) { |
59 | 60 |
|
60 | 61 |
@Override |
61 |
protected void populateEnv(final Env env, Map<String, String> responseParams) { |
|
62 |
protected void populateEnv(final Env env, final Map<String, String> responseParams) {
|
|
62 | 63 |
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", responseParams.get("count")); |
63 | 64 |
} |
64 | 65 |
}; |
... | ... | |
89 | 90 |
return hbaseTableProperty; |
90 | 91 |
} |
91 | 92 |
|
92 |
public void setHbaseTableProperty(String hbaseTableProperty) { |
|
93 |
public void setHbaseTableProperty(final String hbaseTableProperty) {
|
|
93 | 94 |
this.hbaseTableProperty = hbaseTableProperty; |
94 | 95 |
} |
95 | 96 |
|
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hbase/StoreHdfsRecordsJobNode.java | ||
---|---|---|
11 | 11 |
import com.googlecode.sarasvati.env.Env; |
12 | 12 |
|
13 | 13 |
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions; |
14 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
14 | 15 |
import eu.dnetlib.enabling.resultset.rmi.ResultSetException; |
15 | 16 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
16 | 17 |
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions; |
... | ... | |
35 | 36 |
private ProcessCountingResultSetFactory processCountingResultSetFactory; |
36 | 37 |
|
37 | 38 |
@Override |
38 |
protected String getXqueryForServiceId(final NodeToken token) {
|
|
39 |
return "collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()";
|
|
39 |
protected String obtainServiceId(final NodeToken token) {
|
|
40 |
return getServiceLocator().getServiceId(HadoopService.class);
|
|
40 | 41 |
} |
41 | 42 |
|
42 | 43 |
@Override |
... | ... | |
48 | 49 |
job.getParameters().put("path", token.getEnv().getAttribute(getHdfsPathParam())); |
49 | 50 |
job.getParameters().put("cluster", getCluster()); |
50 | 51 |
} |
51 |
|
|
52 |
|
|
52 | 53 |
@Override |
53 |
protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
|
|
54 |
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
|
|
54 | 55 |
return new BlackboardWorkflowJobListener(engine, token) { |
55 |
@Override |
|
56 |
protected void populateEnv(final Env env, Map<String, String> responseParams) { |
|
57 |
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", responseParams.get("count")); |
|
58 |
} |
|
56 |
|
|
57 |
@Override |
|
58 |
protected void populateEnv(final Env env, final Map<String, String> responseParams) { |
|
59 |
env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", responseParams.get("count")); |
|
60 |
} |
|
59 | 61 |
}; |
60 |
}
|
|
62 |
} |
|
61 | 63 |
|
62 | 64 |
private String prepareEpr(final NodeToken token) throws ResultSetException { |
63 | 65 |
final String epr = token.getEnv().getAttribute(inputEprParam); |
... | ... | |
106 | 108 |
return hdfsPathParam; |
107 | 109 |
} |
108 | 110 |
|
109 |
public void setHdfsPathParam(String hdfsPathParam) { |
|
111 |
public void setHdfsPathParam(final String hdfsPathParam) {
|
|
110 | 112 |
this.hdfsPathParam = hdfsPathParam; |
111 | 113 |
} |
112 | 114 |
|
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/objectStore/ImportFilesJobNode.java | ||
---|---|---|
10 | 10 |
|
11 | 11 |
import com.googlecode.sarasvati.NodeToken; |
12 | 12 |
|
13 |
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService; |
|
13 | 14 |
import eu.dnetlib.data.objectstore.rmi.Protocols; |
14 | 15 |
import eu.dnetlib.enabling.resultset.IterableResultSetFactory; |
15 | 16 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
... | ... | |
53 | 54 |
} |
54 | 55 |
|
55 | 56 |
@Override |
56 |
protected String getXqueryForServiceId(final NodeToken token) {
|
|
57 |
return "//RESOURCE_IDENTIFIER[../RESOURCE_TYPE/@value='ObjectStoreServiceResourceType']/@value/string()";
|
|
57 |
protected String obtainServiceId(final NodeToken token) {
|
|
58 |
return getServiceLocator().getServiceId(ObjectStoreService.class);
|
|
58 | 59 |
} |
59 | 60 |
|
60 | 61 |
@Override |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/stats/ValidateShadowStatsJobNode.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import com.googlecode.sarasvati.NodeToken; |
4 | 4 |
|
5 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
6 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
5 | 7 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
6 | 8 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
7 | 9 |
|
... | ... | |
17 | 19 |
private String xqueryForServiceIdParam; |
18 | 20 |
|
19 | 21 |
@Override |
20 |
protected String getXqueryForServiceId(final NodeToken token) { |
|
21 |
return token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
22 |
protected String obtainServiceId(final NodeToken token) { |
|
23 |
final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
24 |
try { |
|
25 |
return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xquery); |
|
26 |
} catch (ISLookUpException e) { |
|
27 |
throw new RuntimeException("Service id not found using query: " + xquery, e); |
|
28 |
} |
|
22 | 29 |
} |
23 | 30 |
|
24 | 31 |
@Override |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/stats/RefreshShadowCacheJobNode.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import com.googlecode.sarasvati.NodeToken; |
4 | 4 |
|
5 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
6 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
5 | 7 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
6 | 8 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
7 | 9 |
|
... | ... | |
16 | 18 |
private String xqueryForServiceIdParam; |
17 | 19 |
|
18 | 20 |
@Override |
19 |
protected String getXqueryForServiceId(final NodeToken token) { |
|
20 |
return token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
21 |
protected String obtainServiceId(final NodeToken token) { |
|
22 |
final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
23 |
try { |
|
24 |
return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xquery); |
|
25 |
} catch (ISLookUpException e) { |
|
26 |
throw new RuntimeException("Service id not found using query: " + xquery, e); |
|
27 |
} |
|
21 | 28 |
} |
22 | 29 |
|
23 | 30 |
@Override |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/stats/PromoteShadowStatsJobNode.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import com.googlecode.sarasvati.NodeToken; |
4 | 4 |
|
5 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
6 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
5 | 7 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
6 | 8 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
7 | 9 |
|
... | ... | |
16 | 18 |
private String xqueryForServiceIdParam; |
17 | 19 |
|
18 | 20 |
@Override |
19 |
protected String getXqueryForServiceId(final NodeToken token) { |
|
20 |
return token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
21 |
protected String obtainServiceId(final NodeToken token) { |
|
22 |
final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
23 |
try { |
|
24 |
return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xquery); |
|
25 |
} catch (ISLookUpException e) { |
|
26 |
throw new RuntimeException("Service id not found using query: " + xquery, e); |
|
27 |
} |
|
21 | 28 |
} |
22 | 29 |
|
23 | 30 |
@Override |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/actions/PromoteActionsJobNode.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import com.googlecode.sarasvati.NodeToken; |
4 | 4 |
|
5 |
import eu.dnetlib.actionmanager.rmi.ActionManagerService; |
|
5 | 6 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
6 | 7 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
7 | 8 |
|
... | ... | |
12 | 13 |
private String set; |
13 | 14 |
|
14 | 15 |
@Override |
15 |
protected String getXqueryForServiceId(final NodeToken token) {
|
|
16 |
return "//RESOURCE_IDENTIFIER[../RESOURCE_TYPE/@value='ActionManagerServiceResourceType']/@value/string()";
|
|
16 |
protected String obtainServiceId(final NodeToken token) {
|
|
17 |
return getServiceLocator().getServiceId(ActionManagerService.class);
|
|
17 | 18 |
} |
18 | 19 |
|
19 | 20 |
@Override |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/actions/GarbageSetsJobNode.java | ||
---|---|---|
5 | 5 |
|
6 | 6 |
import com.googlecode.sarasvati.NodeToken; |
7 | 7 |
|
8 |
import eu.dnetlib.actionmanager.rmi.ActionManagerService; |
|
8 | 9 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
9 | 10 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
10 | 11 |
|
... | ... | |
16 | 17 |
private static final Log log = LogFactory.getLog(GarbageSetsJobNode.class); |
17 | 18 |
|
18 | 19 |
@Override |
19 |
protected String getXqueryForServiceId(NodeToken token) {
|
|
20 |
return "//RESOURCE_IDENTIFIER[../RESOURCE_TYPE/@value='ActionManagerServiceResourceType']/@value/string()";
|
|
20 |
protected String obtainServiceId(final NodeToken token) {
|
|
21 |
return getServiceLocator().getServiceId(ActionManagerService.class);
|
|
21 | 22 |
} |
22 | 23 |
|
23 | 24 |
@Override |
24 |
protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
|
|
25 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
|
|
25 | 26 |
log.info("preparing garbage collection for Action sets Job"); |
26 | 27 |
job.setAction("GARBAGE"); |
27 | 28 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/download/DownloadFromMetadataJobNode.java | ||
---|---|---|
4 | 4 |
|
5 | 5 |
import com.googlecode.sarasvati.NodeToken; |
6 | 6 |
|
7 |
import eu.dnetlib.data.objectstore.rmi.ObjectStoreService; |
|
7 | 8 |
import eu.dnetlib.data.objectstore.rmi.Protocols; |
8 | 9 |
import eu.dnetlib.enabling.resultset.IterableResultSetFactory; |
9 | 10 |
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils; |
... | ... | |
30 | 31 |
} |
31 | 32 |
|
32 | 33 |
@Override |
33 |
protected String getXqueryForServiceId(final NodeToken token) {
|
|
34 |
return "//RESOURCE_IDENTIFIER[../RESOURCE_TYPE/@value='ObjectStoreServiceResourceType']/@value/string()";
|
|
34 |
protected String obtainServiceId(final NodeToken token) {
|
|
35 |
return getServiceLocator().getServiceId(ObjectStoreService.class);
|
|
35 | 36 |
} |
36 | 37 |
|
37 | 38 |
@Override |
38 | 39 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception { |
39 |
final W3CEndpointReference inputEpr = (new EPRUtils()).getEpr(token.getEnv().getAttribute(inputEprParam));
|
|
40 |
final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(inputEprParam));
|
|
40 | 41 |
this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), inputEpr); |
41 | 42 |
job.setAction("FEED"); |
42 | 43 |
job.getParameters().put("obsID", getObjectStoreId()); |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/DeleteHdfsPathJobNode.java | ||
---|---|---|
3 | 3 |
import com.googlecode.sarasvati.NodeToken; |
4 | 4 |
|
5 | 5 |
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions; |
6 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
|
6 | 7 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
7 | 8 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
8 | 9 |
|
... | ... | |
11 | 12 |
private String cluster; |
12 | 13 |
|
13 | 14 |
@Override |
14 |
protected String getXqueryForServiceId(NodeToken token) {
|
|
15 |
return "collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()";
|
|
15 |
protected String obtainServiceId(final NodeToken token) {
|
|
16 |
return getServiceLocator().getServiceId(HadoopService.class);
|
|
16 | 17 |
} |
17 | 18 |
|
18 | 19 |
@Override |
19 |
protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
|
|
20 |
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
|
|
20 | 21 |
|
21 | 22 |
job.setAction(HadoopBlackboardActions.DELETE_HDFS_PATH.toString()); |
22 | 23 |
job.getParameters().put("cluster", getCluster()); |
... | ... | |
29 | 30 |
return cluster; |
30 | 31 |
} |
31 | 32 |
|
32 |
public void setCluster(String cluster) { |
|
33 |
public void setCluster(final String cluster) {
|
|
33 | 34 |
this.cluster = cluster; |
34 | 35 |
} |
35 | 36 |
|
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/SwitchIndexJobNode.java | ||
---|---|---|
4 | 4 |
|
5 | 5 |
import com.googlecode.sarasvati.NodeToken; |
6 | 6 |
|
7 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException; |
|
8 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
|
7 | 9 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
8 | 10 |
import eu.dnetlib.msro.rmi.MSROException; |
9 | 11 |
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode; |
... | ... | |
25 | 27 |
private String xqueryForServiceIdParam; |
26 | 28 |
|
27 | 29 |
@Override |
28 |
protected String getXqueryForServiceId(final NodeToken token) { |
|
29 |
return token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
30 |
protected String obtainServiceId(final NodeToken token) { |
|
31 |
final String xquery = token.getEnv().getAttribute(getXqueryForServiceIdParam()); |
|
32 |
try { |
|
33 |
return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xquery); |
|
34 |
} catch (ISLookUpException e) { |
|
35 |
throw new RuntimeException("Service id not found using query: " + xquery, e); |
|
36 |
} |
|
30 | 37 |
} |
31 | 38 |
|
32 | 39 |
@Override |
... | ... | |
41 | 48 |
} |
42 | 49 |
|
43 | 50 |
private void checkParam(final String param, final String msg) throws MSROException { |
44 |
if (StringUtils.isBlank(param)) throw new MSROException(msg);
|
|
51 |
if (StringUtils.isBlank(param)) { throw new MSROException(msg); }
|
|
45 | 52 |
} |
46 | 53 |
|
47 | 54 |
public String getInputIndexIdParam() { |
modules/dnet-openaireplus-workflows/trunk/pom.xml | ||
---|---|---|
10 | 10 |
<groupId>eu.dnetlib</groupId> |
11 | 11 |
<artifactId>dnet-openaireplus-workflows</artifactId> |
12 | 12 |
<packaging>jar</packaging> |
13 |
<version>2.0.1-SNAPSHOT</version>
|
|
13 |
<version>2.1.0-SNAPSHOT</version>
|
|
14 | 14 |
<scm> |
15 | 15 |
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-openaireplus-workflows/trunk</developerConnection> |
16 | 16 |
</scm> |
... | ... | |
23 | 23 |
<dependency> |
24 | 24 |
<groupId>eu.dnetlib</groupId> |
25 | 25 |
<artifactId>dnet-msro-service</artifactId> |
26 |
<version>[2.0.3,3.0.0)</version>
|
|
26 |
<version>[2.1.0,3.0.0)</version>
|
|
27 | 27 |
</dependency> |
28 | 28 |
<dependency> |
29 | 29 |
<groupId>eu.dnetlib</groupId> |
Also available in: Unified diff
BlackboardJobNode: obtainServiceId method