Project

General

Profile

1
package eu.dnetlib.data.hadoop;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Map.Entry;
7
import java.util.Set;
8

    
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Maps;
11
import eu.dnetlib.data.hadoop.config.ClusterName;
12
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
13
import eu.dnetlib.data.hadoop.rmi.HadoopService;
14
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
15
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
16
import eu.dnetlib.enabling.tools.AbstractBaseService;
17
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
18
import org.apache.commons.codec.binary.Base64;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.apache.hadoop.conf.Configuration;
22
import org.apache.hadoop.hbase.client.Put;
23
import org.apache.hadoop.hbase.util.Bytes;
24
import org.springframework.beans.factory.annotation.Autowired;
25
import org.springframework.beans.factory.annotation.Required;
26

    
27
/**
28
 * The Class HadoopServiceImpl.
29
 */
30
public class HadoopServiceImpl extends AbstractBaseService implements HadoopService {
31

    
32
	private static final Log log = LogFactory.getLog(HadoopServiceImpl.class);
33
	/**
34
	 * notification handler.
35
	 */
36
	private NotificationHandler notificationHandler;
37

    
38
	/** The hadoop service core. */
39
	@Autowired
40
	private HadoopServiceCore hadoopServiceCore;
41

    
42
	/** The job registry. */
43
	@Autowired
44
	private JobRegistry jobRegistry;
45

    
46
	/*
47
	 * (non-Javadoc)
48
	 * 
49
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listAvailableJobs()
50
	 */
51
	@Override
52
	public List<String> listAvailableJobs() throws HadoopServiceException {
53
		List<String> res = Lists.newArrayList();
54
		return res;
55
	}
56

    
57
	/*
58
	 * (non-Javadoc)
59
	 * 
60
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listJobs(java.lang.String)
61
	 */
62
	@Override
63
	public List<HadoopJobDescriptor> listJobs(final String clusterName) throws HadoopServiceException {
64
		return jobRegistry.listJobs(checkExists(clusterName));
65
	}
66

    
67
	/*
68
	 * (non-Javadoc)
69
	 * 
70
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#killJob(java.lang.String)
71
	 */
72
	@Override
73
	public boolean killJob(final String jobId) throws HadoopServiceException {
74
		jobRegistry.unregisterJob(jobId);
75
		return true;
76
	}
77

    
78
	/*
79
	 * (non-Javadoc)
80
	 * 
81
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#notify(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
82
	 */
83
	@Override
84
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
85
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
86
	}
87

    
88
	/*
89
	 * (non-Javadoc)
90
	 * 
91
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listHbaseTables(java.lang.String)
92
	 */
93
	@Override
94
	public List<String> listHbaseTables(final String clusterName) throws HadoopServiceException {
95
		try {
96
			return hadoopServiceCore.listTables(checkExists(clusterName));
97
		} catch (IOException e) {
98
			throw new HadoopServiceException(e);
99
		}
100
	}
101

    
102
	/*
103
	 * (non-Javadoc)
104
	 * 
105
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#ensureHbaseTable(java.lang.String, java.lang.String, java.util.Set)
106
	 */
107
	@Override
108
	public boolean ensureHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
109
		try {
110
			hadoopServiceCore.ensureTable(checkExists(clusterName), tableName, columns);
111
			return true;
112
		} catch (IOException e) {
113
			throw new HadoopServiceException(e);
114
		}
115
	}
116

    
117
	/*
118
	 * (non-Javadoc)
119
	 * 
120
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.Set)
121
	 */
122
	@Override
123
	public boolean createHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
124
		try {
125
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, columns);
126
			return true;
127
		} catch (IOException e) {
128
			throw new HadoopServiceException(e);
129
		}
130
	}
131

    
132
	/*
133
	 * (non-Javadoc)
134
	 *
135
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.String)
136
	 */
137
	@Override
138
	public boolean createConfiguredHbaseTable(final String clusterName, final String tableName, final String tableConfiguration) throws HadoopServiceException {
139
		try {
140
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, tableConfiguration);
141
			return true;
142
		} catch (IOException e) {
143
			throw new HadoopServiceException(e);
144
		}
145
	}
146

    
147
	/*
148
	 * (non-Javadoc)
149
	 * 
150
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#truncateHbaseTable(java.lang.String, java.lang.String)
151
	 */
152
	@Override
153
	public boolean truncateHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
154
		try {
155
			hadoopServiceCore.truncateTable(checkExists(clusterName), tableName);
156
		} catch (IOException e) {
157
			throw new HadoopServiceException(e);
158
		}
159
		return true;
160
	}
161

    
162
	/*
163
	 * (non-Javadoc)
164
	 * 
165
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#dropHbaseTable(java.lang.String, java.lang.String)
166
	 */
167
	@Override
168
	public boolean dropHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
169
		try {
170
			hadoopServiceCore.dropTable(checkExists(clusterName), tableName);
171
		} catch (IOException e) {
172
			throw new HadoopServiceException(e);
173
		}
174
		return true;
175
	}
176

    
177
	/*
178
	 * (non-Javadoc)
179
	 * 
180
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#existHbaseTable(java.lang.String, java.lang.String)
181
	 */
182
	@Override
183
	public boolean existHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
184
		try {
185
			return hadoopServiceCore.existTable(checkExists(clusterName), tableName);
186
		} catch (IOException e) {
187
			throw new HadoopServiceException(e);
188
		}
189
	}
190

    
191
	/*
192
	 * (non-Javadoc)
193
	 * 
194
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#getClusterConfiguration(java.lang.String)
195
	 */
196
	@Override
197
	public Map<String, String> getClusterConfiguration(final String clusterName) throws HadoopServiceException {
198

    
199
		final Configuration conf = hadoopServiceCore.getClusterConiguration(checkExists(clusterName));
200
		final Map<String, String> res = Maps.newHashMap();
201
		for (Entry<String, String> e : conf) {
202
			res.put(e.getKey(), e.getValue());
203
		}
204

    
205
		return res;
206
	}
207

    
208
	@Override
209
	public boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
210
		return hadoopServiceCore.deleteFromHdfs(checkExists(clusterName), path);
211
	}
212

    
213
	@Override
214
	public boolean existHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
215
		return hadoopServiceCore.existHdfsPath(checkExists(clusterName), path);
216
	}
217

    
218
	@Override
219
	public boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException {
220
		return hadoopServiceCore.createHdfsDir(checkExists(clusterName), path, force);
221
	}
222

    
223
	@Override
224
	public List<String> listClusters() throws HadoopServiceException {
225
		try {
226
			return ClusterName.asStringList();
227
		} catch (Throwable e) {
228
			throw new HadoopServiceException(e);
229
		}
230
	}
231

    
232
	@Override
233
	public List<String> describeHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
234
		try {
235
			return hadoopServiceCore.describeTable(checkExists(clusterName), tableName);
236
		} catch (IOException e) {
237
			throw new HadoopServiceException(e);
238
		}
239
	}
240

    
241
	@Override
242
	public HBaseRowDescriptor describeHBaseColumn(final String clusterName, final String tableName, final String rowKey) throws HadoopServiceException {
243
		try {
244
			return hadoopServiceCore.describeRow(checkExists(clusterName), tableName, rowKey);
245
		} catch (IOException e) {
246
			throw new HadoopServiceException(e);
247
		}
248
	}
249

    
250
	@Override
251
	public Map<String, HBaseRowDescriptor> describeHBaseColumns(final String clusterName, final String tableName, final List<String> rowKeys) throws HadoopServiceException {
252
		try {
253
			return hadoopServiceCore.describeRows(checkExists(clusterName), tableName, rowKeys);
254
		} catch (IOException e) {
255
			throw new HadoopServiceException(e);
256
		}
257
	}
258

    
259
	@Override
260
	public String describeHBaseTableConfiguration(final String clusterName, final String tableName) throws HadoopServiceException {
261
		try {
262
			return hadoopServiceCore.getHBaseTableDescriptor(checkExists(clusterName), tableName);
263
		} catch (IOException e) {
264
			throw new HadoopServiceException(e);
265
		}
266
	}
267

    
268
	@Override
269
	public boolean deleteHBaseColumn(final String clusterName, final String tableName, final HBaseRowDescriptor column) throws HadoopServiceException {
270
		try {
271
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, Lists.newArrayList(column));
272
			return true;
273
		} catch (IOException e) {
274
			throw new HadoopServiceException(e);
275
		}
276
	}
277

    
278
	@Override
279
	public boolean deleteHBaseColumns(final String clusterName, final String tableName, final List<HBaseRowDescriptor> column) throws HadoopServiceException {
280
		try {
281
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, column);
282
			return true;
283
		} catch (IOException e) {
284
			throw new HadoopServiceException(e);
285
		}
286
	}
287

    
288
	@Override
289
	public void addHBaseColumn(final String clusterName,
290
			final String tableName,
291
			final String rowKey,
292
			final String columnFamily,
293
			final String qualifier,
294
			final String value)
295
			throws HadoopServiceException {
296
		log.debug(String.format("adding value to K: '%s' CF:'%s' Q:'%s'", rowKey, columnFamily, qualifier));
297
		final Put put = new Put(Bytes.toBytes(rowKey)).add(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Base64.decodeBase64(value));
298
		try {
299
			hadoopServiceCore.writeOnHBase(ClusterName.valueOf(clusterName), tableName, Lists.newArrayList(put));
300
		} catch (IOException e) {
301
			throw new HadoopServiceException(e);
302
		}
303
	}
304

    
305

    
306

    
307
	///////////////////
308

    
309
	/**
310
	 * Check exists.
311
	 *
312
	 * @param clusterName
313
	 *            the cluster name
314
	 * @return the cluster name
315
	 * @throws HadoopServiceException
316
	 *             the hadoop service exception
317
	 */
318
	private ClusterName checkExists(final String clusterName) throws HadoopServiceException {
319
		try {
320
			return ClusterName.valueOf(clusterName);
321
		} catch (final IllegalArgumentException e) {
322
			throw new HadoopServiceException("Invalid cluster name: " + clusterName);
323
		}
324
	}
325

    
326
	/**
327
	 * Gets the notification handler.
328
	 *
329
	 * @return the notification handler
330
	 */
331
	public NotificationHandler getNotificationHandler() {
332
		return notificationHandler;
333
	}
334

    
335
	/**
336
	 * Sets the notification handler.
337
	 *
338
	 * @param notificationHandler
339
	 *            the new notification handler
340
	 */
341
	@Required
342
	public void setNotificationHandler(final NotificationHandler notificationHandler) {
343
		this.notificationHandler = notificationHandler;
344
	}
345

    
346
}
(5-5/7)