Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;
2

    
3
import java.util.Map;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Required;
8

    
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11
import com.googlecode.sarasvati.env.Env;
12

    
13
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
14
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
17
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
18
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
19
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
20
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
21
import eu.dnetlib.msro.workflows.util.ProgressProvider;
22
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
23
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
24

    
25
public class StoreHdfsRecordsJobNode extends BlackboardJobNode implements ProgressJobNode {
26

    
27
	private static final Log log = LogFactory.getLog(StoreHdfsRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
28

    
29
	private String inputEprParam;
30
	private String hdfsPathParam;
31
	private String cluster;
32

    
33
	private ProgressProvider progressProvider;
34

    
35
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
36

    
37
	@Override
38
	protected String getXqueryForServiceId(final NodeToken token) {
39
		return "collection('/db/DRIVER/ServiceResources/HadoopServiceResourceType')//RESOURCE_IDENTIFIER/@value/string()";
40
	}
41

    
42
	@Override
43
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
44
		log.info("Invoking blackboard method");
45

    
46
		job.setAction(HadoopBlackboardActions.IMPORT_EPR_HDFS.toString());
47
		job.getParameters().put("input_epr", DnetXsltFunctions.encodeBase64(prepareEpr(token)));
48
		job.getParameters().put("path", token.getEnv().getAttribute(getHdfsPathParam()));
49
		job.getParameters().put("cluster", getCluster());
50
	}
51
	
52
	@Override
53
	protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
54
		return new BlackboardWorkflowJobListener(engine, token) {
55
            @Override
56
            protected void populateEnv(final Env env, Map<String, String> responseParams) {
57
                    env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", responseParams.get("count"));
58
            }
59
		};
60
	}	
61

    
62
	private String prepareEpr(final NodeToken token) throws ResultSetException {
63
		final String epr = token.getEnv().getAttribute(inputEprParam);
64
		final ResultsetProgressProvider resultsetProgressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
65

    
66
		setProgressProvider(resultsetProgressProvider);
67

    
68
		return resultsetProgressProvider.getEpr().toString();
69
	}
70

    
71
	public String getInputEprParam() {
72
		return inputEprParam;
73
	}
74

    
75
	public void setInputEprParam(final String inputEprParam) {
76
		this.inputEprParam = inputEprParam;
77
	}
78

    
79
	@Required
80
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
81
		this.processCountingResultSetFactory = processCountingResultSetFactory;
82
	}
83

    
84
	@Override
85
	public ProgressProvider getProgressProvider() {
86
		return progressProvider;
87
	}
88

    
89
	public void setProgressProvider(final ProgressProvider progressProvider) {
90
		this.progressProvider = progressProvider;
91
	}
92

    
93
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
94
		return processCountingResultSetFactory;
95
	}
96

    
97
	public String getCluster() {
98
		return cluster;
99
	}
100

    
101
	public void setCluster(final String cluster) {
102
		this.cluster = cluster;
103
	}
104

    
105
	public String getHdfsPathParam() {
106
		return hdfsPathParam;
107
	}
108

    
109
	public void setHdfsPathParam(String hdfsPathParam) {
110
		this.hdfsPathParam = hdfsPathParam;
111
	}
112

    
113
}
(9-9/10)