Revision 49795
Added by Claudio Atzori over 6 years ago
HadoopServiceCore.java

```diff
 	private int maxVersions;
 
 	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-		return Arrays.asList(admin.listTables())
-				.stream()
-				.map(HTableDescriptor::getNameAsString)
-				.collect(Collectors.toList());
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+			return Arrays.asList(admin.listTables())
+					.stream()
+					.map(HTableDescriptor::getNameAsString)
+					.collect(Collectors.toList());
+		}
 	}
 
```
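The same change repeats through the rest of the file: the private `getHBaseAdmin(clusterName)` helper is removed, and every method now obtains the admin client from the lazily initialised `clients` helper inside a try-with-resources block, so the client is closed on every exit path. As a reading aid, here is a minimal sketch, not part of the revision (the method names are made up, and `clients`, `ClusterName` and `HadoopServiceException` are the surrounding project's types), of what the construct amounts to:

```java
// Sketch only: the try-with-resources form used throughout this revision and
// a rough pre-Java-7 equivalent. HBaseAdmin must be AutoCloseable for the new
// code to compile, so close() runs even when the body throws.
public List<String> listTablesTryWithResources(final ClusterName clusterName) throws IOException, HadoopServiceException {
	try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
		return Arrays.asList(admin.listTables()).stream()
				.map(HTableDescriptor::getNameAsString)
				.collect(Collectors.toList());
	}
}

public List<String> listTablesTryFinally(final ClusterName clusterName) throws IOException, HadoopServiceException {
	final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
	try {
		return Arrays.asList(admin.listTables()).stream()
				.map(HTableDescriptor::getNameAsString)
				.collect(Collectors.toList());
	} finally {
		admin.close(); // the removed helper-based code had no close() call at all
	}
}
```

The remaining hunks below apply the same treatment to the other HBaseAdmin, HTable and FileSystem usages.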
|
```diff
-	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
-		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
+	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
+			if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
 
-		return admin;
-	}
+			if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
 
-	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
-		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
+			final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
 
-		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
+			final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
 
-		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
+			final Set<String> columns = Sets.newHashSet();
 
-		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
+			for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
+				columns.add(hColDesc.getNameAsString());
+			}
 
-		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
+			HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
+			htDescriptor.setColumns(columns);
 
-		final Set<String> columns = Sets.newHashSet();
+			List<HBaseTableRegionInfo> regions = Lists.newArrayList();
 
-		for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
-			columns.add(hColDesc.getNameAsString());
-		}
+			for (HRegionInfo info : tableRegions) {
+				regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
+			}
+			htDescriptor.setRegions(regions);
 
-		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
-		htDescriptor.setColumns(columns);
+			if (log.isDebugEnabled()) {
+				log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
+			}
 
-		List<HBaseTableRegionInfo> regions = Lists.newArrayList();
-
-		for (HRegionInfo info : tableRegions) {
-			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
+			return htDescriptor.toString();
 		}
-		htDescriptor.setRegions(regions);
-
-		if (log.isDebugEnabled()) {
-			log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
-		}
-
-		return htDescriptor.toString();
 	}
 
 	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
-		return desc.getFamilies().stream()
-				.map(d -> d.getNameAsString())
-				.collect(Collectors.toList());
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
+			return desc.getFamilies().stream()
+					.map(d -> d.getNameAsString())
+					.collect(Collectors.toList());
+		}
 	}
 
 	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
+			if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
 
-		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
+			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
 
-		log.info("disabling table: " + table);
-		admin.disableTable(table);
+			log.info("disabling table: " + table);
+			admin.disableTable(table);
 
-		log.info("deleting table: " + table);
-		admin.deleteTable(table);
+			log.info("deleting table: " + table);
+			admin.deleteTable(table);
 
-		log.info("creating table: " + table);
-		admin.createTable(desc);
+			log.info("creating table: " + table);
+			admin.createTable(desc);
+		}
 	}
 
 	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		return admin.tableExists(table);
+			return admin.tableExists(table);
+		}
 	}
 
 	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
+			if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
 
-		log.info("disabling table: " + table);
-		admin.disableTable(table);
+			log.info("disabling table: " + table);
+			admin.disableTable(table);
 
-		log.info("deleting table: " + table);
-		admin.deleteTable(table);
+			log.info("deleting table: " + table);
+			admin.deleteTable(table);
+		}
 	}
 
 	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
 
-		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
+			if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
 
-		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
+			final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
 
-		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
+			doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
+		}
 	}
 
 	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
 
-		doCreateTable(clusterName, table, columns, null);
+			doCreateTable(clusterName, table, columns, null);
+		}
 	}
 
 	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
 			throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
 
-		final HTableDescriptor desc = new HTableDescriptor(table);
-		for (final String column : columns) {
-			final HColumnDescriptor hds = new HColumnDescriptor(column);
-			hds.setMaxVersions(getMaxVersions());
-			desc.addFamily(hds);
-		}
+			final HTableDescriptor desc = new HTableDescriptor(table);
+			for (final String column : columns) {
+				final HColumnDescriptor hds = new HColumnDescriptor(column);
+				hds.setMaxVersions(getMaxVersions());
+				desc.addFamily(hds);
+			}
 
-		log.info("creating hbase table: " + table);
+			log.info("creating hbase table: " + table);
 
-		if (regions != null && !regions.isEmpty()) {
-			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
-			admin.createTable(desc, getSplitKeys(regions));
-		} else {
-			admin.createTable(desc);
+			if (regions != null && !regions.isEmpty()) {
+				log.debug(String.format("create using %s regions: %s", regions.size(), regions));
+				admin.createTable(desc, getSplitKeys(regions));
+			} else {
+				admin.createTable(desc);
+			}
+
+			log.info("created hbase table: [" + table + "]");
+			log.debug("descriptor: [" + desc.toString() + "]");
 		}
-
-		log.info("created hbase table: [" + table + "]");
-		log.debug("descriptor: [" + desc.toString() + "]");
 	}
 
 	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
...
 	}
 
 	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+			if (!admin.tableExists(table)) {
+				createTable(clusterName, table, columns);
+			} else {
+				final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
 
-		if (!admin.tableExists(table)) {
-			createTable(clusterName, table, columns);
-		} else {
-			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
+				final Set<String> foundColumns = desc.getFamilies().stream()
+						.map(d -> d.getNameAsString())
+						.collect(Collectors.toCollection(HashSet::new));
 
-			final Set<String> foundColumns = desc.getFamilies().stream()
-					.map(d -> d.getNameAsString())
-					.collect(Collectors.toCollection(HashSet::new));
+				log.info("ensuring columns on table " + table + ": " + columns);
+				final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
+				if (!missingColumns.isEmpty()) {
 
-			log.info("ensuring columns on table " + table + ": " + columns);
-			final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
-			if (!missingColumns.isEmpty()) {
+					if (admin.isTableEnabled(table)) {
+						admin.disableTable(table);
+					}
 
-				if (admin.isTableEnabled(table)) {
-					admin.disableTable(table);
-				}
+					for (final String column : missingColumns) {
+						log.info("hbase table: '" + table + "', adding column: " + column);
+						admin.addColumn(table, new HColumnDescriptor(column));
+					}
 
-				for (final String column : missingColumns) {
-					log.info("hbase table: '" + table + "', adding column: " + column);
-					admin.addColumn(table, new HColumnDescriptor(column));
+					admin.enableTable(table);
 				}
-
-				admin.enableTable(table);
 			}
 		}
 	}
...
 
 	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
 		final Configuration conf = configurationEnumerator.get(clusterName);
-		final HTable table = new HTable(conf, tableName);
-		try {
+		try(final HTable table = new HTable(conf, tableName)) {
 			final ResultScanner rs = table.getScanner(scan);
 			try {
 				return Lists.newArrayList(rs.iterator());
 			} finally {
 				rs.close();
 			}
-		} finally {
-			table.close();
 		}
 	}
 
...
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
 
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 
 			if (hdfs.exists(absolutePath)) {
...
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
 
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 			if (!hdfs.exists(absolutePath)) {
 				hdfs.mkdirs(absolutePath);
...
 			throw new HadoopServiceException("invalid empty path");
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 			return hdfs.exists(absolutePath);
 		} catch (IOException e) {
```
Lazy init of the Hadoop clients allows defining the user for the HBase admin and the jtClient once. The user is available as a service property.
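The `clients` helper referenced throughout the diff is not itself shown in this revision, and the commit message says the same idea also covers the JobTracker client. Going only by that message, one plausible shape for the HBase side is sketched below; the class name, the `services.hadoop.user` property, the UserGroupInformation-based impersonation and the per-cluster configuration lookup are assumptions for illustration, not code from the repository (ClusterName and HadoopServiceException are the project's own types).

```java
import java.io.IOException;
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.security.UserGroupInformation;

/**
 * Hypothetical sketch of a lazily initialised client factory such as the
 * "clients" field used in the diff; everything project-specific is assumed.
 */
public class HadoopClientsSketch {

	private String hadoopUser; // resolved lazily, once, from a service property

	public HBaseAdmin getHbaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
		final Configuration conf = getConfiguration(clusterName);
		try {
			// Create the admin as the configured service user; each call returns a
			// fresh HBaseAdmin that the caller closes via try-with-resources.
			final UserGroupInformation ugi = UserGroupInformation.createRemoteUser(getHadoopUser(conf));
			return ugi.doAs((PrivilegedExceptionAction<HBaseAdmin>) () -> new HBaseAdmin(conf));
		} catch (IOException | InterruptedException e) {
			throw new HadoopServiceException("unable to create HBaseAdmin for cluster: " + clusterName);
		}
	}

	private synchronized String getHadoopUser(final Configuration conf) {
		if (hadoopUser == null) {
			// placeholder property name; the revision only says the user is a service property
			hadoopUser = conf.get("services.hadoop.user", System.getProperty("user.name"));
		}
		return hadoopUser;
	}

	private Configuration getConfiguration(final ClusterName clusterName) {
		// placeholder: the real service resolves per-cluster configurations elsewhere
		return new Configuration();
	}
}
```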