1
|
package eu.dnetlib.data.hadoop.action;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.List;
|
5
|
import java.util.Map;
|
6
|
import java.util.Map.Entry;
|
7
|
|
8
|
import com.google.common.base.Joiner;
|
9
|
import com.google.common.collect.Iterables;
|
10
|
import com.google.common.collect.Lists;
|
11
|
import eu.dnetlib.data.hadoop.HadoopJob;
|
12
|
import eu.dnetlib.data.hadoop.config.ClusterName;
|
13
|
import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
|
14
|
import eu.dnetlib.data.hadoop.rmi.HadoopJobType.AdminJobType;
|
15
|
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
|
16
|
import eu.dnetlib.data.hadoop.utils.JobProfile;
|
17
|
import eu.dnetlib.data.hadoop.utils.ScanProperties;
|
18
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
|
19
|
import org.apache.commons.lang.StringUtils;
|
20
|
import org.apache.commons.logging.Log;
|
21
|
import org.apache.commons.logging.LogFactory;
|
22
|
import org.apache.hadoop.conf.Configuration;
|
23
|
import org.apache.hadoop.hbase.mapreduce.CopyTable;
|
24
|
import org.apache.hadoop.mapred.JobConf;
|
25
|
import org.apache.hadoop.mapred.RunningJob;
|
26
|
import org.apache.hadoop.mapreduce.Job;
|
27
|
|
28
|
/**
|
29
|
* The Class SubmitAdminJobAction.
|
30
|
*/
|
31
|
public class SubmitAdminJobAction extends SubmitMapreduceJobAction {
|
32
|
|
33
|
/**
|
34
|
* logger.
|
35
|
*/
|
36
|
private static final Log log = LogFactory.getLog(SubmitAdminJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM
|
37
|
|
38
|
/*
|
39
|
* (non-Javadoc)
|
40
|
*
|
41
|
* @see
|
42
|
* eu.dnetlib.data.hadoop.action.SubmitMapreduceJobAction#executeAsync(eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler,
|
43
|
* eu.dnetlib.enabling.tools.blackboard.BlackboardJob)
|
44
|
*/
|
45
|
@Override
|
46
|
public void submit(final JobCompletion callback, final BlackboardJob bbJob, String jobName, JobProfile jobProfile) throws HadoopServiceException {
|
47
|
final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));
|
48
|
|
49
|
try {
|
50
|
JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
|
51
|
|
52
|
jobConf = initAdminJob(bbJob, jobName, jobProfile, jobConf);
|
53
|
|
54
|
if (!hadoopClientMap.isMapreduceAvailable(clusterName))
|
55
|
throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());
|
56
|
|
57
|
logJobDetails(jobConf);
|
58
|
|
59
|
final RunningJob runningJob = hadoopClientMap.getJtClient(clusterName).submitJob(jobConf);
|
60
|
final String jobId = newJobId(clusterName, runningJob.getID().getId());
|
61
|
|
62
|
jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
|
63
|
new MapreduceJobMonitor(runningJob, callback)));
|
64
|
|
65
|
} catch (IOException e) {
|
66
|
throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
|
67
|
}
|
68
|
}
|
69
|
|
70
|
private JobConf initAdminJob(final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile, final JobConf jobConf) throws IOException,
|
71
|
HadoopServiceException {
|
72
|
switch (AdminJobType.valueOf(jobName)) {
|
73
|
case copytable:
|
74
|
return copyTable(jobProfile, bbJob.getParameters(), jobConf);
|
75
|
|
76
|
default:
|
77
|
throw new HadoopServiceException("unknown admin job: " + jobName);
|
78
|
}
|
79
|
}
|
80
|
|
81
|
/**
|
82
|
* Builds a CopyTable job. Mimics
|
83
|
* <p/>
|
84
|
* <pre>
|
85
|
* {@code
|
86
|
* bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289
|
87
|
* --peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable <code>
|
88
|
*
|
89
|
* Options:
|
90
|
* rs.class hbase.regionserver.class of the peer cluster,
|
91
|
* specify if different from current cluster
|
92
|
* rs.impl hbase.regionserver.impl of the peer cluster,
|
93
|
* startrow the start row
|
94
|
* stoprow the stop row
|
95
|
* starttime beginning of the time range (unixtime in millis)
|
96
|
* without endtime means from starttime to forever
|
97
|
* endtime end of the time range. Ignored if no starttime specified.
|
98
|
* versions number of cell versions to copy
|
99
|
* new.name new table's name
|
100
|
* peer.adr Address of the peer cluster given in the format
|
101
|
* hbase.zookeeer.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent
|
102
|
* families comma-separated list of families to copy
|
103
|
* To copy from cf1 to cf2, give sourceCfName:destCfName.
|
104
|
* To keep the same name, just give "cfName"
|
105
|
* all.cells also copy deleteFromHBase markers and deleted cells
|
106
|
*
|
107
|
* Args:
|
108
|
* tablename Name of the table to copy
|
109
|
* }
|
110
|
* </pre>
|
111
|
*
|
112
|
* @param jobProfile the job profile
|
113
|
* @param parameters the parameters
|
114
|
* @param jobConf the job conf
|
115
|
* @return the job conf
|
116
|
* @throws IOException Signals that an I/O exception has occurred.
|
117
|
* @throws HadoopServiceException
|
118
|
*/
|
119
|
private JobConf copyTable(final JobProfile jobProfile, final Map<String, String> parameters, final JobConf jobConf) throws IOException,
|
120
|
HadoopServiceException {
|
121
|
|
122
|
ScanProperties scan = jobProfile.getScanProperties();
|
123
|
|
124
|
List<String> params = Lists.newArrayList();
|
125
|
if (!scan.getFamilies().isEmpty()) {
|
126
|
String families = "--families=" + Joiner.on(",").join(scan.getFamilies());
|
127
|
log.debug("adding column families: " + families);
|
128
|
params.add(families);
|
129
|
}
|
130
|
|
131
|
// copy on remote cluster?
|
132
|
if (parameters.containsKey("peer.adr") && !StringUtils.equals(parameters.get("sourceCluster"), parameters.get("cluster"))) {
|
133
|
String peerAdr = "--peer.adr=" + parameters.get("peer.adr");
|
134
|
log.debug("adding peer address: " + peerAdr);
|
135
|
params.add(peerAdr);
|
136
|
}
|
137
|
|
138
|
// sets the target table
|
139
|
String targetTable = "--new.name=" + parameters.get("new.name");
|
140
|
log.debug("adding target table: " + targetTable);
|
141
|
params.add(targetTable);
|
142
|
|
143
|
// sets the source table
|
144
|
String sourceTable = parameters.get("hbase.mapreduce.inputtable");
|
145
|
log.debug("adding source table: " + sourceTable);
|
146
|
params.add(sourceTable);
|
147
|
|
148
|
log.info("copy table params: " + params);
|
149
|
|
150
|
final Job copyJob = CopyTable.createSubmittableJob(jobConf, Iterables.toArray(params, String.class));
|
151
|
|
152
|
if (copyJob == null) throw new HadoopServiceException("invalid copytable parameters: " + params);
|
153
|
|
154
|
final Configuration copyConf = copyJob.getConfiguration();
|
155
|
|
156
|
return merge(jobConf, copyConf);
|
157
|
}
|
158
|
|
159
|
/**
|
160
|
* Merge.
|
161
|
*
|
162
|
* @param jobConf the job conf
|
163
|
* @param copyConf the copy conf
|
164
|
* @return the job conf
|
165
|
*/
|
166
|
private JobConf merge(final JobConf jobConf, final Configuration copyConf) {
|
167
|
for (Entry<String, String> e : copyConf) {
|
168
|
jobConf.set(e.getKey(), e.getValue());
|
169
|
}
|
170
|
return jobConf;
|
171
|
}
|
172
|
|
173
|
}
|