Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;
2
3
import java.util.Map;
4
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Required;
8
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11
import com.googlecode.sarasvati.env.Env;
12
13
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
14
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
17
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
18
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
19
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
20
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
21
import eu.dnetlib.msro.workflows.util.ProgressProvider;
22
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
23
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
24
25
public class StoreHdfsRecordsJobNode extends BlackboardJobNode implements ProgressJobNode {
26
27
	private static final Log log = LogFactory.getLog(StoreHdfsRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
28
29
	private String inputEprParam;
30
	private String hdfsPathParam;
31
	private String cluster;
32
33
	private ProgressProvider progressProvider;
34
35
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
36
37
	@Override
38
	protected String getXqueryForServiceId(final NodeToken token) {
39
		return "collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()";
40
	}
41
42
	@Override
43
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
44
		log.info("Invoking blackboard method");
45
46
		job.setAction(HadoopBlackboardActions.IMPORT_EPR_HDFS.toString());
47
		job.getParameters().put("input_epr", DnetXsltFunctions.encodeBase64(prepareEpr(token)));
48
		job.getParameters().put("path", token.getEnv().getAttribute(getHdfsPathParam()));
49
		job.getParameters().put("cluster", getCluster());
50
	}
51
52
	@Override
53
	protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
54
		return new BlackboardWorkflowJobListener(engine, token) {
55
            @Override
56
            protected void populateEnv(final Env env, Map<String, String> responseParams) {
57
                    env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", responseParams.get("count"));
58
            }
59
		};
60
	}
61
62
	private String prepareEpr(final NodeToken token) throws ResultSetException {
63
		final String epr = token.getEnv().getAttribute(inputEprParam);
64
		final ResultsetProgressProvider resultsetProgressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
65
66
		setProgressProvider(resultsetProgressProvider);
67
68
		return resultsetProgressProvider.getEpr().toString();
69
	}
70
71
	public String getInputEprParam() {
72
		return inputEprParam;
73
	}
74
75
	public void setInputEprParam(final String inputEprParam) {
76
		this.inputEprParam = inputEprParam;
77
	}
78
79
	@Required
80
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
81
		this.processCountingResultSetFactory = processCountingResultSetFactory;
82
	}
83
84
	@Override
85
	public ProgressProvider getProgressProvider() {
86
		return progressProvider;
87
	}
88
89
	public void setProgressProvider(final ProgressProvider progressProvider) {
90
		this.progressProvider = progressProvider;
91
	}
92
93
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
94
		return processCountingResultSetFactory;
95
	}
96
97
	public String getCluster() {
98
		return cluster;
99
	}
100
101
	public void setCluster(final String cluster) {
102
		this.cluster = cluster;
103
	}
104
105
	public String getHdfsPathParam() {
106
		return hdfsPathParam;
107
	}
108
109
	public void setHdfsPathParam(String hdfsPathParam) {
110
		this.hdfsPathParam = hdfsPathParam;
111
	}
112
113
}