package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.rmi.data.hadoop.ClusterName;
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
import eu.dnetlib.rmi.data.hadoop.hbase.Column;
import eu.dnetlib.rmi.data.hadoop.hbase.HBaseRowDescriptor;

import eu.dnetlib.data.hadoop.utils.HadoopUtils;
import eu.dnetlib.rmi.data.hadoop.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.rmi.data.hadoop.hbase.schema.HBaseTableRegionInfo;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

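/**
 * Core HBase/HDFS helper used by the Hadoop service: it manages table schemas
 * (create, drop, truncate, ensure columns), reads and writes rows, and performs
 * basic HDFS path operations on the cluster selected through {@link ClusterName}.
 */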
public class HadoopServiceCore {

	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;
	@Autowired
	private HadoopClientMap clients;
	private int maxVersions;

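	/** Lists the names of all HBase tables available on the given cluster. */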
	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		return Lists.newArrayList(Iterables.transform(Lists.newArrayList(admin.listTables()), HadoopUtils.tableName()));
	}

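	/** Returns the HBaseAdmin client registered for the cluster, failing fast when none is available. */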
	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		return admin;
	}

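	/**
	 * Describes an existing table as an HBaseTableDescriptor (column families plus
	 * region start/end keys) and returns its string representation.
	 */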
	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

		final Set<String> columns = Sets.newHashSet();

		for (HColumnDescriptor hColDesc : Lists.newArrayList(desc.getColumnFamilies())) {
			columns.add(hColDesc.getNameAsString());
		}

		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
		htDescriptor.setColumns(columns);

		List<HBaseTableRegionInfo> regions = Lists.newArrayList();

		for (HRegionInfo info : tableRegions) {
			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
		}
		htDescriptor.setRegions(regions);

		if (log.isDebugEnabled()) {
			log.debug("got configuration for table '" + tableName + "': " + htDescriptor.toString());
		}

		return htDescriptor.toString();
	}

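	/** Returns the names of the column families defined on the given table. */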
	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		return Lists.newArrayList(Iterables.transform(desc.getFamilies(), new Function<HColumnDescriptor, String>() {

			@Override
			public String apply(final HColumnDescriptor desc) {
				return desc.getNameAsString();
			}
		}));
	}

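	/**
	 * Empties a table by disabling and deleting it, then recreating it with the
	 * descriptor captured before deletion.
	 */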
	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate nonexistent table");

		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);

		log.info("creating table: " + table);
		admin.createTable(desc);
	}

	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		return admin.tableExists(table);
	}

	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop nonexistent table: '" + table + "'");

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);
	}

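	/**
	 * Creates a table from its JSON representation (see HBaseTableDescriptor.fromJSON),
	 * using the declared column families and, when present, the region boundaries as
	 * pre-split points.
	 */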
	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
	}

	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		doCreateTable(clusterName, table, columns, null);
	}

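	/**
	 * Creates a table with one column family per requested column, each retaining up to
	 * getMaxVersions() cell versions; pre-splits the table when region info is provided.
	 */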
	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
			throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		final HTableDescriptor desc = new HTableDescriptor(table);
		for (final String column : columns) {
			final HColumnDescriptor hds = new HColumnDescriptor(column);
			hds.setMaxVersions(getMaxVersions());
			desc.addFamily(hds);
		}

		log.info("creating hbase table: " + table);

		if (regions != null && !regions.isEmpty()) {
			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
			admin.createTable(desc, getSplitKeys(regions));
		} else {
			admin.createTable(desc);
		}

		log.info("created hbase table: [" + table + "]");
		log.debug("descriptor: [" + desc.toString() + "]");
	}

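	/**
	 * N contiguous regions are delimited by N-1 split keys: the end key of every
	 * region except the last one.
	 */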
	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
		byte[][] splits = new byte[regions.size() - 1][];
		for (int i = 0; i < regions.size() - 1; i++) {
			splits[i] = regions.get(i).getEndKey().getBytes();
		}
		return splits;
	}

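	/**
	 * Creates the table if it is missing; otherwise adds any requested column family
	 * that is not already defined, disabling and re-enabling the table around the change.
	 */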
	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {

		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) {
			createTable(clusterName, table, columns);
		} else {
			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
			final Set<String> foundColumns = Sets.newHashSet(Iterables.transform(Lists.newArrayList(desc.getColumnFamilies()), HadoopUtils.columnName()));

			log.info("ensuring columns on table " + table + ": " + columns);
			final List<String> missingColumns = Lists.newArrayList(Iterables.filter(columns, HadoopUtils.columnPredicate(foundColumns)));
			if (!missingColumns.isEmpty()) {

				if (admin.isTableEnabled(table)) {
					admin.disableTable(table);
				}

				for (final String column : missingColumns) {
					log.info("hbase table: '" + table + "', adding column: " + column);
					admin.addColumn(table, new HColumnDescriptor(column));
				}

				admin.enableTable(table);
			}
		}
	}

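	/** Applies the given Puts to the table, flushing commits and closing the table handle afterwards. */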
	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			table.delete(deletes);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

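	/**
	 * For each row descriptor, deletes all versions of the listed family:qualifier
	 * cells from the corresponding row.
	 */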
	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			for (HBaseRowDescriptor desc : columns) {

				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
				d.setWriteToWAL(true);
				for (Column c : desc.getColumns()) {
					for (String qualifier : c.getQualifier()) {
						log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
						d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
					}
				}
				table.delete(d);
			}
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			return table.get(new Get(id));
		} finally {
			table.close();
		}
	}

	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
		for (String rowKey : rowKeys) {
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
		}
		return map;
	}

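	/**
	 * Reads a row and maps it into an HBaseRowDescriptor listing, per column family,
	 * the qualifiers present; returns an empty descriptor when the row does not exist.
	 */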
	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		final HBaseRowDescriptor desc = new HBaseRowDescriptor();

		try {
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

			if (r.isEmpty()) {
				return desc;
			}

			final List<Column> columns = Lists.newArrayList();

			for (Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
				final Set<byte[]> qualifiers = e.getValue().keySet();
				final String family = new String(e.getKey());
				final Column col = new Column(family);

				for (byte[] q : qualifiers) {
					String qs = new String(q);
					col.getQualifier().add(qs);
				}
				columns.add(col);
			}
			desc.setColumns(columns);
			desc.setRowKey(rowKey);

			return desc;
		} finally {
			table.close();
		}
	}

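	/** Runs the given Scan and collects every matching Result into an in-memory list. */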
	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			final ResultScanner rs = table.getScanner(scan);
			try {
				return Lists.newArrayList(rs.iterator());
			} finally {
				rs.close();
			}
		} finally {
			table.close();
		}
	}

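	/**
	 * Recursively deletes an HDFS path, resolved against the cluster's fs.defaultFS;
	 * returns false when the path does not exist.
	 */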
	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot delete an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

			if (hdfs.exists(absolutePath)) {
				log.debug("deleting path: " + absolutePath.toString());
				hdfs.delete(absolutePath, true);
				log.info("deleted path: " + absolutePath.toString());
				return true;
			} else {
				log.warn("cannot delete nonexistent path: " + absolutePath.toString());
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

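	/**
	 * Creates an HDFS directory; when it already exists, either leaves it untouched
	 * (returning false) or, if force is set, deletes and recreates it.
	 */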
	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot create an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			if (!hdfs.exists(absolutePath)) {
				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else if (force) {
				log.info(String.format("found directory '%s', force deleting it", absolutePath.toString()));
				hdfs.delete(absolutePath, true);

				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else {
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("invalid empty path");

		final Configuration conf = configurationEnumerator.get(clusterName);
		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			return hdfs.exists(absolutePath);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public Configuration getClusterConiguration(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	public int getMaxVersions() {
		return maxVersions;
	}

	@Required
	public void setMaxVersions(final int maxVersions) {
		this.maxVersions = maxVersions;
	}

	public HadoopClientMap getClients() {
		return clients;
	}

}
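
/*
 * Usage sketch (illustrative only): how a caller might drive HadoopServiceCore once
 * Spring has wired HadoopClientMap, ConfigurationEnumerator and maxVersions. The
 * cluster value, table name and column family names below are hypothetical examples,
 * not values defined by this class.
 *
 *   HadoopServiceCore core = applicationContext.getBean(HadoopServiceCore.class);
 *   ClusterName cluster = ...; // a cluster configured in ConfigurationEnumerator
 *
 *   // make sure the table exists with the expected column families
 *   core.ensureTable(cluster, "example_table", Sets.newHashSet("cf1", "cf2"));
 *
 *   // write a single cell
 *   Put put = new Put(Bytes.toBytes("row-1"));
 *   put.add(Bytes.toBytes("cf1"), Bytes.toBytes("q"), Bytes.toBytes("value"));
 *   core.writeOnHBase(cluster, "example_table", Lists.newArrayList(put));
 *
 *   // read it back
 *   Result r = core.getRow(cluster, "example_table", Bytes.toBytes("row-1"));
 */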