
Revision 39575

added new rmi methods: deleteHdfsPath, createHdfsDirectory
added blackboard action: DELETE_HDFS_PATH
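The diff below contains the service-core implementations (deleteFromHdfs, createHdfsDir) behind the two new RMI methods; the RMI interface and the DELETE_HDFS_PATH blackboard handler are not part of this file. For orientation, a minimal sketch of what the RMI-facing declarations might look like; the enclosing interface name and exact parameter types are assumptions inferred from the commit message, not shown in this revision:

	// Hypothetical sketch: the interface name and signatures are assumed, not part of this diff.
	public interface HadoopService {

		// expected to delegate to HadoopServiceCore.deleteFromHdfs(...)
		boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException;

		// expected to delegate to HadoopServiceCore.createHdfsDir(...)
		boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException;
	}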

View differences:

HadoopServiceCore.java
 package eu.dnetlib.data.hadoop;
 
 import java.io.IOException;
+import java.net.URI;
 import java.util.List;
 import java.util.Set;
 
+import com.google.common.base.Function;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import eu.dnetlib.data.hadoop.utils.HadoopUtils;
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.springframework.beans.factory.annotation.Autowired;
 import org.springframework.beans.factory.annotation.Required;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Sets;
-
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
-import eu.dnetlib.data.hadoop.utils.HadoopUtils;
-
 public class HadoopServiceCore {
 
 	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
-
 	@Autowired
+	protected ConfigurationEnumerator configurationEnumerator;
+	@Autowired
 	private HadoopClientMap clients;
-
-	@Autowired
-	protected ConfigurationEnumerator configurationEnumerator;
-
 	private int maxVersions;
 
 	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
......
 		log.debug("descriptor: [" + desc.toString() + "]");
 	}
 
-	class HBaseSchemaBuilder {
-
-	}
-
 	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
 
 		final HBaseAdmin admin = getHBaseAdmin(clusterName);
......
 		}
 	}
 
-	public void write(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
+	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
 		final Configuration conf = configurationEnumerator.get(clusterName);
 		final HTable table = new HTable(conf, tableName);
 		try {
......
 		}
 	}
 
-	public void delete(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
+	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
 		final Configuration conf = configurationEnumerator.get(clusterName);
 		final HTable table = new HTable(conf, tableName);
 		try {
......
 		}
 	}
 
+	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
+		if (StringUtils.isBlank(path))
+			throw new HadoopServiceException("Cannot delete an empty HDFS path.");
+
+		final Configuration conf = configurationEnumerator.get(clusterName);
+
+		try {
+			final FileSystem hdfs = FileSystem.get(conf);
+			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
+
+			if (hdfs.exists(absolutePath)) {
+				log.debug("deleting path: " + absolutePath.toString());
+				hdfs.delete(absolutePath, true);
+				log.info("deleted path: " + absolutePath.toString());
+				return true;
+			} else {
+				log.warn("cannot delete nonexistent path: " + absolutePath.toString());
+				return false;
+			}
+		} catch (IOException e) {
+			throw new HadoopServiceException(e);
+		}
+	}
+
+	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
+		if (StringUtils.isBlank(path))
+			throw new HadoopServiceException("Cannot create an empty HDFS path.");
+
+		final Configuration conf = configurationEnumerator.get(clusterName);
+
+		try {
+			final FileSystem hdfs = FileSystem.get(conf);
+			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
+			if (!hdfs.exists(absolutePath)) {
+				hdfs.mkdirs(absolutePath);
+				log.info("created path: " + absolutePath.toString());
+				return true;
+			} else if (force) {
+				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
+				hdfs.delete(absolutePath, true);
+
+				hdfs.mkdirs(absolutePath);
+				log.info("created path: " + absolutePath.toString());
+				return true;
+			} else {
+				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
+				return false;
+			}
+		} catch (IOException e) {
+			throw new HadoopServiceException(e);
+		}
+	}
+
 	public Configuration getClusterConiguration(final ClusterName clusterName) {
 		return configurationEnumerator.get(clusterName);
 	}
......
 		return clients;
 	}
 
+
 }
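
Both new methods resolve the caller-supplied path against the cluster's fs.defaultFS property, so a path such as /tmp/dnet/working_dir becomes hdfs://<namenode>/tmp/dnet/working_dir before any filesystem call, and both translate IOException into HadoopServiceException while reporting the outcome through their boolean return value. A minimal usage sketch, assuming a Spring-wired HadoopServiceCore bean and a ClusterName.DM enum constant (both assumptions, not shown in this diff):

	// Usage sketch: the bean wiring and the DM cluster constant are assumptions.
	@Autowired
	private HadoopServiceCore hadoopServiceCore;

	public void rebuildWorkingDir() throws HadoopServiceException {
		// force = true: an existing directory is deleted and re-created
		hadoopServiceCore.createHdfsDir(ClusterName.DM, "/tmp/dnet/working_dir", true);

		// returns false (with a warning in the log) when the path does not exist
		final boolean removed = hadoopServiceCore.deleteFromHdfs(ClusterName.DM, "/tmp/dnet/working_dir");
	}

Because the outcome is a boolean rather than an exception, callers can distinguish "removed" from "was already absent" without a try/catch.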
