Revision 48139
Added by Alessia Bardi almost 7 years ago
PrepareCopyTableJobNode.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.util.Map; |
4 | 4 |
import java.util.Set; |
5 |
|
|
6 | 5 |
import javax.annotation.Resource; |
7 | 6 |
|
8 |
import org.apache.commons.lang.StringUtils; |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
|
|
12 | 7 |
import com.google.common.collect.Sets; |
13 | 8 |
import com.googlecode.sarasvati.Arc; |
14 | 9 |
import com.googlecode.sarasvati.NodeToken; |
15 |
|
|
16 | 10 |
import eu.dnetlib.data.hadoop.rmi.HadoopService; |
17 | 11 |
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException; |
18 | 12 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
19 | 13 |
import eu.dnetlib.msro.rmi.MSROException; |
20 | 14 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
15 |
import org.apache.commons.lang.StringUtils; |
|
16 |
import org.apache.commons.logging.Log; |
|
17 |
import org.apache.commons.logging.LogFactory; |
|
21 | 18 |
|
22 | 19 |
/** |
23 | 20 |
* The PrepareCopyTableJobNode prepares the parameters needed to run the CopyTable job. |
24 | 21 |
*/ |
25 | 22 |
public class PrepareCopyTableJobNode extends SimpleJobNode { |
26 | 23 |
|
27 |
/** The Constant ZOOKEEPER_ZNODE_PARENT. */ |
|
24 |
/** |
|
25 |
* The Constant ZOOKEEPER_ZNODE_PARENT. |
|
26 |
*/ |
|
28 | 27 |
private static final String ZOOKEEPER_ZNODE_PARENT = "zookeeper.znode.parent"; |
29 | 28 |
|
30 |
/** The Constant HBASE_ZOOKEEPER_CLIENT_PORT. */ |
|
29 |
/** |
|
30 |
* The Constant HBASE_ZOOKEEPER_CLIENT_PORT. |
|
31 |
*/ |
|
31 | 32 |
private static final String HBASE_ZOOKEEPER_CLIENT_PORT = "hbase.zookeeper.client.port"; |
32 | 33 |
|
33 |
/** The Constant HBASE_ZOOKEEPER_QUORUM. */ |
|
34 |
/** |
|
35 |
* The Constant HBASE_ZOOKEEPER_QUORUM. |
|
36 |
*/ |
|
34 | 37 |
private static final String HBASE_ZOOKEEPER_QUORUM = "hbase.zookeeper.quorum"; |
35 | 38 |
|
36 | 39 |
/** |
... | ... | |
38 | 41 |
*/ |
39 | 42 |
private static final Log log = LogFactory.getLog(PrepareCopyTableJobNode.class); |
40 | 43 |
|
41 |
/** The source table. */ |
|
44 |
/** |
|
45 |
* The source table. |
|
46 |
*/ |
|
42 | 47 |
private String sourceCluster; |
43 | 48 |
|
44 |
/** The target cluster. */ |
|
49 |
/** |
|
50 |
* The target cluster. |
|
51 |
*/ |
|
45 | 52 |
private String targetCluster; |
46 | 53 |
|
47 | 54 |
private String sourceTable; |
... | ... | |
75 | 82 |
|
76 | 83 |
/** |
77 | 84 |
* Builds the output quorum. |
78 |
*
|
|
85 |
* |
|
79 | 86 |
* @return the output quorum |
80 |
* @throws HadoopServiceException |
|
81 |
* when cannot retrieve the clustr configuration |
|
82 |
* @throws MSROException |
|
83 |
* when some of the needed properties is missing in the cluster configuration |
|
87 |
* @throws HadoopServiceException when cannot retrieve the clustr configuration |
|
88 |
* @throws MSROException when some of the needed properties is missing in the cluster configuration |
|
84 | 89 |
*/ |
85 | 90 |
private String getOutputQuorum() throws HadoopServiceException, MSROException { |
86 | 91 |
Map<String, String> conf = serviceLocator.getService(HadoopService.class).getClusterConfiguration(getTargetCluster()); |
... | ... | |
100 | 105 |
|
101 | 106 |
/** |
102 | 107 |
* Checks the wf params. |
103 |
* |
|
104 |
* @throws MSROException |
|
105 |
* the MSRO exception |
|
108 |
* |
|
109 |
* @throws MSROException the MSRO exception |
|
106 | 110 |
* @throws HadoopServiceException |
107 | 111 |
*/ |
108 | 112 |
private void checkNodeParams() throws MSROException, HadoopServiceException { |
... | ... | |
118 | 122 |
if (!clusters.contains(getSourceCluster())) { throw new MSROException(String.format("source cluster '%s' doesn not exists", getSourceCluster())); } |
119 | 123 |
if (!clusters.contains(getTargetCluster())) { throw new MSROException(String.format("target cluster '%s' doesn not exists", getTargetCluster())); } |
120 | 124 |
|
121 |
if (!hadoop.existHbaseTable(getSourceCluster(), getSourceTable())) { throw new MSROException(String.format( |
|
122 |
"source table '%s' doesn not exists on cluster '%s'", getSourceTable(), getSourceCluster())); } |
|
125 |
if (!hadoop.existHbaseTable(getSourceCluster(), getSourceTable())) { |
|
126 |
throw new MSROException(String.format( |
|
127 |
"source table '%s' doesn not exists on cluster '%s'", getSourceTable(), getSourceCluster())); |
|
128 |
} |
|
123 | 129 |
} |
124 | 130 |
|
125 | 131 |
/** |
126 | 132 |
* Check parameter existence. |
127 |
* |
|
128 |
* @param param |
|
129 |
* the param |
|
130 |
* @param msg |
|
131 |
* the msg |
|
132 |
* @throws MSROException |
|
133 |
* the MSRO exception |
|
133 |
* |
|
134 |
* @param param the param |
|
135 |
* @param msg the msg |
|
136 |
* @throws MSROException the MSRO exception |
|
134 | 137 |
*/ |
135 | 138 |
private void checkParamExist(final String param, final String msg) throws MSROException { |
136 | 139 |
if (StringUtils.isBlank(param)) { throw new MSROException(msg); } |
Also available in: Unified diff
integrated (hopefully) all required changes from dnet40