Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hbase;
2

    
3
import java.util.Map;
4
import java.util.Set;
5
import javax.annotation.Resource;
6

    
7
import com.google.common.collect.Sets;
8
import com.googlecode.sarasvati.Arc;
9
import com.googlecode.sarasvati.NodeToken;
10
import eu.dnetlib.data.hadoop.rmi.HadoopService;
11
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.msro.rmi.MSROException;
14
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
15
import org.apache.commons.lang.StringUtils;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18

    
19
/**
20
 * The PrepareCopyTableJobNode prepares the parameters needed to run the CopyTable job.
21
 */
22
public class PrepareCopyTableJobNode extends SimpleJobNode {
23

    
24
	/**
25
	 * The Constant ZOOKEEPER_ZNODE_PARENT.
26
	 */
27
	private static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent";
28

    
29
	/**
30
	 * The Constant HBASE_ZOOKEEPER_CLIENT_PORT.
31
	 */
32
	private static final String HBASE_ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.client.port";
33

    
34
	/**
35
	 * The Constant HBASE_ZOOKEEPER_QUORUM.
36
	 */
37
	private static final String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum";
38

    
39
	/**
40
	 * logger.
41
	 */
42
	private static final Log log = LogFactory.getLog(PrepareCopyTableJobNode.class);
43

    
44
	/**
45
	 * The source table.
46
	 */
47
	private String sourceCluster;
48

    
49
	/**
50
	 * The target cluster.
51
	 */
52
	private String targetCluster;
53

    
54
	private String sourceTable;
55

    
56
	private String targetTable;
57

    
58
	@Resource
59
	private UniqueServiceLocator serviceLocator;
60

    
61
	/*
62
	 * (non-Javadoc)
63
	 * 
64
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
65
	 */
66
	@Override
67
	protected String execute(final NodeToken token) throws Exception {
68

    
69
		checkNodeParams();
70

    
71
		final String outputQuorum = getOutputQuorum();
72
		log.info("build hbase quorum: " + outputQuorum);
73
		token.getEnv().setAttribute("peer.adr", outputQuorum);
74
		token.getEnv().setAttribute("sourceCluster", getSourceCluster());
75
		token.getEnv().setAttribute("sourceTable", getSourceTable());
76

    
77
		token.getEnv().setAttribute("targetCluster", getTargetCluster());
78
		token.getEnv().setAttribute("targetTable", getTargetTable());
79

    
80
		return Arc.DEFAULT_ARC;
81
	}
82

    
83
	/**
84
	 * Builds the output quorum.
85
	 *
86
	 * @return the output quorum
87
	 * @throws HadoopServiceException when cannot retrieve the clustr configuration
88
	 * @throws MSROException          when some of the needed properties is missing in the cluster configuration
89
	 */
90
	private String getOutputQuorum() throws HadoopServiceException, MSROException {
91
		Map<String, String> conf = serviceLocator.getService(HadoopService.class).getClusterConfiguration(getTargetCluster());
92
		log.debug(conf);
93

    
94
		String hbaseQuorum = conf.get(HBASE_ZOOKEEPER_QUORUM);
95
		String hbasePort = conf.get(HBASE_ZOOKEEPER_CLIENT_PORT);
96
		String znodeParent = conf.get(ZOOKEEPER_ZNODE_PARENT);
97

    
98
		checkParamExist(hbaseQuorum, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_QUORUM, hbaseQuorum));
99
		checkParamExist(hbasePort, String.format("unable to find property '%s' in cluster configuration: %s", HBASE_ZOOKEEPER_CLIENT_PORT, hbasePort));
100
		checkParamExist(znodeParent, String.format("unable to find property '%s' in cluster configuration: %s", ZOOKEEPER_ZNODE_PARENT, znodeParent));
101

    
102
		String outputQuorum = String.format("%s:%s:%s", hbaseQuorum, hbasePort, znodeParent);
103
		return outputQuorum;
104
	}
105

    
106
	/**
107
	 * Checks the wf params.
108
	 *
109
	 * @throws MSROException          the MSRO exception
110
	 * @throws HadoopServiceException
111
	 */
112
	private void checkNodeParams() throws MSROException, HadoopServiceException {
113

    
114
		checkParamExist(getSourceCluster(), "source cluster must be set");
115
		checkParamExist(getTargetCluster(), "target cluster must be set");
116
		checkParamExist(getSourceTable(), "source table must be set");
117
		checkParamExist(getTargetTable(), "target table must be set");
118

    
119
		final HadoopService hadoop = serviceLocator.getService(HadoopService.class);
120

    
121
		Set<String> clusters = Sets.newHashSet(hadoop.listClusters());
122
		if (!clusters.contains(getSourceCluster())) { throw new MSROException(String.format("source cluster '%s' doesn not exists", getSourceCluster())); }
123
		if (!clusters.contains(getTargetCluster())) { throw new MSROException(String.format("target cluster '%s' doesn not exists", getTargetCluster())); }
124

    
125
		if (!hadoop.existHbaseTable(getSourceCluster(), getSourceTable())) {
126
			throw new MSROException(String.format(
127
					"source table '%s' doesn not exists on cluster '%s'", getSourceTable(), getSourceCluster()));
128
		}
129
	}
130

    
131
	/**
132
	 * Check parameter existence.
133
	 *
134
	 * @param param the param
135
	 * @param msg   the msg
136
	 * @throws MSROException the MSRO exception
137
	 */
138
	private void checkParamExist(final String param, final String msg) throws MSROException {
139
		if (StringUtils.isBlank(param)) { throw new MSROException(msg); }
140
	}
141

    
142
	public String getSourceCluster() {
143
		return sourceCluster;
144
	}
145

    
146
	public void setSourceCluster(final String sourceCluster) {
147
		this.sourceCluster = sourceCluster;
148
	}
149

    
150
	public String getTargetCluster() {
151
		return targetCluster;
152
	}
153

    
154
	public void setTargetCluster(final String targetCluster) {
155
		this.targetCluster = targetCluster;
156
	}
157

    
158
	public String getSourceTable() {
159
		return sourceTable;
160
	}
161

    
162
	public void setSourceTable(final String sourceTable) {
163
		this.sourceTable = sourceTable;
164
	}
165

    
166
	public String getTargetTable() {
167
		return targetTable;
168
	}
169

    
170
	public void setTargetTable(final String targetTable) {
171
		this.targetTable = targetTable;
172
	}
173

    
174
}
    (1-1/1)