Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.hadoop;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5

    
6
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
7
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
8
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
9
import eu.dnetlib.miscutils.functional.xml.DnetXsltFunctions;
10
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12
import eu.dnetlib.msro.workflows.procs.Env;
13
import eu.dnetlib.msro.workflows.procs.Token;
14
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
15
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
16
import eu.dnetlib.rmi.common.ResultSet;
17
import eu.dnetlib.rmi.data.hadoop.HadoopBlackboardActions;
18
import eu.dnetlib.rmi.data.hadoop.HadoopService;
19
import eu.dnetlib.rmi.enabling.ISLookUpException;
20
import eu.dnetlib.rmi.enabling.ISLookUpService;
21
import eu.dnetlib.rmi.manager.MSROException;
22
import org.apache.commons.lang3.StringUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.springframework.beans.factory.annotation.Autowired;
26

    
27
/**
28
 * Created by claudio on 08/04/16.
29
 */
30
public abstract class AbstractHBaseJobNode extends BlackboardJobNode {
31

    
32
	private static final Log log = LogFactory.getLog(AbstractHBaseJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
33

    
34
	private final String INPUT_HBASE_TABLE_PARAM = "hbaseTable";
35
	private final String INPUT_EPR_PARAM = "input_epr";
36
	private final String INPUT_CLUSTER_PARAM = "cluster";
37

    
38
	private final String XSLT_PARAM = "xslt";
39

    
40
	private final String OUTPUT_HBASE_TABLE_PARAM = "table";
41
	private final String OUTPUT_CLUSTER_PARAM = "cluster";
42
	private final String SIMULATION_PARAM = "simulation";
43

    
44
	@Autowired
45
	protected UniqueServiceLocator serviceLocator;
46

    
47
	@Autowired
48
	protected ResultSetClient resultSetClient;
49

    
50
	protected String inputEprParam;
51

    
52
	protected String hbaseTable;
53
	protected String cluster;
54
	protected String mapping;
55

    
56
	protected boolean simulation = false;
57

    
58
	protected abstract HadoopBlackboardActions getAction();
59

    
60
	@Override
61
	protected String obtainServiceId(final Env env) {
62
		return getServiceLocator().getServiceId(HadoopService.class);
63
	}
64

    
65
	@Override
66
	protected void prepareJob(final BlackboardJob job, final Token token) throws Exception {
67
		log.info("Invoking blackboard method: " + getAction().toString());
68

    
69
		final ResultSet<?> rs = token.getEnv().getAttribute(getInputEprParam(), ResultSet.class);
70
		token.setProgressProvider(new ResultsetProgressProvider(rs, resultSetClient));
71

    
72
		job.setAction(getAction().toString());
73
		job.getParameters().put(INPUT_EPR_PARAM, rs.toJson());
74
		job.getParameters().put(XSLT_PARAM, DnetXsltFunctions.encodeBase64(readXslt(getMapping())));
75
		job.getParameters().put(OUTPUT_HBASE_TABLE_PARAM, getHbaseTable());
76
		job.getParameters().put(OUTPUT_CLUSTER_PARAM, getCluster());
77
		job.getParameters().put(SIMULATION_PARAM, String.valueOf(isSimulation()));
78
	}
79

    
80
	@Override
81
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Token token) {
82
		return new BlackboardWorkflowJobListener(token) {
83

    
84
			@Override
85
			protected void responseToEnv(final Env env, final Map<String, String> responseParams) {
86
				final String count = responseParams.get("count");
87
				log.info(String.format("%s %s objects to HBase table %s, cluster %s", getAction().toString(), count, getHbaseTable(), getCluster()));
88
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + getNodeName() + ":count", count);
89
			}
90
		};
91
	}
92

    
93
	private String readXslt(final String profileId) throws IOException, MSROException, ISLookUpException {
94
		if (StringUtils.isBlank(profileId)) throw new MSROException("missing profile id");
95

    
96
		log.info("loading mapping from profile id: " + profileId);
97

    
98
		final String xquery =
99
				String.format("/RESOURCE_PROFILE[.//RESOURCE_IDENTIFIER/@value='%s']/BODY/CONFIGURATION/SCRIPT/CODE/*[local-name()='stylesheet']", profileId);
100
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(xquery);
101
	}
102

    
103
	public String getInputEprParam() {
104
		return inputEprParam;
105
	}
106

    
107
	public void setInputEprParam(final String inputEprParam) {
108
		this.inputEprParam = inputEprParam;
109
	}
110

    
111
	public String getMapping() {
112
		return mapping;
113
	}
114

    
115
	public void setMapping(final String mapping) {
116
		this.mapping = mapping;
117
	}
118

    
119
	public String getCluster() {
120
		return cluster;
121
	}
122

    
123
	public void setCluster(final String cluster) {
124
		this.cluster = cluster;
125
	}
126

    
127
	public boolean isSimulation() {
128
		return simulation;
129
	}
130

    
131
	public void setSimulation(final boolean simulation) {
132
		this.simulation = simulation;
133
	}
134

    
135
	public String getHbaseTable() {
136
		return hbaseTable;
137
	}
138

    
139
	public void setHbaseTable(final String hbaseTable) {
140
		this.hbaseTable = hbaseTable;
141
	}
142

    
143
}
(2-2/22)