Project

General

Profile

1 35797 claudio.at
package eu.dnetlib.msro.workflows.hadoop;
2
3
import java.util.Map;
4
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Required;
8
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11
import com.googlecode.sarasvati.env.Env;
12
13
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
14
import eu.dnetlib.data.hadoop.rmi.HadoopService;
15
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
16
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
17
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
18
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
19
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
20
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
21
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
22
import eu.dnetlib.msro.workflows.util.ProgressProvider;
23
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
24
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
25
26
public class StoreHdfsRecordsJobNode extends BlackboardJobNode implements ProgressJobNode {
27
28
	private static final Log log = LogFactory.getLog(StoreHdfsRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
29
30
	private String inputEprParam;
31
	private String hdfsPathParam;
32
	private String cluster;
33
34
	private ProgressProvider progressProvider;
35
36
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
37
38
	@Override
39
	protected String obtainServiceId(final NodeToken token) {
40
		return getServiceLocator().getServiceId(HadoopService.class);
41
	}
42
43
	@Override
44
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
45
		log.info("Invoking blackboard method");
46
47
		job.setAction(HadoopBlackboardActions.IMPORT_EPR_HDFS.toString());
48
		job.getParameters().put("input_epr", DnetXsltFunctions.encodeBase64(prepareEpr(token)));
49
		job.getParameters().put("path", token.getEnv().getAttribute(getHdfsPathParam()));
50
		job.getParameters().put("cluster", getCluster());
51
	}
52
53
	@Override
54
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
55
		return new BlackboardWorkflowJobListener(engine, token) {
56
57
			@Override
58
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
59
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", responseParams.get("count"));
60
			}
61
		};
62
	}
63
64
	private String prepareEpr(final NodeToken token) throws ResultSetException {
65
		final String epr = token.getEnv().getAttribute(inputEprParam);
66
		final ResultsetProgressProvider resultsetProgressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
67
68
		setProgressProvider(resultsetProgressProvider);
69
70
		return resultsetProgressProvider.getEpr().toString();
71
	}
72
73
	public String getInputEprParam() {
74
		return inputEprParam;
75
	}
76
77
	public void setInputEprParam(final String inputEprParam) {
78
		this.inputEprParam = inputEprParam;
79
	}
80
81
	@Required
82
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
83
		this.processCountingResultSetFactory = processCountingResultSetFactory;
84
	}
85
86
	@Override
87
	public ProgressProvider getProgressProvider() {
88
		return progressProvider;
89
	}
90
91
	public void setProgressProvider(final ProgressProvider progressProvider) {
92
		this.progressProvider = progressProvider;
93
	}
94
95
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
96
		return processCountingResultSetFactory;
97
	}
98
99
	public String getCluster() {
100
		return cluster;
101
	}
102
103
	public void setCluster(final String cluster) {
104
		this.cluster = cluster;
105
	}
106
107
	public String getHdfsPathParam() {
108
		return hdfsPathParam;
109
	}
110
111
	public void setHdfsPathParam(final String hdfsPathParam) {
112
		this.hdfsPathParam = hdfsPathParam;
113
	}
114
115
}