package eu.dnetlib.data.hadoop.blackboard;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import com.google.common.base.Joiner;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.mapreduce.MapreduceJobMonitor;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanProperties;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopJobType.AdminJobType;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.mapreduce.Job;

/**
 * Blackboard action that submits admin-type Hadoop jobs (currently HBase CopyTable) as
 * mapreduce jobs on the requested cluster.
 */
public class SubmitAdminJobAction extends SubmitMapreduceJobAction {

	/**
	 * Logger.
	 */
	private static final Log log = LogFactory.getLog(SubmitAdminJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * {@inheritDoc}
	 *
	 * @see eu.dnetlib.data.hadoop.blackboard.SubmitMapreduceJobAction#submit(JobCompletion,
	 *      eu.dnetlib.enabling.tools.blackboard.BlackboardJob, String, eu.dnetlib.data.hadoop.utils.JobProfile)
	 */
	@Override
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
			throws HadoopServiceException {
		final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));

		try {
			JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());

			jobConf = initAdminJob(bbJob, jobName, jobProfile, jobConf);

			if (!hadoopClientMap.isMapreduceAvailable(clusterName))
				throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());

			logJobDetails(jobConf);

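			// Submission goes through the old-API JobTracker client; the returned RunningJob
			// handle is wrapped in a MapreduceJobMonitor so that completion (or failure) is
			// reported asynchronously through the given callback.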
			final RunningJob runningJob = hadoopClientMap.getJtClient(clusterName).submitJob(jobConf);
			final String jobId = newJobId(clusterName, runningJob.getID().getId());

			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
					new MapreduceJobMonitor(runningJob, callback)));

		} catch (IOException e) {
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
		}
	}

	private JobConf initAdminJob(final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile, final JobConf jobConf)
			throws IOException, HadoopServiceException {
		switch (AdminJobType.valueOf(jobName)) {
		case copytable:
			return copyTable(jobProfile, bbJob.getParameters(), jobConf);

		default:
			throw new HadoopServiceException("unknown admin job: " + jobName);
		}
	}
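
	// Note: supporting a new admin job type would mean adding a constant to AdminJobType
	// and a corresponding case in the switch above.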

	/**
	 * Builds a CopyTable job. Mimics
	 * <p/>
	 * <pre>
	 * {@code
	 * bin/hbase org.apache.hadoop.hbase.mapreduce.CopyTable --starttime=1265875194289 --endtime=1265878794289
	 * --peer.adr=server1,server2,server3:2181:/hbase --families=myOldCf:myNewCf,cf2,cf3 TestTable
	 *
	 * Options:
	 * rs.class     hbase.regionserver.class of the peer cluster,
	 *              specify if different from current cluster
	 * rs.impl      hbase.regionserver.impl of the peer cluster
	 * startrow     the start row
	 * stoprow      the stop row
	 * starttime    beginning of the time range (unixtime in millis)
	 *              without endtime means from starttime to forever
	 * endtime      end of the time range. Ignored if no starttime specified.
	 * versions     number of cell versions to copy
	 * new.name     new table's name
	 * peer.adr     Address of the peer cluster given in the format
	 *              hbase.zookeeper.quorum:hbase.zookeeper.client.port:zookeeper.znode.parent
	 * families     comma-separated list of families to copy
	 *              To copy from cf1 to cf2, give sourceCfName:destCfName.
	 *              To keep the same name, just give "cfName"
	 * all.cells    also copy delete markers and deleted cells
	 *
	 * Args:
	 * tablename    Name of the table to copy
	 * }
	 * </pre>
	 *
	 * @param jobProfile the job profile
	 * @param parameters the parameters
	 * @param jobConf    the job conf
	 * @return the job conf
	 * @throws IOException            Signals that an I/O exception has occurred.
	 * @throws HadoopServiceException when the CopyTable parameters are invalid
	 */
	private JobConf copyTable(final JobProfile jobProfile, final Map<String, String> parameters, final JobConf jobConf)
			throws IOException, HadoopServiceException {

		final ScanProperties scan = jobProfile.getScanProperties();

		final List<String> params = Lists.newArrayList();
		if (!scan.getFamilies().isEmpty()) {
			final String families = "--families=" + Joiner.on(",").join(scan.getFamilies());
			log.debug("adding column families: " + families);
			params.add(families);
		}

		// copy to a remote cluster?
		if (parameters.containsKey("peer.adr") && !StringUtils.equals(parameters.get("sourceCluster"), parameters.get("cluster"))) {
			final String peerAdr = "--peer.adr=" + parameters.get("peer.adr");
			log.debug("adding peer address: " + peerAdr);
			params.add(peerAdr);
		}

		// sets the target table
		final String targetTable = "--new.name=" + parameters.get("new.name");
		log.debug("adding target table: " + targetTable);
		params.add(targetTable);

		// sets the source table: the table name is the only positional argument
		final String sourceTable = parameters.get("hbase.mapreduce.inputtable");
		log.debug("adding source table: " + sourceTable);
		params.add(sourceTable);

		log.info("copy table params: " + params);
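
		// CopyTable.createSubmittableJob parses the CLI-style argument list built above and
		// returns null when the arguments cannot be parsed, hence the null check below.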
		final Job copyJob = CopyTable.createSubmittableJob(jobConf, Iterables.toArray(params, String.class));

		if (copyJob == null) throw new HadoopServiceException("invalid copytable parameters: " + params);

		final Configuration copyConf = copyJob.getConfiguration();

		return merge(jobConf, copyConf);
	}
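
	// A minimal sketch of the blackboard parameters a "copytable" job is expected to carry;
	// the keys are the ones read above, the values are made-up examples:
	//
	//   Map<String, String> parameters = new HashMap<>();
	//   parameters.put("cluster", "DM");                              // target cluster name (example)
	//   parameters.put("sourceCluster", "IIS");                       // source cluster name (example)
	//   parameters.put("peer.adr", "zk1,zk2,zk3:2181:/hbase");        // remote ZK quorum (example)
	//   parameters.put("new.name", "db_openaire_copy");               // destination table (example)
	//   parameters.put("hbase.mapreduce.inputtable", "db_openaire");  // source table (example)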

	/**
	 * Copies every property of the given Configuration into the JobConf used for submission,
	 * so that the settings produced by CopyTable.createSubmittableJob take effect.
	 *
	 * @param jobConf  the job conf
	 * @param copyConf the copy conf
	 * @return the job conf
	 */
	private JobConf merge(final JobConf jobConf, final Configuration copyConf) {
		for (final Entry<String, String> e : copyConf) {
			jobConf.set(e.getKey(), e.getValue());
		}
		return jobConf;
	}
}