package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.enabling.tools.AbstractBaseService;
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
28
 * The Class HadoopServiceImpl.
29
 */
30 26600 sandro.lab
public class HadoopServiceImpl extends AbstractBaseService implements HadoopService {
31
32 47678 alessia.ba
	private static final Log log = LogFactory.getLog(HadoopServiceImpl.class);
33 26600 sandro.lab
	/**
34
	 * notification handler.
35
	 */
36
	private NotificationHandler notificationHandler;
37
38 28573 claudio.at
	/** The hadoop service core. */
39 26600 sandro.lab
	@Autowired
40
	private HadoopServiceCore hadoopServiceCore;
41
42 28573 claudio.at
	/** The job registry. */
43 26600 sandro.lab
	@Autowired
44
	private JobRegistry jobRegistry;
45
46 28573 claudio.at
	/*
47
	 * (non-Javadoc)
48 32179 claudio.at
	 *
49 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listAvailableJobs()
50
	 */
51 26600 sandro.lab
	@Override
52
	public List<String> listAvailableJobs() throws HadoopServiceException {
53
		List<String> res = Lists.newArrayList();
54
		return res;
55
	}
56
57 28573 claudio.at
	/*
58
	 * (non-Javadoc)
59 32179 claudio.at
	 *
60 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listJobs(java.lang.String)
61
	 */
62 26600 sandro.lab
	@Override
63 28570 claudio.at
	public List<HadoopJobDescriptor> listJobs(final String clusterName) throws HadoopServiceException {
64
		return jobRegistry.listJobs(checkExists(clusterName));
65 26600 sandro.lab
	}
66
67 28573 claudio.at
	/*
68
	 * (non-Javadoc)
69 32179 claudio.at
	 *
70 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#killJob(java.lang.String)
71
	 */
72 26600 sandro.lab
	@Override
73 28570 claudio.at
	public boolean killJob(final String jobId) throws HadoopServiceException {
74 26600 sandro.lab
		jobRegistry.unregisterJob(jobId);
75
		return true;
76
	}
77
78 28573 claudio.at
	/*
79
	 * (non-Javadoc)
80 32179 claudio.at
	 *
81 28573 claudio.at
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#notify(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
82
	 */
83 26600 sandro.lab
	@Override
84 28570 claudio.at
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
85 26600 sandro.lab
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
86
	}
87
88 28573 claudio.at
	/*
89
	 * (non-Javadoc)
90 32179 claudio.at
	 *
91 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listHbaseTables(java.lang.String)
92
	 */
93 26600 sandro.lab
	@Override
94 28570 claudio.at
	public List<String> listHbaseTables(final String clusterName) throws HadoopServiceException {
95 26600 sandro.lab
		try {
96 28570 claudio.at
			return hadoopServiceCore.listTables(checkExists(clusterName));
97 26600 sandro.lab
		} catch (IOException e) {
98
			throw new HadoopServiceException(e);
99
		}
100
	}
101
102 28573 claudio.at
	/*
103
	 * (non-Javadoc)
104 32179 claudio.at
	 *
105 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#ensureHbaseTable(java.lang.String, java.lang.String, java.util.Set)
106
	 */
107 26600 sandro.lab
	@Override
108 28570 claudio.at
	public boolean ensureHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
109 26600 sandro.lab
		try {
110 28570 claudio.at
			hadoopServiceCore.ensureTable(checkExists(clusterName), tableName, columns);
111 26600 sandro.lab
			return true;
112
		} catch (IOException e) {
113
			throw new HadoopServiceException(e);
114
		}
115
	}
116
117 28573 claudio.at
	/*
118
	 * (non-Javadoc)
119 32179 claudio.at
	 *
120 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.Set)
121
	 */
122 26600 sandro.lab
	@Override
123 28570 claudio.at
	public boolean createHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
124 26600 sandro.lab
		try {
125 28570 claudio.at
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, columns);
126 26600 sandro.lab
			return true;
127
		} catch (IOException e) {
128
			throw new HadoopServiceException(e);
129
		}
130
	}
131
132 28573 claudio.at
	/*
133
	 * (non-Javadoc)
134 39856 claudio.at
	 *
135
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.String)
136
	 */
137
	@Override
138
	public boolean createConfiguredHbaseTable(final String clusterName, final String tableName, final String tableConfiguration) throws HadoopServiceException {
139
		try {
140
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, tableConfiguration);
141
			return true;
142
		} catch (IOException e) {
143
			throw new HadoopServiceException(e);
144
		}
145
	}
146
147
	/*
148
	 * (non-Javadoc)
149 32179 claudio.at
	 *
150 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#truncateHbaseTable(java.lang.String, java.lang.String)
151
	 */
152 26600 sandro.lab
	@Override
153 28570 claudio.at
	public boolean truncateHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
154 26600 sandro.lab
		try {
155 28570 claudio.at
			hadoopServiceCore.truncateTable(checkExists(clusterName), tableName);
156 26600 sandro.lab
		} catch (IOException e) {
157
			throw new HadoopServiceException(e);
158
		}
159
		return true;
160
	}
161
162 28573 claudio.at
	/*
163
	 * (non-Javadoc)
164 32179 claudio.at
	 *
165 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#dropHbaseTable(java.lang.String, java.lang.String)
166
	 */
167 27702 claudio.at
	@Override
168 28570 claudio.at
	public boolean dropHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
169 27702 claudio.at
		try {
170 28570 claudio.at
			hadoopServiceCore.dropTable(checkExists(clusterName), tableName);
171 27702 claudio.at
		} catch (IOException e) {
172
			throw new HadoopServiceException(e);
173
		}
174
		return true;
175
	}
176
177 28573 claudio.at
	/*
178
	 * (non-Javadoc)
179 32179 claudio.at
	 *
180 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#existHbaseTable(java.lang.String, java.lang.String)
181
	 */
182 27873 claudio.at
	@Override
183 28570 claudio.at
	public boolean existHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
184 27873 claudio.at
		try {
185 28570 claudio.at
			return hadoopServiceCore.existTable(checkExists(clusterName), tableName);
186 27873 claudio.at
		} catch (IOException e) {
187
			throw new HadoopServiceException(e);
188
		}
189
	}
190
191 28573 claudio.at
	/*
192
	 * (non-Javadoc)
193 32179 claudio.at
	 *
194 28573 claudio.at
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#getClusterConfiguration(java.lang.String)
195
	 */
196 28570 claudio.at
	@Override
197
	public Map<String, String> getClusterConfiguration(final String clusterName) throws HadoopServiceException {
198
199
		final Configuration conf = hadoopServiceCore.getClusterConiguration(checkExists(clusterName));
200
		final Map<String, String> res = Maps.newHashMap();
201
		for (Entry<String, String> e : conf) {
202
			res.put(e.getKey(), e.getValue());
203
		}
204
205
		return res;
206
	}
207
208 31254 claudio.at
	@Override
209 39575 claudio.at
	public boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
210
		return hadoopServiceCore.deleteFromHdfs(checkExists(clusterName), path);
211
	}
212
213
	@Override
214 42452 claudio.at
	public boolean existHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
215
		return hadoopServiceCore.existHdfsPath(checkExists(clusterName), path);
216
	}
217
218
	@Override
219 39575 claudio.at
	public boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException {
220
		return hadoopServiceCore.createHdfsDir(checkExists(clusterName), path, force);
221
	}
222
223
	@Override
224 31254 claudio.at
	public List<String> listClusters() throws HadoopServiceException {
225
		try {
226
			return ClusterName.asStringList();
227
		} catch (Throwable e) {
228
			throw new HadoopServiceException(e);
229
		}
230
	}
231
232 32179 claudio.at
	@Override
233
	public List<String> describeHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
234
		try {
235
			return hadoopServiceCore.describeTable(checkExists(clusterName), tableName);
236
		} catch (IOException e) {
237
			throw new HadoopServiceException(e);
238
		}
239
	}
240
241 39676 claudio.at
	@Override
242
	public HBaseRowDescriptor describeHBaseColumn(final String clusterName, final String tableName, final String rowKey) throws HadoopServiceException {
243
		try {
244
			return hadoopServiceCore.describeRow(checkExists(clusterName), tableName, rowKey);
245
		} catch (IOException e) {
246
			throw new HadoopServiceException(e);
247
		}
248
	}
249 26600 sandro.lab
250 39676 claudio.at
	@Override
251
	public Map<String, HBaseRowDescriptor> describeHBaseColumns(final String clusterName, final String tableName, final List<String> rowKeys) throws HadoopServiceException {
252
		try {
253
			return hadoopServiceCore.describeRows(checkExists(clusterName), tableName, rowKeys);
254
		} catch (IOException e) {
255
			throw new HadoopServiceException(e);
256
		}
257
	}
258
259 39684 claudio.at
	@Override
260 39847 claudio.at
	public String describeHBaseTableConfiguration(final String clusterName, final String tableName) throws HadoopServiceException {
261
		try {
262
			return hadoopServiceCore.getHBaseTableDescriptor(checkExists(clusterName), tableName);
263
		} catch (IOException e) {
264
			throw new HadoopServiceException(e);
265
		}
266
	}
267
268
	@Override
269 39684 claudio.at
	public boolean deleteHBaseColumn(final String clusterName, final String tableName, final HBaseRowDescriptor column) throws HadoopServiceException {
270
		try {
271
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, Lists.newArrayList(column));
272
			return true;
273
		} catch (IOException e) {
274
			throw new HadoopServiceException(e);
275
		}
276
	}
277
278
	@Override
279
	public boolean deleteHBaseColumns(final String clusterName, final String tableName, final List<HBaseRowDescriptor> column) throws HadoopServiceException {
280
		try {
281
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, column);
282
			return true;
283
		} catch (IOException e) {
284
			throw new HadoopServiceException(e);
285
		}
286
	}
287
288 47678 alessia.ba
	@Override
289
	public void addHBaseColumn(final String clusterName,
290
			final String tableName,
291
			final String rowKey,
292
			final String columnFamily,
293
			final String qualifier,
294
			final String value)
295
			throws HadoopServiceException {
296
		log.debug(String.format("adding value to K: '%s' CF:'%s' Q:'%s'", rowKey, columnFamily, qualifier));
297
		final Put put = new Put(Bytes.toBytes(rowKey)).add(Bytes.toBytes(columnFamily), Bytes.toBytes(qualifier), Base64.decodeBase64(value));
298
		try {
299
			hadoopServiceCore.writeOnHBase(ClusterName.valueOf(clusterName), tableName, Lists.newArrayList(put));
300
		} catch (IOException e) {
301
			throw new HadoopServiceException(e);
302
		}
303
	}
304
305
306
307 39676 claudio.at
	///////////////////
308
309 28573 claudio.at
	/**
310
	 * Check exists.
311 31254 claudio.at
	 *
312 28573 claudio.at
	 * @param clusterName
313
	 *            the cluster name
314
	 * @return the cluster name
315
	 * @throws HadoopServiceException
316
	 *             the hadoop service exception
317
	 */
318 28570 claudio.at
	private ClusterName checkExists(final String clusterName) throws HadoopServiceException {
319
		try {
320
			return ClusterName.valueOf(clusterName);
321
		} catch (final IllegalArgumentException e) {
322
			throw new HadoopServiceException("Invalid cluster name: " + clusterName);
323
		}
324
	}
325
326 28573 claudio.at
	/**
327
	 * Gets the notification handler.
328 31254 claudio.at
	 *
329 28573 claudio.at
	 * @return the notification handler
330
	 */
331 26600 sandro.lab
	public NotificationHandler getNotificationHandler() {
332
		return notificationHandler;
333
	}
334
335 28573 claudio.at
	/**
336
	 * Sets the notification handler.
337 31254 claudio.at
	 *
338 28573 claudio.at
	 * @param notificationHandler
339
	 *            the new notification handler
340
	 */
341 26600 sandro.lab
	@Required
342 28570 claudio.at
	public void setNotificationHandler(final NotificationHandler notificationHandler) {
343 26600 sandro.lab
		this.notificationHandler = notificationHandler;
344
	}
345
346
}