Project

General

Profile

« Previous | Next » 

Revision 49795

lazy init of hadoop clients allow to define the user for HBase admin and jtClient once. The user is available as service property

View differences:

HadoopServiceCore.java
40 40
	private int maxVersions;
41 41

  
42 42
	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
43
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
44
		return Arrays.asList(admin.listTables())
45
				.stream()
46
				.map(HTableDescriptor::getNameAsString)
47
				.collect(Collectors.toList());
43
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
44
			return Arrays.asList(admin.listTables())
45
					.stream()
46
					.map(HTableDescriptor::getNameAsString)
47
					.collect(Collectors.toList());
48
		}
48 49
	}
49 50

  
50
	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
51
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
51
	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
52
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
52 53

  
53
		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
54
			if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
54 55

  
55
		return admin;
56
	}
56
			if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
57 57

  
58
	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
59
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
58
			final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
60 59

  
61
		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
60
			final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
62 61

  
63
		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
62
			final Set<String> columns = Sets.newHashSet();
64 63

  
65
		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
64
			for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
65
				columns.add(hColDesc.getNameAsString());
66
			}
66 67

  
67
		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
68
			HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
69
			htDescriptor.setColumns(columns);
68 70

  
69
		final Set<String> columns = Sets.newHashSet();
71
			List<HBaseTableRegionInfo> regions = Lists.newArrayList();
70 72

  
71
		for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
72
			columns.add(hColDesc.getNameAsString());
73
		}
73
			for (HRegionInfo info : tableRegions) {
74
				regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
75
			}
76
			htDescriptor.setRegions(regions);
74 77

  
75
		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
76
		htDescriptor.setColumns(columns);
78
			if (log.isDebugEnabled()) {
79
				log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
80
			}
77 81

  
78
		List<HBaseTableRegionInfo> regions = Lists.newArrayList();
79

  
80
		for (HRegionInfo info : tableRegions) {
81
			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
82
			return htDescriptor.toString();
82 83
		}
83
		htDescriptor.setRegions(regions);
84

  
85
		if (log.isDebugEnabled()) {
86
			log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
87
		}
88

  
89
		return htDescriptor.toString();
90 84
	}
91 85

  
92 86
	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
93
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
94
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
95
		return desc.getFamilies().stream()
96
				.map(d -> d.getNameAsString())
97
				.collect(Collectors.toList());
87
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
88
			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
89
			return desc.getFamilies().stream()
90
					.map(d -> d.getNameAsString())
91
					.collect(Collectors.toList());
92
		}
98 93
	}
99 94

  
100 95
	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
101
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
96
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
102 97

  
103
		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
98
			if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
104 99

  
105
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
100
			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
106 101

  
107
		log.info("disabling table: " + table);
108
		admin.disableTable(table);
102
			log.info("disabling table: " + table);
103
			admin.disableTable(table);
109 104

  
110
		log.info("deleting table: " + table);
111
		admin.deleteTable(table);
105
			log.info("deleting table: " + table);
106
			admin.deleteTable(table);
112 107

  
113
		log.info("creating table: " + table);
114
		admin.createTable(desc);
108
			log.info("creating table: " + table);
109
			admin.createTable(desc);
110
		}
115 111
	}
116 112

  
117 113
	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
118
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
114
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
119 115

  
120
		return admin.tableExists(table);
116
			return admin.tableExists(table);
117
		}
121 118
	}
122 119

  
123 120
	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
124
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
121
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
125 122

  
126
		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
123
			if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
127 124

  
128
		log.info("disabling table: " + table);
129
		admin.disableTable(table);
125
			log.info("disabling table: " + table);
126
			admin.disableTable(table);
130 127

  
131
		log.info("deleting table: " + table);
132
		admin.deleteTable(table);
128
			log.info("deleting table: " + table);
129
			admin.deleteTable(table);
130
		}
133 131
	}
134 132

  
135 133
	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
136
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
134
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
137 135

  
138
		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
136
			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
139 137

  
140
		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
138
			if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
141 139

  
142
		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
140
			final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
143 141

  
144
		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
142
			doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
143
		}
145 144
	}
146 145

  
147 146
	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
148
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
147
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
149 148

  
150
		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
149
			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
151 150

  
152
		doCreateTable(clusterName, table, columns, null);
151
			doCreateTable(clusterName, table, columns, null);
152
		}
153 153
	}
154 154

  
155 155
	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
156 156
			throws IOException, HadoopServiceException {
157
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
157
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
158 158

  
159
		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
159
			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
160 160

  
161
		final HTableDescriptor desc = new HTableDescriptor(table);
162
		for (final String column : columns) {
163
			final HColumnDescriptor hds = new HColumnDescriptor(column);
164
			hds.setMaxVersions(getMaxVersions());
165
			desc.addFamily(hds);
166
		}
161
			final HTableDescriptor desc = new HTableDescriptor(table);
162
			for (final String column : columns) {
163
				final HColumnDescriptor hds = new HColumnDescriptor(column);
164
				hds.setMaxVersions(getMaxVersions());
165
				desc.addFamily(hds);
166
			}
167 167

  
168
		log.info("creating hbase table: " + table);
168
			log.info("creating hbase table: " + table);
169 169

  
170
		if (regions != null && !regions.isEmpty()) {
171
			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
172
			admin.createTable(desc, getSplitKeys(regions));
173
		} else {
174
			admin.createTable(desc);
170
			if (regions != null && !regions.isEmpty()) {
171
				log.debug(String.format("create using %s regions: %s", regions.size(), regions));
172
				admin.createTable(desc, getSplitKeys(regions));
173
			} else {
174
				admin.createTable(desc);
175
			}
176

  
177
			log.info("created hbase table: [" + table + "]");
178
			log.debug("descriptor: [" + desc.toString() + "]");
175 179
		}
176

  
177
		log.info("created hbase table: [" + table + "]");
178
		log.debug("descriptor: [" + desc.toString() + "]");
179 180
	}
180 181

  
181 182
	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
......
187 188
	}
188 189

  
189 190
	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
191
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
190 192

  
191
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
193
			if (!admin.tableExists(table)) {
194
				createTable(clusterName, table, columns);
195
			} else {
196
				final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
192 197

  
193
		if (!admin.tableExists(table)) {
194
			createTable(clusterName, table, columns);
195
		} else {
196
			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
198
				final Set<String> foundColumns = desc.getFamilies().stream()
199
						.map(d -> d.getNameAsString())
200
						.collect(Collectors.toCollection(HashSet::new));
197 201

  
198
			final Set<String> foundColumns = desc.getFamilies().stream()
199
					.map(d -> d.getNameAsString())
200
					.collect(Collectors.toCollection(HashSet::new));
202
				log.info("ensuring columns on table " + table + ": " + columns);
203
				final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
204
				if (!missingColumns.isEmpty()) {
201 205

  
202
			log.info("ensuring columns on table " + table + ": " + columns);
203
			final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
204
			if (!missingColumns.isEmpty()) {
206
					if (admin.isTableEnabled(table)) {
207
						admin.disableTable(table);
208
					}
205 209

  
206
				if (admin.isTableEnabled(table)) {
207
					admin.disableTable(table);
208
				}
210
					for (final String column : missingColumns) {
211
						log.info("hbase table: '" + table + "', adding column: " + column);
212
						admin.addColumn(table, new HColumnDescriptor(column));
213
					}
209 214

  
210
				for (final String column : missingColumns) {
211
					log.info("hbase table: '" + table + "', adding column: " + column);
212
					admin.addColumn(table, new HColumnDescriptor(column));
215
					admin.enableTable(table);
213 216
				}
214

  
215
				admin.enableTable(table);
216 217
			}
217 218
		}
218 219
	}
......
317 318

  
318 319
	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
319 320
		final Configuration conf = configurationEnumerator.get(clusterName);
320
		final HTable table = new HTable(conf, tableName);
321
		try {
321
		try(final HTable table = new HTable(conf, tableName)) {
322 322
			final ResultScanner rs = table.getScanner(scan);
323 323
			try {
324 324
				return Lists.newArrayList(rs.iterator());
325 325
			} finally {
326 326
				rs.close();
327 327
			}
328
		} finally {
329
			table.close();
330 328
		}
331 329
	}
332 330

  
......
336 334

  
337 335
		final Configuration conf = configurationEnumerator.get(clusterName);
338 336

  
339
		try {
340
			final FileSystem hdfs = FileSystem.get(conf);
337
		try(final FileSystem hdfs = FileSystem.get(conf)) {
341 338
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
342 339

  
343 340
			if (hdfs.exists(absolutePath)) {
......
360 357

  
361 358
		final Configuration conf = configurationEnumerator.get(clusterName);
362 359

  
363
		try {
364
			final FileSystem hdfs = FileSystem.get(conf);
360
		try(final FileSystem hdfs = FileSystem.get(conf)) {
365 361
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
366 362
			if (!hdfs.exists(absolutePath)) {
367 363
				hdfs.mkdirs(absolutePath);
......
388 384
			throw new HadoopServiceException("invalid empty path");
389 385

  
390 386
		final Configuration conf = configurationEnumerator.get(clusterName);
391
		try {
392
			final FileSystem hdfs = FileSystem.get(conf);
387
		try(final FileSystem hdfs = FileSystem.get(conf)) {
393 388
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
394 389
			return hdfs.exists(absolutePath);
395 390
		} catch (IOException e) {

Also available in: Unified diff