Project

General

Profile

1
package eu.dnetlib.data.hadoop;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Map.Entry;
7
import java.util.Set;
8

    
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Maps;
11
import eu.dnetlib.data.hadoop.config.ClusterName;
12
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
13
import eu.dnetlib.data.hadoop.rmi.HadoopService;
14
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
15
import eu.dnetlib.enabling.tools.AbstractBaseService;
16
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
17
import org.apache.hadoop.conf.Configuration;
18
import org.springframework.beans.factory.annotation.Autowired;
19
import org.springframework.beans.factory.annotation.Required;
20

    
21
/**
22
 * The Class HadoopServiceImpl.
23
 */
24
public class HadoopServiceImpl extends AbstractBaseService implements HadoopService {
25

    
26
	/**
27
	 * notification handler.
28
	 */
29
	private NotificationHandler notificationHandler;
30

    
31
	/** The hadoop service core. */
32
	@Autowired
33
	private HadoopServiceCore hadoopServiceCore;
34

    
35
	/** The job registry. */
36
	@Autowired
37
	private JobRegistry jobRegistry;
38

    
39
	/*
40
	 * (non-Javadoc)
41
	 * 
42
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listAvailableJobs()
43
	 */
44
	@Override
45
	public List<String> listAvailableJobs() throws HadoopServiceException {
46
		List<String> res = Lists.newArrayList();
47
		return res;
48
	}
49

    
50
	/*
51
	 * (non-Javadoc)
52
	 * 
53
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listJobs(java.lang.String)
54
	 */
55
	@Override
56
	public List<HadoopJobDescriptor> listJobs(final String clusterName) throws HadoopServiceException {
57
		return jobRegistry.listJobs(checkExists(clusterName));
58
	}
59

    
60
	/*
61
	 * (non-Javadoc)
62
	 * 
63
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#killJob(java.lang.String)
64
	 */
65
	@Override
66
	public boolean killJob(final String jobId) throws HadoopServiceException {
67
		jobRegistry.unregisterJob(jobId);
68
		return true;
69
	}
70

    
71
	/*
72
	 * (non-Javadoc)
73
	 * 
74
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#notify(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
75
	 */
76
	@Override
77
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
78
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
79
	}
80

    
81
	/*
82
	 * (non-Javadoc)
83
	 * 
84
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listHbaseTables(java.lang.String)
85
	 */
86
	@Override
87
	public List<String> listHbaseTables(final String clusterName) throws HadoopServiceException {
88
		try {
89
			return hadoopServiceCore.listTables(checkExists(clusterName));
90
		} catch (IOException e) {
91
			throw new HadoopServiceException(e);
92
		}
93
	}
94

    
95
	/*
96
	 * (non-Javadoc)
97
	 * 
98
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#ensureHbaseTable(java.lang.String, java.lang.String, java.util.Set)
99
	 */
100
	@Override
101
	public boolean ensureHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
102
		try {
103
			hadoopServiceCore.ensureTable(checkExists(clusterName), tableName, columns);
104
			return true;
105
		} catch (IOException e) {
106
			throw new HadoopServiceException(e);
107
		}
108
	}
109

    
110
	/*
111
	 * (non-Javadoc)
112
	 * 
113
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.Set)
114
	 */
115
	@Override
116
	public boolean createHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
117
		try {
118
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, columns);
119
			return true;
120
		} catch (IOException e) {
121
			throw new HadoopServiceException(e);
122
		}
123
	}
124

    
125
	/*
126
	 * (non-Javadoc)
127
	 * 
128
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#truncateHbaseTable(java.lang.String, java.lang.String)
129
	 */
130
	@Override
131
	public boolean truncateHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
132
		try {
133
			hadoopServiceCore.truncateTable(checkExists(clusterName), tableName);
134
		} catch (IOException e) {
135
			throw new HadoopServiceException(e);
136
		}
137
		return true;
138
	}
139

    
140
	/*
141
	 * (non-Javadoc)
142
	 * 
143
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#dropHbaseTable(java.lang.String, java.lang.String)
144
	 */
145
	@Override
146
	public boolean dropHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
147
		try {
148
			hadoopServiceCore.dropTable(checkExists(clusterName), tableName);
149
		} catch (IOException e) {
150
			throw new HadoopServiceException(e);
151
		}
152
		return true;
153
	}
154

    
155
	/*
156
	 * (non-Javadoc)
157
	 * 
158
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#existHbaseTable(java.lang.String, java.lang.String)
159
	 */
160
	@Override
161
	public boolean existHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
162
		try {
163
			return hadoopServiceCore.existTable(checkExists(clusterName), tableName);
164
		} catch (IOException e) {
165
			throw new HadoopServiceException(e);
166
		}
167
	}
168

    
169
	/*
170
	 * (non-Javadoc)
171
	 * 
172
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#getClusterConfiguration(java.lang.String)
173
	 */
174
	@Override
175
	public Map<String, String> getClusterConfiguration(final String clusterName) throws HadoopServiceException {
176

    
177
		final Configuration conf = hadoopServiceCore.getClusterConiguration(checkExists(clusterName));
178
		final Map<String, String> res = Maps.newHashMap();
179
		for (Entry<String, String> e : conf) {
180
			res.put(e.getKey(), e.getValue());
181
		}
182

    
183
		return res;
184
	}
185

    
186
	@Override
187
	public boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
188
		return hadoopServiceCore.deleteFromHdfs(checkExists(clusterName), path);
189
	}
190

    
191
	@Override
192
	public boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException {
193
		return hadoopServiceCore.createHdfsDir(checkExists(clusterName), path, force);
194
	}
195

    
196
	@Override
197
	public List<String> listClusters() throws HadoopServiceException {
198
		try {
199
			return ClusterName.asStringList();
200
		} catch (Throwable e) {
201
			throw new HadoopServiceException(e);
202
		}
203
	}
204

    
205
	@Override
206
	public List<String> describeHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
207
		try {
208
			return hadoopServiceCore.describeTable(checkExists(clusterName), tableName);
209
		} catch (IOException e) {
210
			throw new HadoopServiceException(e);
211
		}
212
	}
213

    
214
	// /////////////////
215

    
216
	/**
217
	 * Check exists.
218
	 *
219
	 * @param clusterName
220
	 *            the cluster name
221
	 * @return the cluster name
222
	 * @throws HadoopServiceException
223
	 *             the hadoop service exception
224
	 */
225
	private ClusterName checkExists(final String clusterName) throws HadoopServiceException {
226
		try {
227
			return ClusterName.valueOf(clusterName);
228
		} catch (final IllegalArgumentException e) {
229
			throw new HadoopServiceException("Invalid cluster name: " + clusterName);
230
		}
231
	}
232

    
233
	/**
234
	 * Gets the notification handler.
235
	 *
236
	 * @return the notification handler
237
	 */
238
	public NotificationHandler getNotificationHandler() {
239
		return notificationHandler;
240
	}
241

    
242
	/**
243
	 * Sets the notification handler.
244
	 *
245
	 * @param notificationHandler
246
	 *            the new notification handler
247
	 */
248
	@Required
249
	public void setNotificationHandler(final NotificationHandler notificationHandler) {
250
		this.notificationHandler = notificationHandler;
251
	}
252

    
253
}
(5-5/7)