package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

public class HadoopServiceCore {

	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;

	@Autowired
	private HadoopClientMap clients;

	private int maxVersions;

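	/**
	 * Lists the names of the tables currently defined on the given cluster.
	 */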
	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
			return Arrays.stream(admin.listTables())
					.map(HTableDescriptor::getNameAsString)
					.collect(Collectors.toList());
		}
	}

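	/**
	 * Returns a serialized description (column families and region start/end keys) of an existing table.
	 */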
	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

			if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

			final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

			final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

			final Set<String> columns = Sets.newHashSet();

			for (HColumnDescriptor hColDesc : desc.getColumnFamilies()) {
				columns.add(hColDesc.getNameAsString());
			}

			HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
			htDescriptor.setColumns(columns);

			List<HBaseTableRegionInfo> regions = Lists.newArrayList();

			for (HRegionInfo info : tableRegions) {
				regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
			}
			htDescriptor.setRegions(regions);

			if (log.isDebugEnabled()) {
				log.debug("got configuration for table '" + tableName + "': " + htDescriptor.toString());
			}

			return htDescriptor.toString();
		}
	}

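	/**
	 * Returns the column family names defined for the given table.
	 */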
	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
			return desc.getFamilies().stream()
					.map(d -> d.getNameAsString())
					.collect(Collectors.toList());
		}
	}

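	/**
	 * Empties the given table by dropping it and re-creating it with the same descriptor.
	 * Note: all data stored in the table is lost.
	 */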
	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate non-existing table");

			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

			log.info("disabling table: " + table);
			admin.disableTable(table);

			log.info("deleting table: " + table);
			admin.deleteTable(table);

			log.info("creating table: " + table);
			admin.createTable(desc);
		}
	}

	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			return admin.tableExists(table);
		}
	}

	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop non-existing table: '" + table + "'");

			log.info("disabling table: " + table);
			admin.disableTable(table);

			log.info("deleting table: " + table);
			admin.deleteTable(table);
		}
	}

	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

			if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

			final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

			doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
		}
	}

	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

			doCreateTable(clusterName, table, columns, null);
		}
	}

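	/**
	 * Creates a table with the given column families, optionally pre-split into regions.
	 * Each family is configured with the service-wide maxVersions setting.
	 */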
	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
			throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

			final HTableDescriptor desc = new HTableDescriptor(table);
			for (final String column : columns) {
				final HColumnDescriptor hds = new HColumnDescriptor(column);
				hds.setMaxVersions(getMaxVersions());
				desc.addFamily(hds);
			}

			log.info("creating hbase table: " + table);

			if (regions != null && !regions.isEmpty()) {
				log.debug(String.format("create using %s regions: %s", regions.size(), regions));
				admin.createTable(desc, getSplitKeys(regions));
			} else {
				admin.createTable(desc);
			}

			log.info("created hbase table: [" + table + "]");
			log.debug("descriptor: [" + desc.toString() + "]");
		}
	}

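	// Derives the region split keys: the end key of every region except the last one becomes a split point.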
	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
		byte[][] splits = new byte[regions.size() - 1][];
		for (int i = 0; i < regions.size() - 1; i++) {
			splits[i] = regions.get(i).getEndKey().getBytes();
		}
		return splits;
	}

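	/**
	 * Creates the table if it does not exist; otherwise adds any missing column families,
	 * disabling and re-enabling the table when changes are needed.
	 */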
	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (!admin.tableExists(table)) {
				createTable(clusterName, table, columns);
			} else {
				final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));

				final Set<String> foundColumns = desc.getFamilies().stream()
						.map(d -> d.getNameAsString())
						.collect(Collectors.toCollection(HashSet::new));

				log.info("ensuring columns on table " + table + ": " + columns);
				final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
				if (!missingColumns.isEmpty()) {

					if (admin.isTableEnabled(table)) {
						admin.disableTable(table);
					}

					for (final String column : missingColumns) {
						log.info("hbase table: '" + table + "', adding column: " + column);
						admin.addColumn(table, new HColumnDescriptor(column));
					}

					admin.enableTable(table);
				}
			}
		}
	}

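	/**
	 * Applies a batch of Put operations to the given table.
	 */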
	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			table.delete(deletes);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

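	/**
	 * Deletes the listed column family/qualifier pairs from each described row.
	 */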
	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			for(HBaseRowDescriptor desc : columns) {

				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
				d.setWriteToWAL(true);
				for(Column c : desc.getColumns()) {
					for(String qualifier : c.getQualifier()) {
						log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
						d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
					}
				}
				table.delete(d);
			}
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			return table.get(new Get(id));
		} finally {
			table.close();
		}
	}

	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
		for(String rowKey : rowKeys) {
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
		}
		return map;
	}

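	/**
	 * Reads a single row and returns its column families and qualifiers as an HBaseRowDescriptor.
	 * Returns an empty descriptor when the row does not exist.
	 */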
	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		final HBaseRowDescriptor desc = new HBaseRowDescriptor();

		try {
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

			if (r.isEmpty()) {
				return desc;
			}

			final List<Column> columns = Lists.newArrayList();

			for(Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
				final Set<byte[]> qualifiers = e.getValue().keySet();
				final String family = new String(e.getKey());
				final Column col = new Column(family);

				for(byte[] q : qualifiers) {
					String qs = new String(q);
					col.getQualifier().add(qs);
				}
				columns.add(col);
			}
			desc.setColumns(columns);
			desc.setRowKey(rowKey);

			return desc;
		} finally {
			table.close();
		}
	}

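	/**
	 * Runs the given Scan and collects all matching rows in memory.
	 */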
	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		try(final HTable table = new HTable(conf, tableName)) {
			final ResultScanner rs = table.getScanner(scan);
			try {
				return Lists.newArrayList(rs.iterator());
			} finally {
				rs.close();
			}
		}
	}

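	/**
	 * Recursively deletes an HDFS path, resolved against the cluster's fs.defaultFS.
	 */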
	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot delete an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try(final FileSystem hdfs = FileSystem.get(conf)) {
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

			if (hdfs.exists(absolutePath)) {
				log.debug("deleting path: " + absolutePath.toString());
				hdfs.delete(absolutePath, true);
				log.info("deleted path: " + absolutePath.toString());
				return true;
			} else {
				log.warn("cannot delete non-existing path: " + absolutePath.toString());
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

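	/**
	 * Creates an HDFS directory; when force is true an existing directory is deleted and re-created.
	 */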
	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot create an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try(final FileSystem hdfs = FileSystem.get(conf)) {
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			if (!hdfs.exists(absolutePath)) {
				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else if (force) {
				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
				hdfs.delete(absolutePath, true);

				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else {
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("invalid empty path");

		final Configuration conf = configurationEnumerator.get(clusterName);
		try(final FileSystem hdfs = FileSystem.get(conf)) {
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			return hdfs.exists(absolutePath);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public Configuration getClusterConiguration(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	public int getMaxVersions() {
		return maxVersions;
	}

	@Required
	public void setMaxVersions(final int maxVersions) {
		this.maxVersions = maxVersions;
	}

	public HadoopClientMap getClients() {
		return clients;
	}

}