package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.enabling.tools.AbstractBaseService;
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
import org.apache.hadoop.conf.Configuration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HadoopServiceImpl.
 */
public class HadoopServiceImpl extends AbstractBaseService implements HadoopService {

	/**
	 * notification handler.
	 */
	private NotificationHandler notificationHandler;

	/** The hadoop service core. */
	@Autowired
	private HadoopServiceCore hadoopServiceCore;

	/** The job registry. */
	@Autowired
	private JobRegistry jobRegistry;

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listAvailableJobs()
	 */
	@Override
	public List<String> listAvailableJobs() throws HadoopServiceException {
		List<String> res = Lists.newArrayList();
		return res;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listJobs(java.lang.String)
	 */
	@Override
	public List<HadoopJobDescriptor> listJobs(final String clusterName) throws HadoopServiceException {
		return jobRegistry.listJobs(checkExists(clusterName));
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#killJob(java.lang.String)
	 */
	@Override
	public boolean killJob(final String jobId) throws HadoopServiceException {
		jobRegistry.unregisterJob(jobId);
		return true;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#notify(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
	 */
	@Override
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listHbaseTables(java.lang.String)
	 */
	@Override
	public List<String> listHbaseTables(final String clusterName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.listTables(checkExists(clusterName));
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#ensureHbaseTable(java.lang.String, java.lang.String, java.util.Set)
	 */
	@Override
	public boolean ensureHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
		try {
			hadoopServiceCore.ensureTable(checkExists(clusterName), tableName, columns);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.Set)
	 */
	@Override
	public boolean createHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
		try {
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, columns);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createConfiguredHbaseTable(java.lang.String, java.lang.String, java.lang.String)
	 */
	@Override
	public boolean createConfiguredHbaseTable(final String clusterName, final String tableName, final String tableConfiguration) throws HadoopServiceException {
		try {
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, tableConfiguration);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#truncateHbaseTable(java.lang.String, java.lang.String)
	 */
	@Override
	public boolean truncateHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			hadoopServiceCore.truncateTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
		return true;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#dropHbaseTable(java.lang.String, java.lang.String)
	 */
	@Override
	public boolean dropHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			hadoopServiceCore.dropTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
		return true;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#existHbaseTable(java.lang.String, java.lang.String)
	 */
	@Override
	public boolean existHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.existTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#getClusterConfiguration(java.lang.String)
	 */
	@Override
	public Map<String, String> getClusterConfiguration(final String clusterName) throws HadoopServiceException {

		final Configuration conf = hadoopServiceCore.getClusterConiguration(checkExists(clusterName));
		final Map<String, String> res = Maps.newHashMap();
		for (Entry<String, String> e : conf) {
			res.put(e.getKey(), e.getValue());
		}

		return res;
	}

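	/**
	 * Deletes the given path from the HDFS filesystem of the named cluster, delegating to the service core.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param path
	 *            the HDFS path to delete
	 * @return true if the deletion succeeded
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */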
	@Override
	public boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
		return hadoopServiceCore.deleteFromHdfs(checkExists(clusterName), path);
	}

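	/**
	 * Checks whether the given path exists on the HDFS filesystem of the named cluster.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param path
	 *            the HDFS path to check
	 * @return true if the path exists
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */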
	@Override
	public boolean existHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
		return hadoopServiceCore.existHdfsPath(checkExists(clusterName), path);
	}

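	/**
	 * Creates a directory at the given HDFS path on the named cluster.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param path
	 *            the HDFS directory path to create
	 * @param force
	 *            the force flag passed to the service core
	 * @return true if the directory was created
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */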
	@Override
	public boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException {
		return hadoopServiceCore.createHdfsDir(checkExists(clusterName), path, force);
	}

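	/**
	 * Lists the names of the configured clusters.
	 *
	 * @return the cluster names
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */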
	@Override
	public List<String> listClusters() throws HadoopServiceException {
		try {
			return ClusterName.asStringList();
		} catch (Throwable e) {
			throw new HadoopServiceException(e);
		}
	}

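	/**
	 * Describes an HBase table, delegating to the service core.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param tableName
	 *            the table name
	 * @return the table description
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */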
	@Override
	public List<String> describeHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

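	/**
	 * Describes the HBase row identified by the given row key.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param tableName
	 *            the table name
	 * @param rowKey
	 *            the row key
	 * @return the row descriptor
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */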
	@Override
	public HBaseRowDescriptor describeHBaseColumn(final String clusterName, final String tableName, final String rowKey) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeRow(checkExists(clusterName), tableName, rowKey);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

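	/**
	 * Describes the HBase rows identified by the given row keys.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param tableName
	 *            the table name
	 * @param rowKeys
	 *            the row keys
	 * @return a map of row key to row descriptor
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */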
	@Override
	public Map<String, HBaseRowDescriptor> describeHBaseColumns(final String clusterName, final String tableName, final List<String> rowKeys) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeRows(checkExists(clusterName), tableName, rowKeys);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

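	/**
	 * Returns the HBase table descriptor for the given table.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param tableName
	 *            the table name
	 * @return the table descriptor
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */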
	@Override
	public String describeHBaseTableConfiguration(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.getHBaseTableDescriptor(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

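	/**
	 * Deletes the columns described by the given row descriptor from an HBase table.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param tableName
	 *            the table name
	 * @param column
	 *            the row descriptor listing the columns to delete
	 * @return true if the deletion succeeded
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */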
	@Override
	public boolean deleteHBaseColumn(final String clusterName, final String tableName, final HBaseRowDescriptor column) throws HadoopServiceException {
		try {
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, Lists.newArrayList(column));
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

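	/**
	 * Deletes the columns described by the given row descriptors from an HBase table.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @param tableName
	 *            the table name
	 * @param column
	 *            the row descriptors listing the columns to delete
	 * @return true if the deletion succeeded
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */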
	@Override
	public boolean deleteHBaseColumns(final String clusterName, final String tableName, final List<HBaseRowDescriptor> column) throws HadoopServiceException {
		try {
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, column);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	///////////////////

	/**
	 * Check exists.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @return the cluster name
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */
	private ClusterName checkExists(final String clusterName) throws HadoopServiceException {
		try {
			return ClusterName.valueOf(clusterName);
		} catch (final IllegalArgumentException e) {
			throw new HadoopServiceException("Invalid cluster name: " + clusterName);
		}
	}

	/**
	 * Gets the notification handler.
	 *
	 * @return the notification handler
	 */
	public NotificationHandler getNotificationHandler() {
		return notificationHandler;
	}

	/**
	 * Sets the notification handler.
	 *
	 * @param notificationHandler
	 *            the new notification handler
	 */
	@Required
	public void setNotificationHandler(final NotificationHandler notificationHandler) {
		this.notificationHandler = notificationHandler;
	}

}