Revision 39732

[maven-release-plugin] copy for tag dnet-hadoop-service-2.4.1


modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/pom.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3
	<parent>
4
		<groupId>eu.dnetlib</groupId>
5
		<artifactId>dnet-parent</artifactId>
6
		<version>1.0.0</version>
7
	</parent>
8
	<modelVersion>4.0.0</modelVersion>
9
	<groupId>eu.dnetlib</groupId>
10
	<artifactId>dnet-hadoop-service</artifactId>
11
	<packaging>jar</packaging>
12
	<version>2.4.1</version>
13
	<scm>
14
	    <developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1</developerConnection>
15
	</scm>
16
	<repositories>
17
		<!-- Cloudera Repositories -->
18
		<repository>
19
			<snapshots>
20
				<enabled>false</enabled>
21
			</snapshots>
22
			<id>cloudera-central</id>
23
			<name>cloudera-libs-release</name>
24
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-central</url>
25
		</repository>
26
		<repository>
27
			<id>cloudera-snapshots</id>
28
			<name>cloudera-libs-snapshot</name>
29
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-snapshots</url>
30
		</repository>
31
	</repositories>
32
	<dependencies>
33
		<dependency>
34
			<groupId>eu.dnetlib</groupId>
35
			<artifactId>dnet-hadoop-service-rmi</artifactId>
36
			<version>[1.0.0,2.0.0)</version>
37
		</dependency>
38
		<dependency>
39
			<groupId>com.google.code.gson</groupId>
40
			<artifactId>gson</artifactId>
41
			<version>${google.gson.version}</version>
42
		</dependency>
43
		<dependency>
44
			<groupId>eu.dnetlib</groupId>
45
			<artifactId>cnr-blackboard-common</artifactId>
46
			<version>[2.1.0,3.0.0)</version>
47
		</dependency>
48
		<dependency>
49
			<groupId>eu.dnetlib</groupId>
50
			<artifactId>cnr-resultset-client</artifactId>
51
			<version>[2.0.0,3.0.0)</version>
52
			<exclusions>
53
				<exclusion>
54
					<artifactId>slf4j-api</artifactId>
55
					<groupId>org.slf4j</groupId>
56
				</exclusion>
57
			</exclusions>
58
		</dependency>
59
		<dependency>
60
			<groupId>eu.dnetlib</groupId>
61
			<artifactId>dnet-mapreduce-submitter</artifactId>
62
			<version>[2.0.0,3.0.0)</version>
63
		</dependency>
64
		<dependency>
65
			<groupId>org.apache.hbase</groupId>
66
			<artifactId>hbase</artifactId>
67
			<version>${apache.hbase.version}</version>
68
			<exclusions>
69
				<exclusion>
70
					<artifactId>jsp-api-2.1</artifactId>
71
					<groupId>org.mortbay.jetty</groupId>
72
				</exclusion>
73
				<exclusion>
74
					<artifactId>servlet-api-2.5</artifactId>
75
					<groupId>org.mortbay.jetty</groupId>
76
				</exclusion>
77
				<exclusion>
78
					<artifactId>slf4j-api</artifactId>
79
					<groupId>org.slf4j</groupId>
80
				</exclusion>
81
				<exclusion>
82
					<artifactId>slf4j-log4j12</artifactId>
83
					<groupId>org.slf4j</groupId>
84
				</exclusion>
85
				<exclusion>
86
					<artifactId>stax-api</artifactId>
87
					<groupId>stax</groupId>
88
				</exclusion>
89
				<exclusion>
90
					<artifactId>httpclient</artifactId>
91
					<groupId>org.apache.httpcomponents</groupId>
92
				</exclusion>
93
				<exclusion>
94
					<artifactId>guava</artifactId>
95
					<groupId>com.google.guava</groupId>
96
				</exclusion>
97
			</exclusions>
98
		</dependency>
99
		<dependency>
100
			<groupId>org.apache.oozie</groupId>
101
			<artifactId>oozie-client</artifactId>
102
			<version>${apache.oozie.version}</version>
103
			<exclusions>
104
				<exclusion>
105
					<artifactId>slf4j-simple</artifactId>
106
					<groupId>org.slf4j</groupId>
107
				</exclusion>
108
				<exclusion>
109
					<artifactId>slf4j-api</artifactId>
110
					<groupId>org.slf4j</groupId>
111
				</exclusion>
112
				<exclusion>
113
					<artifactId>xercesImpl</artifactId>
114
					<groupId>xerces</groupId>
115
				</exclusion>
116
			</exclusions>
117
		</dependency>
118
		<dependency>
119
			<groupId>junit</groupId>
120
			<artifactId>junit</artifactId>
121
			<version>${junit.version}</version>
122
			<scope>test</scope>
123
		</dependency>
124
		<dependency>
125
			<groupId>org.apache.logging.log4j</groupId>
126
			<artifactId>log4j-slf4j-impl</artifactId>
127
			<version>2.0.2</version>
128
			<scope>test</scope>
129
		</dependency>
130
		<dependency>
131
			<groupId>org.apache.httpcomponents</groupId>
132
			<artifactId>httpclient</artifactId>
133
			<version>4.3.1</version>
134
		</dependency>		
135
	</dependencies>
136
</project>
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java
1
package eu.dnetlib.data.hadoop;
2

  
3
import java.io.IOException;
4
import java.net.URI;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Map.Entry;
8
import java.util.NavigableMap;
9
import java.util.Set;
10

  
11
import com.google.common.base.Function;
12
import com.google.common.collect.Iterables;
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Sets;
16
import eu.dnetlib.data.hadoop.config.ClusterName;
17
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
18
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
19
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
20
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
21
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
22
import org.apache.commons.lang.StringUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.apache.hadoop.conf.Configuration;
26
import org.apache.hadoop.fs.FileSystem;
27
import org.apache.hadoop.fs.Path;
28
import org.apache.hadoop.hbase.HColumnDescriptor;
29
import org.apache.hadoop.hbase.HTableDescriptor;
30
import org.apache.hadoop.hbase.client.*;
31
import org.apache.hadoop.hbase.util.Bytes;
32
import org.springframework.beans.factory.annotation.Autowired;
33
import org.springframework.beans.factory.annotation.Required;
34

  
35
public class HadoopServiceCore {
36

  
37
	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
38
	@Autowired
39
	protected ConfigurationEnumerator configurationEnumerator;
40
	@Autowired
41
	private HadoopClientMap clients;
42
	private int maxVersions;
43

  
44
	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
45
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
46
		return Lists.newArrayList(Iterables.transform(Lists.newArrayList(admin.listTables()), HadoopUtils.tableName()));
47
	}
48

  
49
	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
50
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
51

  
52
		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
53

  
54
		return admin;
55
	}
56

  
57
	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
58
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
59
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
60

  
61
		return Lists.newArrayList(Iterables.transform(desc.getFamilies(), new Function<HColumnDescriptor, String>() {
62

  
63
			@Override
64
			public String apply(final HColumnDescriptor desc) {
65
				return desc.getNameAsString();
66
			}
67
		}));
68
	}
69

  
70
	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
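		// Truncate by re-creation: remember the table descriptor, then disable, delete and re-create the table empty with the same descriptor.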
71
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
72

  
73
		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
74

  
75
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
76

  
77
		log.info("disabling table: " + table);
78
		admin.disableTable(table);
79

  
80
		log.info("deleting table: " + table);
81
		admin.deleteTable(table);
82

  
83
		log.info("creating table: " + table);
84
		admin.createTable(desc);
85
	}
86

  
87
	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
88
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
89

  
90
		return admin.tableExists(table);
91
	}
92

  
93
	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
94
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
95

  
96
		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
97

  
98
		log.info("disabling table: " + table);
99
		admin.disableTable(table);
100

  
101
		log.info("deleting table: " + table);
102
		admin.deleteTable(table);
103
	}
104

  
105
	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
106
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
107

  
108
		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
109

  
110
		final HTableDescriptor desc = new HTableDescriptor(table);
111
		for (final String column : columns) {
112
			final HColumnDescriptor hds = new HColumnDescriptor(column);
113
			hds.setMaxVersions(getMaxVersions());
114
			desc.addFamily(hds);
115
		}
116
		log.info("creating hbase table: " + table);
117

  
118
		admin.createTable(desc);
119

  
120
		log.info("created hbase table: [" + table + "]");
121
		log.debug("descriptor: [" + desc.toString() + "]");
122
	}
123

  
124
	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
125

  
126
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
127

  
128
		if (!admin.tableExists(table)) {
129
			createTable(clusterName, table, columns);
130
		} else {
131
			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
132
			final Set<String> foundColumns = Sets.newHashSet(Iterables.transform(Lists.newArrayList(desc.getColumnFamilies()), HadoopUtils.columnName()));
133

  
134
			log.info("ensuring columns on table " + table + ": " + columns);
135
			final List<String> missingColumns = Lists.newArrayList(Iterables.filter(columns, HadoopUtils.columnPredicate(foundColumns)));
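			// Add any missing column families while the table is disabled, then re-enable it.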
136
			if (!missingColumns.isEmpty()) {
137

  
138
				if (admin.isTableEnabled(table)) {
139
					admin.disableTable(table);
140
				}
141

  
142
				for (final String column : missingColumns) {
143
					log.info("hbase table: '" + table + "', adding column: " + column);
144
					admin.addColumn(table, new HColumnDescriptor(column));
145
				}
146

  
147
				admin.enableTable(table);
148
			}
149
		}
150
	}
151

  
152
	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
153
		final Configuration conf = configurationEnumerator.get(clusterName);
154
		final HTable table = new HTable(conf, tableName);
155

  
156
		try {
157
			table.put(puts);
158
		} finally {
159
			table.flushCommits();
160
			table.close();
161
		}
162
	}
163

  
164
	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
165
		final Configuration conf = configurationEnumerator.get(clusterName);
166
		final HTable table = new HTable(conf, tableName);
167
		try {
168
			table.delete(deletes);
169
		} finally {
170
			table.flushCommits();
171
			table.close();
172
		}
173
	}
174

  
175
	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
176
		final Configuration conf = configurationEnumerator.get(clusterName);
177
		final HTable table = new HTable(conf, tableName);
178
		try {
179
			final List<Delete> deletes = Lists.newArrayList();
180
			for(HBaseRowDescriptor desc : columns) {
181

  
182
				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
183

  
184
				for(Column c : desc.getColumns()) {
185
					for(String qualifier : c.getQualifier()) {
186
						log.info(String.format("delete from row '%s' cf '%s::%s'", desc.getRowKey(), c.getFamily(), qualifier));
187
						d.deleteColumn(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
188
					}
189
				}
190

  
191
				deletes.add(d);
192
			}
193
			table.delete(deletes);
194

  
195
		} finally {
196
			table.flushCommits();
197
			table.close();
198
		}
199
	}
200

  
201
	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
202
		final Configuration conf = configurationEnumerator.get(clusterName);
203
		final HTable table = new HTable(conf, tableName);
204
		try {
205
			return table.get(new Get(id));
206
		} finally {
207
			table.close();
208
		}
209
	}
210

  
211
	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
212
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
213
		for(String rowKey : rowKeys) {
214
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
215
		}
216
		return map;
217
	}
218

  
219
	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
220
		final Configuration conf = configurationEnumerator.get(clusterName);
221
		final HTable table = new HTable(conf, tableName);
222

  
223
		final HBaseRowDescriptor desc = new HBaseRowDescriptor();
224

  
225
		try {
226
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));
227

  
228
			if (r.isEmpty()) {
229
				return desc;
230
			}
231

  
232
			final List<Column> columns = Lists.newArrayList();
233

  
234
			for(Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
235
				final Set<byte[]> qualifiers = e.getValue().keySet();
236
				final String family = new String(e.getKey());
237
				final Column col = new Column(family);
238

  
239
				for(byte[] q : qualifiers) {
240
					String qs = new String(q);
241
					col.getQualifier().add(qs);
242
				}
243
				columns.add(col);
244
			}
245
			desc.setColumns(columns);
246
			desc.setRowKey(rowKey);
247

  
248
			return desc;
249
		} finally {
250
			table.close();
251
		}
252
	}
253

  
254
	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
255
		final Configuration conf = configurationEnumerator.get(clusterName);
256
		final HTable table = new HTable(conf, tableName);
257
		try {
258
			final ResultScanner rs = table.getScanner(scan);
259
			try {
260
				return Lists.newArrayList(rs.iterator());
261
			} finally {
262
				rs.close();
263
			}
264
		} finally {
265
			table.close();
266
		}
267
	}
268

  
269
	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
270
		if (StringUtils.isBlank(path))
271
			throw new HadoopServiceException("Cannot deleteFromHBase an empty HDFS path.");
272

  
273
		final Configuration conf = configurationEnumerator.get(clusterName);
274

  
275
		try {
276
			final FileSystem hdfs = FileSystem.get(conf);
277
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
278

  
279
			if (hdfs.exists(absolutePath)) {
280
				log.debug("deleteFromHBase path: " + absolutePath.toString());
281
				hdfs.delete(absolutePath, true);
282
				log.info("deleted path: " + absolutePath.toString());
283
				return true;
284
			} else {
285
				log.warn("cannot deleteFromHBase unexisting path: " + absolutePath.toString());
286
				return false;
287
			}
288
		} catch (IOException e) {
289
			throw new HadoopServiceException(e);
290
		}
291
	}
292

  
293
	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
294
		if (StringUtils.isBlank(path))
295
			throw new HadoopServiceException("Cannot create an empty HDFS path.");
296

  
297
		final Configuration conf = configurationEnumerator.get(clusterName);
298

  
299
		try {
300
			final FileSystem hdfs = FileSystem.get(conf);
301
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
302
			if (!hdfs.exists(absolutePath)) {
303
				hdfs.mkdirs(absolutePath);
304
				log.info("created path: " + absolutePath.toString());
305
				return true;
306
			} else if (force) {
307
				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
308
				hdfs.delete(absolutePath, true);
309

  
310
				hdfs.mkdirs(absolutePath);
311
				log.info("created path: " + absolutePath.toString());
312
				return true;
313
			} else {
314
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
315
				return false;
316
			}
317
		} catch (IOException e) {
318
			throw new HadoopServiceException(e);
319
		}
320
	}
321

  
322
	public Configuration getClusterConiguration(final ClusterName clusterName) {
323
		return configurationEnumerator.get(clusterName);
324
	}
325

  
326
	public int getMaxVersions() {
327
		return maxVersions;
328
	}
329

  
330
	@Required
331
	public void setMaxVersions(final int maxVersions) {
332
		this.maxVersions = maxVersions;
333
	}
334

  
335
	public HadoopClientMap getClients() {
336
		return clients;
337
	}
338

  
339
}
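
For orientation only, here is a minimal usage sketch of the service core above. It is not part of this revision: the table name and column families are assumptions, and it presumes the Spring wiring (ConfigurationEnumerator, HadoopClientMap, maxVersions) shown in HBaseTestContextConfiguration below.

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;

public class HadoopServiceCoreUsageSketch {

	// Hypothetical caller: ensures a table exists, then writes one cell to it.
	public void example(final HadoopServiceCore core) throws Exception {
		// Creates the table with the configured max versions if missing, otherwise adds absent column families.
		core.ensureTable(ClusterName.DM, "db_sample", Sets.newHashSet("result", "person"));

		// One Put = one row key; add(family, qualifier, value) appends a single cell.
		final Put put = new Put(Bytes.toBytes("row-1"));
		put.add(Bytes.toBytes("result"), Bytes.toBytes("body"), Bytes.toBytes("<record/>"));
		core.writeOnHBase(ClusterName.DM, "db_sample", Lists.newArrayList(put));
	}
}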
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTestContextConfiguration.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import org.springframework.context.annotation.Bean;
4
import org.springframework.context.annotation.Configuration;
5
import org.springframework.context.annotation.Profile;
6
import org.springframework.core.io.ClassPathResource;
7
import org.springframework.core.io.Resource;
8

  
9
import eu.dnetlib.data.hadoop.HadoopClientMap;
10
import eu.dnetlib.data.hadoop.HadoopServiceCore;
11
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
12
import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
13
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
14
import eu.dnetlib.data.mapreduce.JobClientFactory;
15

  
16
@Configuration
17
@Profile(value = "test")
18
public class HBaseTestContextConfiguration {
19

  
20
	public static final String ENABLED_CLIENTS = "{"
21
			+ "\"DM\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"true\"},"
22
			+ "\"IIS\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"false\"}"
23
			+ "}";
24

  
25
	public static final int MAX_VERSIONS = 10;
26

  
27
	@Bean
28
	public HadoopServiceCore hadoopServiceCore() {
29
		final HadoopServiceCore core = new HadoopServiceCore();
30

  
31
		core.setMaxVersions(MAX_VERSIONS);
32

  
33
		System.out.println("using hbase max versions: " + MAX_VERSIONS);
34
		return core;
35
	}
36

  
37
	@Bean(initMethod = "init")
38
	public HadoopClientMap hadoopClientMap() throws InterruptedException {
39
		final HadoopClientMap clientMap = new HadoopClientMap();
40
		clientMap.setEnabledClients(ENABLED_CLIENTS);
41
		clientMap.setClientsInitTime(10);
42

  
43
		return clientMap;
44
	}
45

  
46
	@Bean
47
	public HBaseAdminFactory hBaseAdminFactory() {
48
		return new HBaseAdminFactory();
49
	}
50

  
51
	@Bean
52
	public OozieClientFactory oozieClientFactory() {
53
		return new OozieClientFactory();
54
	}
55

  
56
	@Bean
57
	public JobClientFactory jobClientFactory() {
58
		return new JobClientFactory();
59
	}
60

  
61
	@Bean
62
	public ConfigurationEnumerator configurationEnumerator() {
63
		return new ConfigurationEnumerator();
64
	}
65

  
66
	@Bean
67
	public ConfigurationFactory DM() {
68
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties"));
69
	}
70

  
71
	@Bean
72
	public ConfigurationFactory IIS() {
73
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.iis.icm.properties"));
74
	}
75

  
76
	protected ConfigurationFactory get(final Resource props) {
77
		final ConfigurationFactory configurationFactory = new ConfigurationFactory();
78
		configurationFactory.setDefaults(props);
79
		return configurationFactory;
80
	}
81

  
82
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java
1
package eu.dnetlib.data.hadoop.oozie;
2

  
3
import java.util.Date;
4
import java.util.HashMap;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.apache.oozie.client.OozieClient;
9
import org.apache.oozie.client.OozieClientException;
10
import org.apache.oozie.client.WorkflowJob.Status;
11

  
12
import eu.dnetlib.data.hadoop.action.JobCompletion;
13
import eu.dnetlib.data.hadoop.action.JobMonitor;
14
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
15

  
16
public class OozieJobMonitor extends JobMonitor {
17

  
18
	private static final Log log = LogFactory.getLog(OozieJobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM
19

  
20
	private final OozieClient oozieClient;
21

  
22
	private final String jobId;
23

  
24
	public OozieJobMonitor(OozieClient oozieClient, String jobId, JobCompletion callback) {
25
		super(callback);
26
		this.oozieClient = oozieClient;
27
		this.jobId = jobId;
28
	}
29

  
30
	@Override
31
	public void run() {
32
		try {
33
			log.info("waiting for oozie job completion: " + getHadoopId());
34

  
35
			Status status = doGetStatus();
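			// Poll the job status every monitorSleepTimeSecs seconds until it leaves RUNNING, refreshing lastActivity whenever the status changes.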
36
			while (status.equals(Status.RUNNING)) {
37
				Thread.sleep(monitorSleepTimeSecs * 1000);
38

  
39
				Status currentStatus = doGetStatus();
40
				if (!status.equals(currentStatus)) {
41
					status = currentStatus;
42
					lastActivity = new Date(System.currentTimeMillis());
43
				}
44
			}
45

  
46
			log.debug("job " + jobId + " finihsed with status: " + status);
47
			if (status == Status.SUCCEEDED) {
48
				// TODO set some content to return to the blackboard msg.
49
				getCallback().done(new HashMap<String, String>());
50
            } else {
51
				// TODO retrieve some failure information from the oozie client.
52
				String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus() + ", oozie log:\n "
53
						+ getOozieClient().getJobLog(getHadoopId()) + "\n\n";
54
				getCallback().failed(msg, new HadoopServiceException(msg));
55
            }
56
		} catch (Throwable e) {
57
			getCallback().failed(getHadoopId(), e);
58
		}
59
	}
60

  
61
	@Override
62
	public String getHadoopId() {
63
		return jobId;
64
	}
65

  
66
	public OozieClient getOozieClient() {
67
		return oozieClient;
68
	}
69

  
70
	@Override
71
	public String getStatus() {
72
		try {
73
			return doGetStatus().toString();
74
		} catch (OozieClientException e) {
75
			log.error("error accessing job status", e);
76
			return "UNKNOWN";
77
		}
78
	}
79

  
80
	private Status doGetStatus() throws OozieClientException {
81
		return getOozieClient().getJobInfo(getHadoopId()).getStatus();
82
	}
83

  
84
	@Override
85
	public Date getLastActivity() {
86
		return lastActivity;
87
	}
88

  
89
	@Override
90
	public Date getStartTime() throws HadoopServiceException {
91
		try {
92
			return getOozieClient().getJobInfo(getHadoopId()).getStartTime();
93
		} catch (OozieClientException e) {
94
			throw new HadoopServiceException("unable to read job start time", e);
95
		}
96
	}
97

  
98
	@Override
99
	public String getTrackerUrl() {
100
		return getOozieClient().getOozieUrl();
101
	}
102

  
103
	@Override
104
	public void kill() {
105
		try {
106
			getOozieClient().kill(getHadoopId());
107
		} catch (OozieClientException e) {
108
			log.error("unable to kill job: " + getHadoopId(), e);
109
		}
110
	}
111

  
112
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceImpl.java
1
package eu.dnetlib.data.hadoop;
2

  
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Map.Entry;
7
import java.util.Set;
8

  
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Maps;
11
import eu.dnetlib.data.hadoop.config.ClusterName;
12
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
13
import eu.dnetlib.data.hadoop.rmi.HadoopService;
14
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
15
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
16
import eu.dnetlib.enabling.tools.AbstractBaseService;
17
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
18
import org.apache.hadoop.conf.Configuration;
19
import org.springframework.beans.factory.annotation.Autowired;
20
import org.springframework.beans.factory.annotation.Required;
21

  
22
/**
23
 * The Class HadoopServiceImpl.
24
 */
25
public class HadoopServiceImpl extends AbstractBaseService implements HadoopService {
26

  
27
	/**
28
	 * notification handler.
29
	 */
30
	private NotificationHandler notificationHandler;
31

  
32
	/** The hadoop service core. */
33
	@Autowired
34
	private HadoopServiceCore hadoopServiceCore;
35

  
36
	/** The job registry. */
37
	@Autowired
38
	private JobRegistry jobRegistry;
39

  
40
	/*
41
	 * (non-Javadoc)
42
	 * 
43
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listAvailableJobs()
44
	 */
45
	@Override
46
	public List<String> listAvailableJobs() throws HadoopServiceException {
47
		List<String> res = Lists.newArrayList();
48
		return res;
49
	}
50

  
51
	/*
52
	 * (non-Javadoc)
53
	 * 
54
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listJobs(java.lang.String)
55
	 */
56
	@Override
57
	public List<HadoopJobDescriptor> listJobs(final String clusterName) throws HadoopServiceException {
58
		return jobRegistry.listJobs(checkExists(clusterName));
59
	}
60

  
61
	/*
62
	 * (non-Javadoc)
63
	 * 
64
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#killJob(java.lang.String)
65
	 */
66
	@Override
67
	public boolean killJob(final String jobId) throws HadoopServiceException {
68
		jobRegistry.unregisterJob(jobId);
69
		return true;
70
	}
71

  
72
	/*
73
	 * (non-Javadoc)
74
	 * 
75
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#notify(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
76
	 */
77
	@Override
78
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
79
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
80
	}
81

  
82
	/*
83
	 * (non-Javadoc)
84
	 * 
85
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listHbaseTables(java.lang.String)
86
	 */
87
	@Override
88
	public List<String> listHbaseTables(final String clusterName) throws HadoopServiceException {
89
		try {
90
			return hadoopServiceCore.listTables(checkExists(clusterName));
91
		} catch (IOException e) {
92
			throw new HadoopServiceException(e);
93
		}
94
	}
95

  
96
	/*
97
	 * (non-Javadoc)
98
	 * 
99
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#ensureHbaseTable(java.lang.String, java.lang.String, java.util.Set)
100
	 */
101
	@Override
102
	public boolean ensureHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
103
		try {
104
			hadoopServiceCore.ensureTable(checkExists(clusterName), tableName, columns);
105
			return true;
106
		} catch (IOException e) {
107
			throw new HadoopServiceException(e);
108
		}
109
	}
110

  
111
	/*
112
	 * (non-Javadoc)
113
	 * 
114
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.Set)
115
	 */
116
	@Override
117
	public boolean createHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
118
		try {
119
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, columns);
120
			return true;
121
		} catch (IOException e) {
122
			throw new HadoopServiceException(e);
123
		}
124
	}
125

  
126
	/*
127
	 * (non-Javadoc)
128
	 * 
129
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#truncateHbaseTable(java.lang.String, java.lang.String)
130
	 */
131
	@Override
132
	public boolean truncateHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
133
		try {
134
			hadoopServiceCore.truncateTable(checkExists(clusterName), tableName);
135
		} catch (IOException e) {
136
			throw new HadoopServiceException(e);
137
		}
138
		return true;
139
	}
140

  
141
	/*
142
	 * (non-Javadoc)
143
	 * 
144
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#dropHbaseTable(java.lang.String, java.lang.String)
145
	 */
146
	@Override
147
	public boolean dropHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
148
		try {
149
			hadoopServiceCore.dropTable(checkExists(clusterName), tableName);
150
		} catch (IOException e) {
151
			throw new HadoopServiceException(e);
152
		}
153
		return true;
154
	}
155

  
156
	/*
157
	 * (non-Javadoc)
158
	 * 
159
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#existHbaseTable(java.lang.String, java.lang.String)
160
	 */
161
	@Override
162
	public boolean existHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
163
		try {
164
			return hadoopServiceCore.existTable(checkExists(clusterName), tableName);
165
		} catch (IOException e) {
166
			throw new HadoopServiceException(e);
167
		}
168
	}
169

  
170
	/*
171
	 * (non-Javadoc)
172
	 * 
173
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#getClusterConfiguration(java.lang.String)
174
	 */
175
	@Override
176
	public Map<String, String> getClusterConfiguration(final String clusterName) throws HadoopServiceException {
177

  
178
		final Configuration conf = hadoopServiceCore.getClusterConiguration(checkExists(clusterName));
179
		final Map<String, String> res = Maps.newHashMap();
180
		for (Entry<String, String> e : conf) {
181
			res.put(e.getKey(), e.getValue());
182
		}
183

  
184
		return res;
185
	}
186

  
187
	@Override
188
	public boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
189
		return hadoopServiceCore.deleteFromHdfs(checkExists(clusterName), path);
190
	}
191

  
192
	@Override
193
	public boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException {
194
		return hadoopServiceCore.createHdfsDir(checkExists(clusterName), path, force);
195
	}
196

  
197
	@Override
198
	public List<String> listClusters() throws HadoopServiceException {
199
		try {
200
			return ClusterName.asStringList();
201
		} catch (Throwable e) {
202
			throw new HadoopServiceException(e);
203
		}
204
	}
205

  
206
	@Override
207
	public List<String> describeHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
208
		try {
209
			return hadoopServiceCore.describeTable(checkExists(clusterName), tableName);
210
		} catch (IOException e) {
211
			throw new HadoopServiceException(e);
212
		}
213
	}
214

  
215
	@Override
216
	public HBaseRowDescriptor describeHBaseColumn(final String clusterName, final String tableName, final String rowKey) throws HadoopServiceException {
217
		try {
218
			return hadoopServiceCore.describeRow(checkExists(clusterName), tableName, rowKey);
219
		} catch (IOException e) {
220
			throw new HadoopServiceException(e);
221
		}
222
	}
223

  
224
	@Override
225
	public Map<String, HBaseRowDescriptor> describeHBaseColumns(final String clusterName, final String tableName, final List<String> rowKeys) throws HadoopServiceException {
226
		try {
227
			return hadoopServiceCore.describeRows(checkExists(clusterName), tableName, rowKeys);
228
		} catch (IOException e) {
229
			throw new HadoopServiceException(e);
230
		}
231
	}
232

  
233
	@Override
234
	public boolean deleteHBaseColumn(final String clusterName, final String tableName, final HBaseRowDescriptor column) throws HadoopServiceException {
235
		try {
236
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, Lists.newArrayList(column));
237
			return true;
238
		} catch (IOException e) {
239
			throw new HadoopServiceException(e);
240
		}
241
	}
242

  
243
	@Override
244
	public boolean deleteHBaseColumns(final String clusterName, final String tableName, final List<HBaseRowDescriptor> column) throws HadoopServiceException {
245
		try {
246
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, column);
247
			return true;
248
		} catch (IOException e) {
249
			throw new HadoopServiceException(e);
250
		}
251
	}
252

  
253
	///////////////////
254

  
255
	/**
256
	 * Check exists.
257
	 *
258
	 * @param clusterName
259
	 *            the cluster name
260
	 * @return the cluster name
261
	 * @throws HadoopServiceException
262
	 *             the hadoop service exception
263
	 */
264
	private ClusterName checkExists(final String clusterName) throws HadoopServiceException {
265
		try {
266
			return ClusterName.valueOf(clusterName);
267
		} catch (final IllegalArgumentException e) {
268
			throw new HadoopServiceException("Invalid cluster name: " + clusterName);
269
		}
270
	}
271

  
272
	/**
273
	 * Gets the notification handler.
274
	 *
275
	 * @return the notification handler
276
	 */
277
	public NotificationHandler getNotificationHandler() {
278
		return notificationHandler;
279
	}
280

  
281
	/**
282
	 * Sets the notification handler.
283
	 *
284
	 * @param notificationHandler
285
	 *            the new notification handler
286
	 */
287
	@Required
288
	public void setNotificationHandler(final NotificationHandler notificationHandler) {
289
		this.notificationHandler = notificationHandler;
290
	}
291

  
292
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/utils/ScanFactory.java
1
package eu.dnetlib.data.hadoop.utils;
2

  
3
import java.io.ByteArrayOutputStream;
4
import java.io.DataOutputStream;
5
import java.io.IOException;
6
import java.util.Map;
7

  
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.hbase.client.Scan;
12
import org.apache.hadoop.hbase.filter.PrefixFilter;
13
import org.apache.hadoop.hbase.util.Base64;
14
import org.dom4j.Document;
15
import org.dom4j.Node;
16

  
17
public class ScanFactory {
18

  
19
	private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
20

  
21
	public static String getScan(final ScanProperties scanProperties) throws IOException {
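		// Build a Scan from the profile properties and serialize it to a Base64 string that can be passed to a MapReduce job via its configuration.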
22
		Scan scan = new Scan();
23

  
24
		scan.setCaching(scanProperties.getCaching());
25
		scan.setCacheBlocks(false); // don't set to true for MR jobs
26

  
27
		scan.setFilter(scanProperties.getFilterList());
28
		for (String family : scanProperties.getFamilies()) {
29
			scan.addFamily(family.getBytes());
30
		}
31

  
32
		log.debug("serializing scan");
33
		return convertScanToString(scan);
34
	}
35

  
36
	public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
37
		log.debug("setting job scanner");
38

  
39
		ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));
40

  
41
		String caching = doc.valueOf("//SCAN/@caching");
42
		if (!StringUtils.isBlank(caching)) {
43
			log.info("overriding default scan caching with: " + caching);
44
			scanProperties.setCaching(Integer.valueOf(caching));
45
		}
46

  
47
		for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
48
			Node node = (Node) o;
49
			String value = node.valueOf("./@value");
50
			if ((value == null) || value.isEmpty()) {
51
				value = bbParams.get(node.valueOf("./@param"));
52
			}
53
			log.debug("scanner family value: " + value);
54
			scanProperties.getFamilies().add(value);
55
		}
56
		for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
57
			Node node = (Node) o;
58
			String filterType = node.valueOf("./@type");
59

  
60
			String value = node.valueOf("./@value");
61
			if ((value == null) || value.isEmpty()) {
62
				value = bbParams.get(node.valueOf("./@param"));
63
			}
64

  
65
			if (filterType.equals("prefix")) {
66
				log.debug("scanner prefix filter, value: " + value);
67
				scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
68
			} // TODO add more filterType cases here
69
		}
70
		return scanProperties;
71
	}
72

  
73
	/**
74
	 * Writes the given scan into a Base64 encoded string.
75
	 *
76
	 * @param scan
77
	 *            The scan to write out.
78
	 * @return The scan saved in a Base64 encoded string.
79
	 * @throws IOException
80
	 *             When writing the scan fails.
81
	 */
82
	private static String convertScanToString(final Scan scan) throws IOException {
83
		ByteArrayOutputStream out = new ByteArrayOutputStream();
84
		DataOutputStream dos = new DataOutputStream(out);
85
		scan.write(dos);
86
		return Base64.encodeBytes(out.toByteArray());
87
	}
88

  
89
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/hbase/HbaseTableFeeder.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.List;
6

  
7
import com.google.common.base.Predicates;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Lists;
10
import eu.dnetlib.data.hadoop.config.ClusterName;
11
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
12
import eu.dnetlib.data.transform.Row;
13
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
14
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.apache.hadoop.conf.Configuration;
18
import org.apache.hadoop.hbase.client.HTable;
19
import org.apache.hadoop.hbase.client.Mutation;
20
import org.springframework.beans.factory.annotation.Autowired;
21
import org.springframework.beans.factory.annotation.Required;
22

  
23
/**
24
 * The Class HbaseTableFeeder provides an abstraction for shipping batch operations to an HBase table.
25
 */
26
public abstract class HbaseTableFeeder {
27

  
28
	/**
29
	 * The Constant log.
30
	 */
31
	private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
32
	/**
33
	 * The configuration enumerator.
34
	 */
35
	@Autowired
36
	protected ConfigurationEnumerator configurationEnumerator;
37
	/**
38
	 * The batch size.
39
	 */
40
	private int batchSize = 100;
41
	/**
42
	 * The result set client factory.
43
	 */
44
	private ResultSetClientFactory resultSetClientFactory;
45

  
46
	/**
47
	 * Adds the operation.
48
	 *
49
	 * @param buffer the buffer
50
	 * @param row    the row
51
	 */
52
	protected abstract void addOperation(final List<Mutation> buffer, final Row row);
53

  
54
	/**
55
	 * Feed.
56
	 *
57
	 * @param epr         the epr
58
	 * @param xsl         the xsl
59
	 * @param clusterName the cluster name
60
	 * @param tableName   the table name
61
	 * @param simulation  the simulation
62
	 * @return the int
63
	 * @throws IOException          Signals that an I/O exception has occurred.
64
	 * @throws InterruptedException the interrupted exception
65
	 */
66
	public int feed(final String epr, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
67
			throws IOException, InterruptedException {
68
		return doWrite(asRows(epr, xsl), getConf(clusterName), tableName, simulation);
69
	}
70

  
71
	/**
72
	 * Do writeOnHBase.
73
	 *
74
	 * @param rows          the rows
75
	 * @param configuration the configuration
76
	 * @param tableName     the table name
77
	 * @param simulation    the simulation
78
	 * @return the int
79
	 * @throws IOException          Signals that an I/O exception has occurred.
80
	 * @throws InterruptedException the interrupted exception
81
	 */
82
	private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
83
			throws IOException, InterruptedException {
84
		final List<Mutation> buffer = Lists.newArrayList();
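		// Mutations are buffered and flushed to the table every batchSize records; any remainder is flushed in the finally block.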
85

  
86
		int count = 0;
87
		if (simulation) {
88
			log.info("running in simulation mode ...");
89
			log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
90
		} else {
91

  
92
			final HTable htable = new HTable(configuration, tableName);
93
			try {
94
				int i = 0;
95
				for (final Row row : rows) {
96
					addOperation(buffer, row);
97
					if ((++i % getBatchSize()) == 0) {
98
						flush(tableName, buffer, htable);
99
						count += buffer.size();
100
						buffer.clear();
101
					}
102
				}
103
			} finally {
104
				if (!buffer.isEmpty()) {
105
					flush(tableName, buffer, htable);
106
					count += buffer.size();
107
				}
108
				htable.flushCommits();
109
				htable.close();
110
			}
111
		}
112
		return count;
113
	}
114

  
115
	private void flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
116
		if (!checkOp(htable.batch(buffer), tableName)) throw new IOException("unable to flush operation on HBase table: " + tableName);
117
	}
118

  
119
	private boolean checkOp(final Object[] res, final String tableName) throws IOException {
120
		return Iterables.all(Arrays.asList(res), Predicates.notNull());
121
	}
122

  
123
	/**
124
	 * As rows.
125
	 *
126
	 * @param epr the epr
127
	 * @param xsl the xsl
128
	 * @return the iterable
129
	 */
130
	protected Iterable<Row> asRows(final String epr, final String xsl) {
131
		return Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(epr), XsltRowTransformerFactory.newInstance(xsl)));
132
	}
133

  
134
	/**
135
	 * Gets the conf.
136
	 *
137
	 * @param clusterName the cluster name
138
	 * @return the conf
139
	 */
140
	protected Configuration getConf(final ClusterName clusterName) {
141
		return configurationEnumerator.get(clusterName);
142
	}
143

  
144
	/**
145
	 * Gets the batch size.
146
	 *
147
	 * @return the batch size
148
	 */
149
	public int getBatchSize() {
150
		return batchSize;
151
	}
152

  
153
	/**
154
	 * Sets the batch size.
155
	 *
156
	 * @param batchSize the new batch size
157
	 */
158
	public void setBatchSize(final int batchSize) {
159
		this.batchSize = batchSize;
160
	}
161

  
162
	/**
163
	 * Gets the result set client factory.
164
	 *
165
	 * @return the result set client factory
166
	 */
167
	public ResultSetClientFactory getResultSetClientFactory() {
168
		return resultSetClientFactory;
169
	}
170

  
171
	/**
172
	 * Sets the result set client factory.
173
	 *
174
	 * @param resultSetClientFactory the new result set client factory
175
	 */
176
	@Required
177
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
178
		this.resultSetClientFactory = resultSetClientFactory;
179
	}
180

  
181
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/utils/JobProfile.java
1
package eu.dnetlib.data.hadoop.utils;
2

  
3
import java.util.Map;
4
import java.util.Set;
5

  
6
import com.google.common.collect.Maps;
7
import com.google.common.collect.Sets;
8

  
9
import eu.dnetlib.data.hadoop.rmi.HadoopJobType;
10

  
11
public class JobProfile {
12

  
13
	private final Map<String, String> jobDefinition = Maps.newHashMap();
14
	private final Set<String> requiredParams = Sets.newHashSet();
15
	private ScanProperties scanProperties;
16

  
17
	private String name;
18

  
19
	private String description = "";
20

  
21
	private HadoopJobType jobType;
22

  
23
	public boolean isEmpty() {
24
		return getJobDefinition().isEmpty();
25
	}
26

  
27
	public Map<String, String> getJobDefinition() {
28
		return jobDefinition;
29
	}
30

  
31
	public Set<String> getRequiredParams() {
32
		return requiredParams;
33
	}
34

  
35
	public String getName() {
36
		return name;
37
	}
38

  
39
	public void setName(String name) {
40
		this.name = name;
41
	}
42

  
43
	public String getDescription() {
44
		return description;
45
	}
46

  
47
	public void setDescription(String description) {
48
		this.description = description;
49
	}
50

  
51
	public ScanProperties getScanProperties() {
52
		return scanProperties;
53
	}
54

  
55
	public void setScanProperties(ScanProperties scanProperties) {
56
		this.scanProperties = scanProperties;
57
	}
58

  
59
	public HadoopJobType getJobType() {
60
		return jobType;
61
	}
62

  
63
	public void setJobType(HadoopJobType jobType) {
64
		this.jobType = jobType;
65
	}
66

  
67
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseDeleteFeeder.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import java.util.List;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.hbase.client.Delete;
8
import org.apache.hadoop.hbase.client.Mutation;
9
import org.apache.hadoop.hbase.util.Bytes;
10

  
11
import eu.dnetlib.data.transform.Column;
12
import eu.dnetlib.data.transform.Row;
13

  
14
/**
15
 * The Class HBaseDeleteFeeder performs a batch of Delete operations.
16
 */
17
public class HBaseDeleteFeeder extends HbaseTableFeeder {
18

  
19
	/**
20
	 * Logger.
21
	 */
22
	private static final Log log = LogFactory.getLog(HBaseDeleteFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
23

  
24
	/*
25
	 * (non-Javadoc)
26
	 *
27
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
28
	 */
29
	@Override
30
	protected void addOperation(final List<Mutation> buffer, final Row row) {
31
		final Delete delete = new Delete(Bytes.toBytes(row.getKey()));
32
		delete.setWriteToWAL(true);
33

  
34
		for (final Column<String, byte[]> col : row) {
35
			log.debug(String.format("deleting K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
36
			delete.deleteColumns(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()));
37
		}
38

  
39
		buffer.add(delete);
40
	}
41

  
42
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/JobRegistry.java
1
package eu.dnetlib.data.hadoop;
2

  
3
import java.util.Date;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Map.Entry;
7

  
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.springframework.beans.factory.annotation.Required;
11

  
12
import com.google.common.collect.BiMap;
13
import com.google.common.collect.HashBiMap;
14
import com.google.common.collect.Iterables;
15
import com.google.common.collect.Lists;
16
import com.google.common.collect.Maps;
17

  
18
import eu.dnetlib.data.hadoop.HadoopJob.Status;
19
import eu.dnetlib.data.hadoop.config.ClusterName;
20
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
21
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
22
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
23

  
24
public class JobRegistry {
25

  
26
	private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM
27

  
28
	private int maxJobs;
29

  
30
	private final BiMap<String, HadoopJob> jobs = HashBiMap.create();
31

  
32
	public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {
33

  
34
		if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }
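		// Evict the oldest completed job once the registry exceeds maxJobs, then register the new job and start its monitor.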
35

  
36
		if (jobs.size() > getMaxJobs()) {
37
			removeOldestProcess();
38
		}
39

  
40
		jobs.put(hadoopJob.getId(), hadoopJob);
41
		log.info("Registered hadoop job " + hadoopJob.getId());
42
		hadoopJob.startMonitor();
43

  
44
		return hadoopJob.getId();
45
	}
46

  
47
	public Status getJobStatus(String id) {
48
		return findJob(id).getStatus();
49
	}
50

  
51
	public HadoopJob findJob(String id) {
52
		return jobs.get(id);
53
	}
54

  
55
	public void unregisterJob(String id) throws HadoopServiceException {
56

  
57
		if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }
58

  
59
		log.info("unregistering job: " + id);
60
		jobs.get(id).getJobMonitor().kill();
61
		jobs.remove(id);
62
	}
63

  
64
	private void removeOldestProcess() throws HadoopServiceException {
65
		Date oldDate = new Date();
66
		String oldId = null;
67

  
68
		for (Entry<String, HadoopJob> e : jobs.entrySet()) {
69
			final HadoopJob hadoopJob = e.getValue();
70

  
71
			if (hadoopJob.isComplete()) {
72
				final Date date = hadoopJob.getLastActivity();
73
				if (date.before(oldDate)) {
74
					oldDate = date;
75
					oldId = e.getKey();
76
				}
77
			}
78
		}
79

  
80
		if (oldId != null) {
81
			unregisterJob(oldId);
82
		}
83

  
84
	}
85

  
86
	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
87
		Map<String, HadoopJob> filtered = Maps.filterValues(jobs, HadoopUtils.filterByCluster(clusterName));
88
		return Lists.newArrayList(Iterables.transform(filtered.entrySet(), HadoopUtils.hadoopJobDescriptor()));
89
	}
90

  
91
	@Required
92
	public void setMaxJobs(final int maxJobs) {
93
		this.maxJobs = maxJobs;
94
	}
95

  
96
	public int getMaxJobs() {
97
		return maxJobs;
98
	}
99

  
100
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/hbase/HBasePutFeeder.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import java.util.List;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
... This diff was truncated because it exceeds the maximum size that can be displayed.
