package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

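/**
 * Core HBase and HDFS operations behind the Hadoop service: table administration
 * (list, describe, create, truncate, drop, ensure), row-level reads, writes and
 * deletes, and basic HDFS path management. The target cluster is resolved via the
 * injected {@link ConfigurationEnumerator} and {@link HadoopClientMap}.
 */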
public class HadoopServiceCore {

	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;
	@Autowired
	private HadoopClientMap clients;
	private int maxVersions;

	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		return Arrays.asList(admin.listTables())
				.stream()
				.map(HTableDescriptor::getNameAsString)
				.collect(Collectors.toList());
	}

	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		return admin;
	}

	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

		final Set<String> columns = Sets.newHashSet();

		for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
			columns.add(hColDesc.getNameAsString());
		}

		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
		htDescriptor.setColumns(columns);

		List<HBaseTableRegionInfo> regions = Lists.newArrayList();

		for (HRegionInfo info : tableRegions) {
			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
		}
		htDescriptor.setRegions(regions);

		if (log.isDebugEnabled()) {
			log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
		}

		return htDescriptor.toString();
	}

	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
		return desc.getFamilies().stream()
				.map(d -> d.getNameAsString())
				.collect(Collectors.toList());
	}

	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");

		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);

		log.info("creating table: " + table);
		admin.createTable(desc);
	}

	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		return admin.tableExists(table);
	}

	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);
	}

	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
	}

	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		doCreateTable(clusterName, table, columns, null);
	}

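	/**
	 * Creates the table with one column family per name in {@code columns}, each configured
	 * with the service-wide maxVersions; when region boundaries are supplied the table is
	 * pre-split on the keys derived by {@link #getSplitKeys(List)}.
	 */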
	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
			throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		final HTableDescriptor desc = new HTableDescriptor(table);
		for (final String column : columns) {
			final HColumnDescriptor hds = new HColumnDescriptor(column);
			hds.setMaxVersions(getMaxVersions());
			desc.addFamily(hds);
		}

		log.info("creating hbase table: " + table);

		if (regions != null && !regions.isEmpty()) {
			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
			admin.createTable(desc, getSplitKeys(regions));
		} else {
			admin.createTable(desc);
		}

		log.info("created hbase table: [" + table + "]");
		log.debug("descriptor: [" + desc.toString() + "]");
	}

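	/**
	 * Derives the split keys for a pre-split table: the end key of every region except
	 * the last one, whose end key is the implicit upper bound of the key space.
	 */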
	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
		byte[][] splits = new byte[regions.size() - 1][];
		for (int i = 0; i < regions.size() - 1; i++) {
			splits[i] = regions.get(i).getEndKey().getBytes();
		}
		return splits;
	}

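	/**
	 * Creates the table when it does not exist yet; otherwise compares the requested column
	 * families with the existing ones and adds any missing family, disabling the table while
	 * the schema is altered and re-enabling it afterwards.
	 */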
	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {

		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) {
			createTable(clusterName, table, columns);
		} else {
			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));

			final Set<String> foundColumns = desc.getFamilies().stream()
					.map(d -> d.getNameAsString())
					.collect(Collectors.toCollection(HashSet::new));

			log.info("ensuring columns on table " + table + ": " + columns);
			final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
			if (!missingColumns.isEmpty()) {

				if (admin.isTableEnabled(table)) {
					admin.disableTable(table);
				}

				for (final String column : missingColumns) {
					log.info("hbase table: '" + table + "', adding column: " + column);
					admin.addColumn(table, new HColumnDescriptor(column));
				}

				admin.enableTable(table);
			}
		}
	}

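	/**
	 * Applies the given {@link Put} operations to the table, flushing the client-side write
	 * buffer and closing the table handle even when the write fails.
	 */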
	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			table.delete(deletes);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

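	/**
	 * For each row descriptor, deletes all versions of the listed column family/qualifier
	 * pairs from the corresponding row, writing the deletes to the WAL.
	 */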
	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			for(HBaseRowDescriptor desc : columns) {

				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
				d.setWriteToWAL(true);
				for(Column c : desc.getColumns()) {
					for(String qualifier : c.getQualifier()) {
						log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
						d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
					}
				}
				table.delete(d);
			}
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			return table.get(new Get(id));
		} finally {
			table.close();
		}
	}

	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
		for(String rowKey : rowKeys) {
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
		}
		return map;
	}

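	/**
	 * Builds an {@link HBaseRowDescriptor} listing, for the given row key, the column families
	 * and qualifiers currently present in the row; returns an empty descriptor when the row
	 * does not exist.
	 */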
	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		final HBaseRowDescriptor desc = new HBaseRowDescriptor();

		try {
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

			if (r.isEmpty()) {
				return desc;
			}

			final List<Column> columns = Lists.newArrayList();

			for(Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
				final Set<byte[]> qualifiers = e.getValue().keySet();
				final String family = new String(e.getKey());
				final Column col = new Column(family);

				for(byte[] q : qualifiers) {
					String qs = new String(q);
					col.getQualifier().add(qs);
				}
				columns.add(col);
			}
			desc.setColumns(columns);
			desc.setRowKey(rowKey);

			return desc;
		} finally {
			table.close();
		}
	}

	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			final ResultScanner rs = table.getScanner(scan);
			try {
				return Lists.newArrayList(rs.iterator());
			} finally {
				rs.close();
			}
		} finally {
			table.close();
		}
	}

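	/**
	 * Recursively deletes the given path from HDFS, resolving it against the cluster's
	 * fs.defaultFS; returns false when the path does not exist.
	 */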
	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot deleteFromHdfs an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

			if (hdfs.exists(absolutePath)) {
				log.debug("deleteFromHdfs path: " + absolutePath.toString());
				hdfs.delete(absolutePath, true);
				log.info("deleted path: " + absolutePath.toString());
				return true;
			} else {
				log.warn("cannot deleteFromHdfs unexisting path: " + absolutePath.toString());
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot create an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			if (!hdfs.exists(absolutePath)) {
				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else if (force) {
				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
				hdfs.delete(absolutePath, true);

				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else {
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("invalid empty path");

		final Configuration conf = configurationEnumerator.get(clusterName);
		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			return hdfs.exists(absolutePath);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public Configuration getClusterConiguration(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	public int getMaxVersions() {
		return maxVersions;
	}

	@Required
	public void setMaxVersions(final int maxVersions) {
		this.maxVersions = maxVersions;
	}

	public HadoopClientMap getClients() {
		return clients;
	}

}
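
/*
 * Hypothetical usage sketch, not part of the original source: it shows how a caller
 * wired by Spring might combine ensureTable, writeOnHBase and getRow. The table name,
 * column family, qualifier and row key below are made-up examples; the HadoopServiceCore
 * instance and the ClusterName value are assumed to be supplied by the surrounding
 * application context.
 */
class HadoopServiceCoreUsageSketch {

	static void ensureAndWrite(final HadoopServiceCore core, final ClusterName cluster) throws Exception {
		// create the table with the 'm' column family, or add the family if the table already exists
		core.ensureTable(cluster, "example_table", Sets.newHashSet("m"));

		// write a single cell using the same pre-1.0 HBase client API used by the service
		final Put put = new Put(Bytes.toBytes("row-1"));
		put.add(Bytes.toBytes("m"), Bytes.toBytes("q"), Bytes.toBytes("value"));
		core.writeOnHBase(cluster, "example_table", Lists.newArrayList(put));

		// read the row back and print the stored value
		final Result result = core.getRow(cluster, "example_table", Bytes.toBytes("row-1"));
		System.out.println(Bytes.toString(result.getValue(Bytes.toBytes("m"), Bytes.toBytes("q"))));
	}
}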