package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

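/**
 * Core operations against the configured Hadoop clusters: HBase table administration,
 * row-level read/write/delete, and basic HDFS path management. Cluster-specific clients
 * and Configuration objects are resolved through the injected HadoopClientMap and
 * ConfigurationEnumerator.
 */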
public class HadoopServiceCore {

	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;
	@Autowired
	private HadoopClientMap clients;
	private int maxVersions;

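	/**
	 * Lists the names of all HBase tables available on the given cluster.
	 */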
	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		return Lists.newArrayList(Iterables.transform(Lists.newArrayList(admin.listTables()), HadoopUtils.tableName()));
	}

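	/**
	 * Resolves the HBaseAdmin client for the given cluster, failing fast when none is configured.
	 */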
	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		return admin;
	}

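	/**
	 * Describes an existing table in terms of its column families and region start/end keys,
	 * returning the string form of the resulting HBaseTableDescriptor.
	 */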
	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

		final Set<String> columns = Sets.newHashSet();

		for (HColumnDescriptor hColDesc : Lists.newArrayList(desc.getColumnFamilies())) {
			columns.add(hColDesc.getNameAsString());
		}

		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
		htDescriptor.setColumns(columns);

		List<HBaseTableRegionInfo> regions = Lists.newArrayList();

		for (HRegionInfo info : tableRegions) {
			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
		}
		htDescriptor.setRegions(regions);

		if (log.isDebugEnabled()) {
			log.debug("got configuration for table '" + tableName + "': " + htDescriptor.toString());
		}

		return htDescriptor.toString();
	}

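	/**
	 * Returns the column family names defined for the given table.
	 */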
	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		return Lists.newArrayList(Iterables.transform(desc.getFamilies(), new Function<HColumnDescriptor, String>() {

			@Override
			public String apply(final HColumnDescriptor desc) {
				return desc.getNameAsString();
			}
		}));
	}

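	/**
	 * Empties a table by disabling and deleting it, then recreating it with its original descriptor.
	 */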
	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate nonexistent table");

		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);

		log.info("creating table: " + table);
		admin.createTable(desc);
	}

	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		return admin.tableExists(table);
	}

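	/**
	 * Disables and permanently deletes the given table.
	 */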
	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop nonexistent table: '" + table + "'");

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);
	}

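	/**
	 * Creates a table from a serialized HBaseTableDescriptor (column families and optional region boundaries).
	 */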
	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
	}

	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		doCreateTable(clusterName, table, columns, null);
	}

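	/**
	 * Creates a table with one column family per entry in {@code columns}, each keeping up to
	 * {@code maxVersions} cell versions; when region boundaries are provided the table is pre-split.
	 */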
	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
			throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		final HTableDescriptor desc = new HTableDescriptor(table);
		for (final String column : columns) {
			final HColumnDescriptor hds = new HColumnDescriptor(column);
			hds.setMaxVersions(getMaxVersions());
			desc.addFamily(hds);
		}

		log.info("creating hbase table: " + table);

		if (regions != null && !regions.isEmpty()) {
			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
			admin.createTable(desc, getSplitKeys(regions));
		} else {
			admin.createTable(desc);
		}

		log.info("created hbase table: [" + table + "]");
		log.debug("descriptor: [" + desc.toString() + "]");
	}

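	/**
	 * Converts region descriptions into the split keys expected by HBaseAdmin.createTable:
	 * the end key of every region except the last one.
	 */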
	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
		byte[][] splits = new byte[regions.size() - 1][];
		for (int i = 0; i < regions.size() - 1; i++) {
			splits[i] = regions.get(i).getEndKey().getBytes();
		}
		return splits;
	}

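	/**
	 * Creates the table if it is missing; otherwise adds any column families not yet present,
	 * disabling and re-enabling the table around the schema change.
	 */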
	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {

		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) {
			createTable(clusterName, table, columns);
		} else {
			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
			final Set<String> foundColumns = Sets.newHashSet(Iterables.transform(Lists.newArrayList(desc.getColumnFamilies()), HadoopUtils.columnName()));

			log.info("ensuring columns on table " + table + ": " + columns);
			final List<String> missingColumns = Lists.newArrayList(Iterables.filter(columns, HadoopUtils.columnPredicate(foundColumns)));
			if (!missingColumns.isEmpty()) {

				if (admin.isTableEnabled(table)) {
					admin.disableTable(table);
				}

				for (final String column : missingColumns) {
					log.info("hbase table: '" + table + "', adding column: " + column);
					admin.addColumn(table, new HColumnDescriptor(column));
				}

				admin.enableTable(table);
			}
		}
	}

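	/**
	 * Applies the given Puts to a table, flushing and closing the client when done.
	 */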
	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			table.delete(deletes);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

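	/**
	 * Deletes specific column qualifiers (all versions) from the rows described by the given descriptors.
	 */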
	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			for (HBaseRowDescriptor desc : columns) {

				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
				d.setWriteToWAL(true);
				for (Column c : desc.getColumns()) {
					for (String qualifier : c.getQualifier()) {
						log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
						d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
					}
				}
				table.delete(d);
			}
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			return table.get(new Get(id));
		} finally {
			table.close();
		}
	}

	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
		for (String rowKey : rowKeys) {
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
		}
		return map;
	}

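	/**
	 * Reads a single row and maps its column families and qualifiers into an HBaseRowDescriptor;
	 * an empty descriptor is returned when the row does not exist.
	 */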
	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		final HBaseRowDescriptor desc = new HBaseRowDescriptor();

		try {
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

			if (r.isEmpty()) {
				return desc;
			}

			final List<Column> columns = Lists.newArrayList();

			for (Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
				final Set<byte[]> qualifiers = e.getValue().keySet();
				final String family = new String(e.getKey());
				final Column col = new Column(family);

				for (byte[] q : qualifiers) {
					String qs = new String(q);
					col.getQualifier().add(qs);
				}
				columns.add(col);
			}
			desc.setColumns(columns);
			desc.setRowKey(rowKey);

			return desc;
		} finally {
			table.close();
		}
	}

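	/**
	 * Runs the given Scan and materializes every matching Result in memory.
	 */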
	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			final ResultScanner rs = table.getScanner(scan);
			try {
				return Lists.newArrayList(rs.iterator());
			} finally {
				rs.close();
			}
		} finally {
			table.close();
		}
	}

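	/**
	 * Recursively deletes an HDFS path (resolved against fs.defaultFS); returns false when the path does not exist.
	 */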
	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot delete an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

			if (hdfs.exists(absolutePath)) {
				log.debug("deleting path: " + absolutePath.toString());
				hdfs.delete(absolutePath, true);
				log.info("deleted path: " + absolutePath.toString());
				return true;
			} else {
				log.warn("cannot delete nonexistent path: " + absolutePath.toString());
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

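	/**
	 * Creates an HDFS directory; when {@code force} is true an existing directory is deleted and recreated,
	 * otherwise an existing directory is left untouched and false is returned.
	 */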
	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot create an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			if (!hdfs.exists(absolutePath)) {
				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else if (force) {
				log.info(String.format("found directory '%s', force deleting it", absolutePath.toString()));
				hdfs.delete(absolutePath, true);

				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else {
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

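	/**
	 * Checks whether the given HDFS path exists on the cluster.
	 */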
	public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("invalid empty path");

		final Configuration conf = configurationEnumerator.get(clusterName);
		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			return hdfs.exists(absolutePath);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public Configuration getClusterConiguration(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	public int getMaxVersions() {
		return maxVersions;
	}

	@Required
	public void setMaxVersions(final int maxVersions) {
		this.maxVersions = maxVersions;
	}

	public HadoopClientMap getClients() {
		return clients;
	}

}