Revision 39575
Added by Claudio Atzori over 8 years ago
HadoopServiceCore.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.hadoop; |
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 |
import java.net.URI; |
|
4 | 5 |
import java.util.List; |
5 | 6 |
import java.util.Set; |
6 | 7 |
|
8 |
import com.google.common.base.Function; |
|
9 |
import com.google.common.collect.Iterables; |
|
10 |
import com.google.common.collect.Lists; |
|
11 |
import com.google.common.collect.Sets; |
|
12 |
import eu.dnetlib.data.hadoop.config.ClusterName; |
|
13 |
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator; |
|
14 |
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException; |
|
15 |
import eu.dnetlib.data.hadoop.utils.HadoopUtils; |
|
16 |
import org.apache.commons.lang.StringUtils; |
|
7 | 17 |
import org.apache.commons.logging.Log; |
8 | 18 |
import org.apache.commons.logging.LogFactory; |
9 | 19 |
import org.apache.hadoop.conf.Configuration; |
20 |
import org.apache.hadoop.fs.FileSystem; |
|
21 |
import org.apache.hadoop.fs.Path; |
|
10 | 22 |
import org.apache.hadoop.hbase.HColumnDescriptor; |
11 | 23 |
import org.apache.hadoop.hbase.HTableDescriptor; |
12 |
import org.apache.hadoop.hbase.client.Delete; |
|
13 |
import org.apache.hadoop.hbase.client.Get; |
|
14 |
import org.apache.hadoop.hbase.client.HBaseAdmin; |
|
15 |
import org.apache.hadoop.hbase.client.HTable; |
|
16 |
import org.apache.hadoop.hbase.client.Put; |
|
17 |
import org.apache.hadoop.hbase.client.Result; |
|
18 |
import org.apache.hadoop.hbase.client.ResultScanner; |
|
19 |
import org.apache.hadoop.hbase.client.Scan; |
|
24 |
import org.apache.hadoop.hbase.client.*; |
|
20 | 25 |
import org.apache.hadoop.hbase.util.Bytes; |
21 | 26 |
import org.springframework.beans.factory.annotation.Autowired; |
22 | 27 |
import org.springframework.beans.factory.annotation.Required; |
23 | 28 |
|
24 |
import com.google.common.base.Function; |
|
25 |
import com.google.common.collect.Iterables; |
|
26 |
import com.google.common.collect.Lists; |
|
27 |
import com.google.common.collect.Sets; |
|
28 |
|
|
29 |
import eu.dnetlib.data.hadoop.config.ClusterName; |
|
30 |
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator; |
|
31 |
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException; |
|
32 |
import eu.dnetlib.data.hadoop.utils.HadoopUtils; |
|
33 |
|
|
34 | 29 |
public class HadoopServiceCore { |
35 | 30 |
|
36 | 31 |
private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM |
37 |
|
|
38 | 32 |
@Autowired |
33 |
protected ConfigurationEnumerator configurationEnumerator; |
|
34 |
@Autowired |
|
39 | 35 |
private HadoopClientMap clients; |
40 |
|
|
41 |
@Autowired |
|
42 |
protected ConfigurationEnumerator configurationEnumerator; |
|
43 |
|
|
44 | 36 |
private int maxVersions; |
45 | 37 |
|
46 | 38 |
public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException { |
... | ... | |
123 | 115 |
log.debug("descriptor: [" + desc.toString() + "]"); |
124 | 116 |
} |
125 | 117 |
|
126 |
class HBaseSchemaBuilder { |
|
127 |
|
|
128 |
} |
|
129 |
|
|
130 | 118 |
public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException { |
131 | 119 |
|
132 | 120 |
final HBaseAdmin admin = getHBaseAdmin(clusterName); |
... | ... | |
155 | 143 |
} |
156 | 144 |
} |
157 | 145 |
|
158 |
public void write(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException { |
|
146 |
public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
|
|
159 | 147 |
final Configuration conf = configurationEnumerator.get(clusterName); |
160 | 148 |
final HTable table = new HTable(conf, tableName); |
161 | 149 |
try { |
... | ... | |
166 | 154 |
} |
167 | 155 |
} |
168 | 156 |
|
169 |
public void delete(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException { |
|
157 |
public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
|
|
170 | 158 |
final Configuration conf = configurationEnumerator.get(clusterName); |
171 | 159 |
final HTable table = new HTable(conf, tableName); |
172 | 160 |
try { |
... | ... | |
202 | 190 |
} |
203 | 191 |
} |
204 | 192 |
|
193 |
public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException { |
|
194 |
if (StringUtils.isBlank(path)) |
|
195 |
throw new HadoopServiceException("Cannot deleteFromHBase an empty HDFS path."); |
|
196 |
|
|
197 |
final Configuration conf = configurationEnumerator.get(clusterName); |
|
198 |
|
|
199 |
try { |
|
200 |
final FileSystem hdfs = FileSystem.get(conf); |
|
201 |
final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path)); |
|
202 |
|
|
203 |
if (hdfs.exists(absolutePath)) { |
|
204 |
log.debug("deleteFromHBase path: " + absolutePath.toString()); |
|
205 |
hdfs.delete(absolutePath, true); |
|
206 |
log.info("deleted path: " + absolutePath.toString()); |
|
207 |
return true; |
|
208 |
} else { |
|
209 |
log.warn("cannot deleteFromHBase unexisting path: " + absolutePath.toString()); |
|
210 |
return false; |
|
211 |
} |
|
212 |
} catch (IOException e) { |
|
213 |
throw new HadoopServiceException(e); |
|
214 |
} |
|
215 |
} |
|
216 |
|
|
217 |
public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException { |
|
218 |
if (StringUtils.isBlank(path)) |
|
219 |
throw new HadoopServiceException("Cannot create an empty HDFS path."); |
|
220 |
|
|
221 |
final Configuration conf = configurationEnumerator.get(clusterName); |
|
222 |
|
|
223 |
try { |
|
224 |
final FileSystem hdfs = FileSystem.get(conf); |
|
225 |
final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path)); |
|
226 |
if (!hdfs.exists(absolutePath)) { |
|
227 |
hdfs.mkdirs(absolutePath); |
|
228 |
log.info("created path: " + absolutePath.toString()); |
|
229 |
return true; |
|
230 |
} else if (force) { |
|
231 |
log.info(String.format("found directory '%s', force delete it", absolutePath.toString())); |
|
232 |
hdfs.delete(absolutePath, true); |
|
233 |
|
|
234 |
hdfs.mkdirs(absolutePath); |
|
235 |
log.info("created path: " + absolutePath.toString()); |
|
236 |
return true; |
|
237 |
} else { |
|
238 |
log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString())); |
|
239 |
return false; |
|
240 |
} |
|
241 |
} catch (IOException e) { |
|
242 |
throw new HadoopServiceException(e); |
|
243 |
} |
|
244 |
} |
|
245 |
|
|
205 | 246 |
public Configuration getClusterConiguration(final ClusterName clusterName) { |
206 | 247 |
return configurationEnumerator.get(clusterName); |
207 | 248 |
} |
... | ... | |
219 | 260 |
return clients; |
220 | 261 |
} |
221 | 262 |
|
263 |
|
|
222 | 264 |
} |
Also available in: Unified diff
Added new RMI methods: deleteHdfsPath, createHdfsDirectory
added blackboard action: DELETE_HDFS_PATH