Project

General

Profile

1
package eu.dnetlib.msro.workflows.hadoop.hbase;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5

    
6
import org.apache.commons.io.IOUtils;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.springframework.beans.factory.annotation.Required;
10

    
11
import com.googlecode.sarasvati.Engine;
12
import com.googlecode.sarasvati.NodeToken;
13
import com.googlecode.sarasvati.env.Env;
14

    
15
import eu.dnetlib.data.hadoop.rmi.HadoopBlackboardActions;
16
import eu.dnetlib.data.hadoop.rmi.HadoopService;
17
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
18
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
19
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
20
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
21
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
22
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
23
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
24
import eu.dnetlib.msro.workflows.util.ProgressProvider;
25
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
26
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
27

    
28
public class DeleteHBaseRecordsJobNode extends BlackboardJobNode implements ProgressJobNode {
29

    
30
	private static final Log log = LogFactory.getLog(DeleteHBaseRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
31

    
32
	private String inputEprParam;
33
	private String hbaseTableProperty;
34
	private String cluster;
35
	private String xslt;
36

    
37
	private boolean simulation = false;
38

    
39
	private ProgressProvider progressProvider;
40

    
41
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
42

    
43
	@Override
44
	protected String obtainServiceId(final NodeToken token) {
45
		return getServiceLocator().getServiceId(HadoopService.class);
46
	}
47

    
48
	@Override
49
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
50
		log.info("Invoking blackboard method");
51

    
52
		job.setAction(HadoopBlackboardActions.DELETE_EPR_HBASE.toString());
53
		job.getParameters().put("input_epr", DnetXsltFunctions.encodeBase64(prepareEpr(token)));
54
		job.getParameters().put("xslt", DnetXsltFunctions.encodeBase64(prepareXslt()));
55
		job.getParameters().put("table", getPropertyFetcher().getProperty(getHbaseTableProperty()));
56
		job.getParameters().put("cluster", cluster);
57
		job.getParameters().put("simulation", String.valueOf(isSimulation()));
58
	}
59

    
60
	@Override
61
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
62
		return new BlackboardWorkflowJobListener(engine, token) {
63

    
64
			@Override
65
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
66
				final String count = responseParams.get("count");
67
				log.info(String.format("Deleted %s objects from HBase table %s, cluster %s", count, getPropertyFetcher().getProperty(getHbaseTableProperty()),
68
						getCluster()));
69
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getName() + ":count", count);
70
			}
71
		};
72
	}
73

    
74
	private String prepareEpr(final NodeToken token) throws ResultSetException {
75
		final String epr = token.getEnv().getAttribute(inputEprParam);
76
		final ResultsetProgressProvider resultsetProgressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), epr);
77

    
78
		setProgressProvider(resultsetProgressProvider);
79

    
80
		return resultsetProgressProvider.getEpr().toString();
81
	}
82

    
83
	private String prepareXslt() throws IOException {
84
		return (xslt == null) || xslt.isEmpty() ? "" : IOUtils.toString(getClass().getResourceAsStream(xslt));
85
	}
86

    
87
	public String getInputEprParam() {
88
		return inputEprParam;
89
	}
90

    
91
	public void setInputEprParam(final String inputEprParam) {
92
		this.inputEprParam = inputEprParam;
93
	}
94

    
95
	public String getHbaseTableProperty() {
96
		return hbaseTableProperty;
97
	}
98

    
99
	public void setHbaseTableProperty(final String hbaseTableProperty) {
100
		this.hbaseTableProperty = hbaseTableProperty;
101
	}
102

    
103
	@Required
104
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
105
		this.processCountingResultSetFactory = processCountingResultSetFactory;
106
	}
107

    
108
	@Override
109
	public ProgressProvider getProgressProvider() {
110
		return progressProvider;
111
	}
112

    
113
	public void setProgressProvider(final ProgressProvider progressProvider) {
114
		this.progressProvider = progressProvider;
115
	}
116

    
117
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
118
		return processCountingResultSetFactory;
119
	}
120

    
121
	public String getXslt() {
122
		return xslt;
123
	}
124

    
125
	public void setXslt(final String xslt) {
126
		this.xslt = xslt;
127
	}
128

    
129
	public String getCluster() {
130
		return cluster;
131
	}
132

    
133
	public void setCluster(final String cluster) {
134
		this.cluster = cluster;
135
	}
136

    
137
	public boolean isSimulation() {
138
		return simulation;
139
	}
140

    
141
	public void setSimulation(final boolean simulation) {
142
		this.simulation = simulation;
143
	}
144

    
145
}
(4-4/8)