Revision 52124

[maven-release-plugin] copy for tag dnet-hadoop-service-2.7.6

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/pom.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3
	<parent>
4
		<groupId>eu.dnetlib</groupId>
5
		<artifactId>dnet45-parent</artifactId>
6
		<version>1.0.0</version>
7
	</parent>
8
	<modelVersion>4.0.0</modelVersion>
9
	<groupId>eu.dnetlib</groupId>
10
	<artifactId>dnet-hadoop-service</artifactId>
11
	<packaging>jar</packaging>
12
	<version>2.7.6</version>
13
	<scm>
14
	    <developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6</developerConnection>
15
	</scm>
16
	<repositories>
17
		<!-- Cloudera Repositories -->
18
		<repository>
19
			<snapshots>
20
				<enabled>false</enabled>
21
			</snapshots>
22
			<id>cloudera-central</id>
23
			<name>cloudera-libs-release</name>
24
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-central</url>
25
		</repository>
26
		<repository>
27
			<id>cloudera-snapshots</id>
28
			<name>cloudera-libs-snapshot</name>
29
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-snapshots</url>
30
		</repository>
31
	</repositories>
32
	<dependencies>
33
		<dependency>
34
			<groupId>eu.dnetlib</groupId>
35
			<artifactId>dnet-hadoop-service-rmi</artifactId>
36
			<version>[1.0.0,2.0.0)</version>
37
		</dependency>
38
		<dependency>
39
			<groupId>com.google.code.gson</groupId>
40
			<artifactId>gson</artifactId>
41
			<version>${google.gson.version}</version>
42
		</dependency>
43
		<dependency>
44
			<groupId>eu.dnetlib</groupId>
45
			<artifactId>cnr-blackboard-common</artifactId>
46
			<version>[2.1.0,3.0.0)</version>
47
		</dependency>
48
		<dependency>
49
			<groupId>eu.dnetlib</groupId>
50
			<artifactId>cnr-resultset-client</artifactId>
51
			<version>[2.0.0,3.0.0)</version>
52
			<exclusions>
53
				<exclusion>
54
					<artifactId>slf4j-api</artifactId>
55
					<groupId>org.slf4j</groupId>
56
				</exclusion>
57
			</exclusions>
58
		</dependency>
59
		<dependency>
60
			<groupId>eu.dnetlib</groupId>
61
			<artifactId>dnet-hadoop-commons</artifactId>
62
			<version>[2.0.0,3.0.0)</version>
63
		</dependency>
64
<!--
65
		<dependency>
66
			<groupId>eu.dnetlib</groupId>
67
			<artifactId>dnet-mapreduce-submitter</artifactId>
68
			<version>[3.0.0,4.0.0)</version>
69
		</dependency>
70
-->
71
		<dependency>
72
			<groupId>org.apache.hbase</groupId>
73
			<artifactId>hbase</artifactId>
74
			<version>${apache.hbase.version}</version>
75
			<exclusions>
76
                                <exclusion>
77
                                        <artifactId>jasper-compiler</artifactId>
78
                                        <groupId>tomcat</groupId>
79
                                </exclusion>
80
                                <exclusion>
81
                                        <artifactId>jasper-runtime</artifactId>
82
                                        <groupId>tomcat</groupId>
83
                                </exclusion>
84
                                <exclusion>
85
                                        <artifactId>jamon-runtime</artifactId>
86
                                        <groupId>org.jamon</groupId>
87
                                </exclusion>
88
				<exclusion>
89
					<artifactId>jsp-api-2.1</artifactId>
90
					<groupId>org.mortbay.jetty</groupId>
91
				</exclusion>
92
                                <exclusion>
93
                                        <artifactId>jsp-2.1</artifactId>
94
                                        <groupId>org.mortbay.jetty</groupId>
95
                                </exclusion>
96
				<exclusion>
97
					<artifactId>servlet-api-2.5</artifactId>
98
					<groupId>org.mortbay.jetty</groupId>
99
				</exclusion>
100
                                <exclusion>
101
                                        <artifactId>jetty</artifactId>
102
                                        <groupId>org.mortbay.jetty</groupId>
103
                                </exclusion>
104
                                <exclusion>
105
                                        <artifactId>jetty-util</artifactId>
106
                                        <groupId>org.mortbay.jetty</groupId>
107
                                </exclusion>
108
				<exclusion>
109
					<artifactId>slf4j-api</artifactId>
110
					<groupId>org.slf4j</groupId>
111
				</exclusion>
112
				<exclusion>
113
					<artifactId>slf4j-log4j12</artifactId>
114
					<groupId>org.slf4j</groupId>
115
				</exclusion>
116
				<exclusion>
117
					<artifactId>stax-api</artifactId>
118
					<groupId>stax</groupId>
119
				</exclusion>
120
				<exclusion>
121
					<artifactId>httpclient</artifactId>
122
					<groupId>org.apache.httpcomponents</groupId>
123
				</exclusion>
124
				<exclusion>
125
					<artifactId>guava</artifactId>
126
					<groupId>com.google.guava</groupId>
127
				</exclusion>
128
				<exclusion>
129
					<artifactId>asm</artifactId>
130
					<groupId>asm</groupId>
131
				</exclusion>
132
				<exclusion>
133
					<artifactId>jaxb-api</artifactId>
134
					<groupId>javax.xml.bind</groupId>
135
				</exclusion>
136
				<exclusion>
137
					<artifactId>jaxb-impl</artifactId>
138
					<groupId>com.sun.xml.bind</groupId>
139
				</exclusion>
140
				<exclusion>
141
					<artifactId>commons-codec</artifactId>
142
					<groupId>commons-codec</groupId>
143
				</exclusion>
144
                                <exclusion>
145
                                        <artifactId>commons-logging</artifactId>
146
                                        <groupId>commons-logging</groupId>
147
                                </exclusion>
148
			</exclusions>
149
		</dependency>
150
		<dependency>
151
			<groupId>org.apache.oozie</groupId>
152
			<artifactId>oozie-client</artifactId>
153
			<version>${apache.oozie.version}</version>
154
			<exclusions>
155
				<exclusion>
156
					<artifactId>slf4j-simple</artifactId>
157
					<groupId>org.slf4j</groupId>
158
				</exclusion>
159
				<exclusion>
160
					<artifactId>slf4j-api</artifactId>
161
					<groupId>org.slf4j</groupId>
162
				</exclusion>
163
				<exclusion>
164
					<artifactId>xercesImpl</artifactId>
165
					<groupId>xerces</groupId>
166
				</exclusion>
167
			</exclusions>
168
		</dependency>
169
		<dependency>
170
			<groupId>junit</groupId>
171
			<artifactId>junit</artifactId>
172
			<version>${junit.version}</version>
173
			<scope>test</scope>
174
		</dependency>
175
		<dependency>
176
			<groupId>org.apache.logging.log4j</groupId>
177
			<artifactId>log4j-slf4j-impl</artifactId>
178
			<version>2.0.2</version>
179
			<scope>test</scope>
180
		</dependency>
181
		<dependency>
182
			<groupId>org.apache.httpcomponents</groupId>
183
			<artifactId>httpclient</artifactId>
184
			<version>4.3.1</version>
185
		</dependency>
186
                <dependency>
187
                        <groupId>commons-codec</groupId>
188
                        <artifactId>commons-codec</artifactId>
189
                        <version>${commons.codec.version}</version>
190
                </dependency>
191
                <dependency>
192
                        <groupId>commons-net</groupId>
193
                        <artifactId>commons-net</artifactId>
194
                        <version>${commons.net.version}</version>
195
                </dependency>
196
		<dependency>
197
                  <groupId>org.apache.commons</groupId>
198
                  <artifactId>commons-compress</artifactId>
199
                  <version>${commons.compress.version}</version>
200
		</dependency>
201

  
202
	</dependencies>
203
</project>
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/resources/eu/dnetlib/data/hadoop/applicationContext-dnet-hadoop-service.properties
1
services.hadoop.clients={"DM":{"oozie":"true","mapred":"true","hbase":"true"},"IIS":{"oozie":"true","mapred":"false","hbase":"false"}}
2
services.hadoop.hbase.tablefeeder.batchsize=500
3
services.hadoop.jobregistry.size=100
4
services.hadoop.lib.path=/user/dnet/lib/dnet-mapreduce-jobs-assembly-0.0.6.2-SNAPSHOT.jar
5
services.hadoop.hbase.maxversions=1
6
services.hadoop.clients.init.timeout=30
7
services.hadoop.user=dnet
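
The services.hadoop.clients value above is a JSON object mapping each cluster name (DM, IIS) to the clients enabled on it; HadoopClientMap (later in this diff) deserializes it with Gson and checks the per-client flags. A minimal standalone sketch of that parsing, assuming only the gson dependency already declared in the pom:

import java.util.Map;

import com.google.gson.Gson;

public class EnabledClientsParseSketch {

	public static void main(final String[] args) {
		// same JSON shape as the services.hadoop.clients property above
		final String enabledClients =
				"{\"DM\":{\"oozie\":\"true\",\"mapred\":\"true\",\"hbase\":\"true\"},"
				+ "\"IIS\":{\"oozie\":\"true\",\"mapred\":\"false\",\"hbase\":\"false\"}}";

		// HadoopClientMap.setEnabledClients performs the same unchecked Gson conversion
		@SuppressWarnings("unchecked")
		final Map<String, Map<String, String>> clients = new Gson().fromJson(enabledClients, Map.class);

		// mirrors HadoopClientMap.isClientAvailable: a client counts as available only when its flag is the string "true"
		final boolean hbaseOnDM = clients.containsKey("DM") && "true".equals(clients.get("DM").get("hbase"));
		System.out.println("hbase available on DM: " + hbaseOnDM);
	}
}

The string comparison against "true" mirrors HadoopClientMap.isClientAvailable, which treats any other value, or a missing cluster entry, as disabled.
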
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java
1
package eu.dnetlib.data.hadoop.oozie;
2

  
3
import java.io.IOException;
4
import java.util.*;
5

  
6
import com.google.common.collect.Maps;
7
import com.google.common.collect.Sets;
8
import org.apache.commons.io.IOUtils;
9
import org.apache.commons.lang.StringUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.apache.oozie.client.OozieClient;
13
import org.apache.oozie.client.OozieClientException;
14
import org.apache.oozie.client.WorkflowAction;
15
import org.apache.oozie.client.WorkflowJob;
16
import org.apache.oozie.client.WorkflowJob.Status;
17

  
18
import eu.dnetlib.data.hadoop.action.JobCompletion;
19
import eu.dnetlib.data.hadoop.action.JobMonitor;
20
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
21

  
22
import static java.lang.String.format;
23

  
24
public class OozieJobMonitor extends JobMonitor {
25

  
26
	private static final Log log = LogFactory.getLog(JobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM
27

  
28
	private final OozieClient oozieClient;
29

  
30
	private final String jobId;
31

  
32
	public static final String ACTION_TYPE_SUBWORKFLOW = "sub-workflow";
33

  
34
	private Set<String> workflowActions = Sets.newHashSet();
35

  
36
	@Deprecated
37
	public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback) {
38
		super(callback);
39
		this.oozieClient = oozieClient;
40
		this.jobId = jobId;
41
	}
42

  
43
	public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback, final Set<String> workflowActions) {
44
		super(callback);
45
		this.oozieClient = oozieClient;
46
		this.jobId = jobId;
47
		this.workflowActions = workflowActions;
48
	}
49

  
50
	@Override
51
	public void run() {
52
		try {
53
			log.info("waiting for oozie job completion: " + getHadoopId());
54

  
55
			Status status = Status.PREP;
56
			while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) {
57
				Thread.sleep(monitorSleepTimeSecs * 1000);
58

  
59
				try {
60
					final Status currentStatus = doGetStatus();
61

  
62
					if (!currentStatus.equals(status)) {
63
						status = currentStatus;
64
						lastActivity = new Date();
65
					}
66
				} catch (Throwable e) {
67
					log.warn(format("error polling status for job %s", jobId), e);
68
				}
69
			}
70

  
71
			log.debug(format("job %s finihsed with status: %s", jobId, status));
72
			if (Status.SUCCEEDED.equals(status)) {
73
				// TODO set some content to return to the blackboard msg.
74

  
75
				log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
76
				final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions);
77
				if (report != null) {
78
					final Map<String, String> map = Maps.newHashMap();
79
					report.forEach((k, v) -> map.put(k.toString(), v.toString()));
80
					log.info("found oozie job report, size: " + map.size());
81
					getCallback().done(map);
82
				} else {
83
					log.warn("cannot find oozie job report!");
84
					getCallback().done(new HashMap<>());
85
				}
86
            } else {
87
				// TODO retrieve some failure information from the oozie client.
88
				String msg = format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId()));
89
				getCallback().failed(msg, new HadoopServiceException(msg));
90
            }
91
		} catch (Throwable e) {
92
			getCallback().failed(getHadoopId(), e);
93
		}
94
	}
95

  
96
	/**
97
	 * Provides the report entries, when found, for the given Oozie job identifier. Returns null when no report is found.
98
	 */
99
	private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
100
		WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId);
101
		for (WorkflowAction currentAction : oozieJob.getActions()) {
102
			log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName()));
103
			if (workflowActions.contains(currentAction.getName())) {
104
				log.info(String.format("found workflow action %s", currentAction.getName()));
105
				if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) {
106
					log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId()));
107
					Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
108
					if (subworkflowProperties != null) {
109
						return subworkflowProperties;
110
					}
111
				} else if (StringUtils.isNotBlank(currentAction.getData())) {
112
					Properties properties = new Properties();
113
					properties.load(IOUtils.toInputStream(currentAction.getData()));
114
					log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size()));
115
					return properties;
116
				}
117
			} else {
118
				log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName()));
119
			}
120
		}
121
		return null;
122
	}
123

  
124
	@Override
125
	public String getHadoopId() {
126
		return jobId;
127
	}
128

  
129
	public OozieClient getOozieClient() {
130
		return oozieClient;
131
	}
132

  
133
	@Override
134
	public String getStatus() {
135
		try {
136
			return doGetStatus().toString();
137
		} catch (OozieClientException e) {
138
			log.error("error accessing job status", e);
139
			return "UNKNOWN";
140
		}
141
	}
142

  
143
	private Status doGetStatus() throws OozieClientException {
144
		return getOozieClient().getJobInfo(getHadoopId()).getStatus();
145
	}
146

  
147
	@Override
148
	public Date getLastActivity() {
149
		return lastActivity;
150
	}
151

  
152
	@Override
153
	public Date getStartTime() throws HadoopServiceException {
154
		try {
155
			return getOozieClient().getJobInfo(getHadoopId()).getStartTime();
156
		} catch (OozieClientException e) {
157
			throw new HadoopServiceException("unable to read job start time", e);
158
		}
159
	}
160

  
161
	@Override
162
	public String getTrackerUrl() {
163
		return getOozieClient().getOozieUrl();
164
	}
165

  
166
	@Override
167
	public void kill() {
168
		try {
169
			getOozieClient().kill(getHadoopId());
170
		} catch (OozieClientException e) {
171
			log.error("unable to kill job: " + getHadoopId(), e);
172
		}
173
	}
174

  
175
}
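
The run() loop above repeatedly sleeps and re-reads the workflow status through doGetStatus(), i.e. OozieClient.getJobInfo(jobId).getStatus(), until the job leaves PREP/RUNNING, and on failure it attaches the Oozie job log to the callback. A minimal sketch of the same polling loop against a bare OozieClient; the server URL, job id and 10-second sleep are placeholder assumptions, not values taken from this module:

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob.Status;

public class OoziePollSketch {

	public static void main(final String[] args) throws OozieClientException, InterruptedException {
		// placeholder endpoint and job id, to be replaced with real values
		final OozieClient client = new OozieClient("http://oozie-host:11000/oozie");
		final String jobId = args.length > 0 ? args[0] : "0000000-000000000000000-oozie-W";

		Status status = Status.PREP;
		while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) {
			Thread.sleep(10 * 1000); // OozieJobMonitor uses the configurable monitorSleepTimeSecs here
			status = client.getJobInfo(jobId).getStatus();
			System.out.println("job " + jobId + " status: " + status);
		}

		if (!Status.SUCCEEDED.equals(status)) {
			// the monitor reports a failure together with the Oozie job log
			System.err.println(client.getJobLog(jobId));
		}
	}
}
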
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieClientFactory.java
1
package eu.dnetlib.data.hadoop.oozie;
2

  
3
import eu.dnetlib.data.hadoop.AbstractHadoopClient;
4
import eu.dnetlib.data.hadoop.config.ClusterName;
5
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.apache.oozie.client.OozieClient;
9

  
10
/**
11
 * Factory bean for Oozie client instances.
12
 * 
13
 * The Oozie server may not be available in every Hadoop deployment, so every Oozie-specific operation should be defined as optional.
14
 * 
15
 * @author claudio
16
 * 
17
 */
18
public class OozieClientFactory extends AbstractHadoopClient {
19

  
20
	private static final String ENV_ATTRIBUTE_OOZIE_SERVICE_LOC = "oozie.service.loc";
21

  
22
	private static final Log log = LogFactory.getLog(OozieClientFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
23

  
24
	public OozieClient newInstance(ClusterName clusterName) throws HadoopServiceException {
25
		final String oozieServiceLocation = configurationEnumerator.get(clusterName).get(ENV_ATTRIBUTE_OOZIE_SERVICE_LOC);
26
		log.info("init oozie client, cluster: " + clusterName.toString() + ", oozie server: " + oozieServiceLocation);
27
		setHadoopUser();
28
		try {
29
			return new OozieClient(oozieServiceLocation);
30
		} catch (Throwable e) {
31
			throw new HadoopServiceException("unable to initialize oozie client for cluster: " + clusterName.toString(), e);
32
		}
33
	}
34

  
35
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/test/java/eu/dnetlib/data/hadoop/utils/ReadSequenceFileTest.java
1
package eu.dnetlib.data.hadoop.utils;
2

  
3
import java.util.Map;
4
import java.util.Map.Entry;
5

  
6
import org.apache.commons.math.stat.descriptive.SummaryStatistics;
7
import org.apache.hadoop.conf.Configuration;
8
import org.apache.hadoop.fs.Path;
9
import org.apache.hadoop.io.Text;
10
import org.junit.Before;
11
import org.junit.Ignore;
12
import org.junit.Test;
13
import org.springframework.core.io.ClassPathResource;
14

  
15
import com.google.common.collect.Maps;
16

  
17
import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
18
import eu.dnetlib.data.hadoop.hdfs.SequenceFileUtils;
19
import eu.dnetlib.miscutils.collections.Pair;
20

  
21
public class ReadSequenceFileTest {
22

  
23
	private static final Path SEQUENCE_FILE_PATH = new Path("hdfs://nmis-hadoop-cluster/tmp/indexrecords_db_openaireplus_sesam_SESAMALL.seq");
24
	private final String HADOOP_CONF_FILE = "/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties";
25

  
26
	private Configuration conf;
27

  
28
	@Before
29
	public void setUp() {
30
		final ConfigurationFactory confFactory = new ConfigurationFactory();
31
		confFactory.setDefaults(new ClassPathResource(HADOOP_CONF_FILE));
32
		conf = confFactory.getConfiguration();
33
	}
34

  
35
	@Test
36
	@Ignore
37
	public void testReadSequenceFile() throws Exception {
38
		final SummaryStatistics statsAll = new SummaryStatistics();
39

  
40
		final Map<String, SummaryStatistics> stats = Maps.newHashMap();
41

  
42
		int i = 0;
43
		for (Pair<Text, Text> pair : SequenceFileUtils.read(SEQUENCE_FILE_PATH, conf)) {
44
			final String id = pair.getKey().toString();
45
			final String record = pair.getValue().toString();
46
			final int length = record.getBytes().length;
47

  
48
			final String type = id.substring(0, 2);
49
			if (!stats.containsKey(type)) {
50
				stats.put(type, new SummaryStatistics());
51
			}
52
			statsAll.addValue(length);
53
			stats.get(type).addValue(length);
54

  
55
			if (++i % 10000 == 0) {
56
				System.out.println("Read " + i);
57
			}
58
		}
59

  
60
		printStats("ALL", statsAll);
61
		for (Entry<String, SummaryStatistics> e : stats.entrySet()) {
62
			printStats(e.getKey(), e.getValue());
63
		}
64
	}
65

  
66
	private void printStats(final String type, final SummaryStatistics stats) {
67
		System.out.println("************************************");
68
		System.out.println("Type: " + type);
69
		System.out.println(String.format("\tmin    : %.2f KBytes", stats.getMin() / 1024));
70
		System.out.println(String.format("\tmax    : %.2f KBytes", stats.getMax() / 1024));
71
		System.out.println(String.format("\tavg    : %.2f KBytes", stats.getMean() / 1024));
72
		System.out.println(String.format("\tstdDev : %.2f", stats.getStandardDeviation() / 1024));
73
	}
74
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/deploy.info
1
{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/trunk/", "deploy_repository": "dnet45-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots", "name": "dnet-hadoop-service"}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/AbstractHadoopClient.java
1
package eu.dnetlib.data.hadoop;
2

  
3
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
4
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
5
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Autowired;
9

  
10
public abstract class AbstractHadoopClient {
11

  
12
	private static final Log log = LogFactory.getLog(AbstractHadoopClient.class);
13

  
14
	@Autowired
15
	private ISClient isClient;
16

  
17
	@Autowired
18
	protected ConfigurationEnumerator configurationEnumerator;
19

  
20
	protected void setHadoopUser() throws HadoopServiceException {
21
		setHadoopUser(getDefaultUser());
22
	}
23

  
24
	private String getDefaultUser() throws HadoopServiceException {
25
		try {
26
			return isClient.queryForServiceProperty("default.hadoop.user");
27
		} catch (ISLookUpException e) {
28
			throw new HadoopServiceException(e);
29
		}
30
	}
31

  
32
	protected void setHadoopUser(final String userName) {
33
		log.info("setting HADOOP_USER_NAME as " + userName);
34
		System.setProperty("HADOOP_USER_NAME", userName);
35
	}
36

  
37
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/HadoopClientMap.java
1
package eu.dnetlib.data.hadoop;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5

  
6
import com.google.common.collect.Maps;
7
import com.google.gson.Gson;
8
import eu.dnetlib.data.hadoop.config.ClusterName;
9
import eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory;
10
import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
11
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
12
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.apache.hadoop.hbase.client.HBaseAdmin;
16
import org.apache.hadoop.mapred.JobClient;
17
import org.apache.oozie.client.OozieClient;
18
import org.springframework.beans.factory.annotation.Required;
19

  
20

  
21
public class HadoopClientMap {
22

  
23
	private static final Log log = LogFactory.getLog(HadoopClientMap.class); // NOPMD by marko on 11/24/08 5:02 PM
24

  
25
	private JobClientFactory jobClientFactory;
26

  
27
	private OozieClientFactory oozieClientFactory;
28

  
29
	private HBaseAdminFactory hbaseAdminFactory;
30

  
31
	private Map<String, Map<String, String>> enabledClients = Maps.newHashMap();
32

  
33
	public boolean isMapreduceAvailable(final ClusterName name) {
34
		return isClientAvailable(name, "mapred");
35
	}
36

  
37
	public boolean isOozieAvailable(final ClusterName name) {
38
		return isClientAvailable(name, "oozie");
39
	}
40

  
41
	public boolean isHbaseAvailable(final ClusterName name) {
42
		return isClientAvailable(name, "hbase");
43
	}
44

  
45
	private boolean isClientAvailable(final ClusterName name, final String clientName) {
46
		final String clusterName = name.toString();
47
		return enabledClients.containsKey(clusterName) && "true".equals(enabledClients.get(clusterName).get(clientName));
48
	}
49

  
50
	public JobClient getJtClient(final ClusterName clusterName, final String username) throws HadoopServiceException, IOException {
51
		if (!isMapreduceAvailable(clusterName)) {
52
			throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
53
		}
54
		return getJobClientFactory().newInstance(clusterName, username);
55
	}
56

  
57
	public JobClient getJtClient(final ClusterName clusterName) throws HadoopServiceException, IOException {
58
		if (!isMapreduceAvailable(clusterName)) {
59
			throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
60
		}
61
		return getJobClientFactory().newInstance(clusterName);
62
	}
63

  
64
	public OozieClient getOozieClient(final ClusterName name) throws HadoopServiceException {
65
		if (!isOozieAvailable(name)) {
66
			throw new HadoopServiceException("oozie is not available for cluster " + name.toString());
67
		}
68
		return getOozieClientFactory().newInstance(name);
69
	}
70

  
71
	public HBaseAdmin getHbaseAdmin(final ClusterName name) throws HadoopServiceException {
72
		if (!isHbaseAvailable(name)) {
73
			throw new HadoopServiceException("hbase is not available for cluster " + name.toString());
74
		}
75
		return getHbaseAdminFactory().newInstance(name);
76
	}
77

  
78
	// //////////
79

  
80
	public String getEnabledClients() {
81
		return new Gson().toJson(enabledClients);
82
	}
83

  
84
	@Required
85
	@SuppressWarnings("unchecked")
86
	public void setEnabledClients(final String enabledClients) {
87
		this.enabledClients = new Gson().fromJson(enabledClients, Map.class);
88
	}
89

  
90
	public JobClientFactory getJobClientFactory() {
91
		return jobClientFactory;
92
	}
93

  
94
	@Required
95
	public void setJobClientFactory(final JobClientFactory jobClientFactory) {
96
		this.jobClientFactory = jobClientFactory;
97
	}
98

  
99
	public OozieClientFactory getOozieClientFactory() {
100
		return oozieClientFactory;
101
	}
102

  
103
	@Required
104
	public void setOozieClientFactory(final OozieClientFactory oozieClientFactory) {
105
		this.oozieClientFactory = oozieClientFactory;
106
	}
107

  
108
	public HBaseAdminFactory getHbaseAdminFactory() {
109
		return hbaseAdminFactory;
110
	}
111

  
112
	@Required
113
	public void setHbaseAdminFactory(final HBaseAdminFactory hbaseAdminFactory) {
114
		this.hbaseAdminFactory = hbaseAdminFactory;
115
	}
116

  
117
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/ISClient.java
1
package eu.dnetlib.data.hadoop;
2

  
3
import java.util.List;
4

  
5
import javax.annotation.Resource;
6

  
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9

  
10
import com.google.common.collect.Iterables;
11

  
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
13
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
14
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
15
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
16
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
17
import eu.dnetlib.miscutils.datetime.DateUtils;
18

  
19
public class ISClient {
20

  
21
	private static final Log log = LogFactory.getLog(ISClient.class); // NOPMD by marko on 11/24/08 5:02 PM
22

  
23
	@Resource
24
	private UniqueServiceLocator serviceLocator;
25

  
26
	public String getJobProfile(final String jobName) throws ISLookUpException {
27
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
28
				"/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName + "']");
29
	}
30

  
31
	public String queryForServiceProperty(final String key) throws ISLookUpException {
32
		return getServiceConfigValue(
33
				String.format(
34
						"distinct-values("
35
								+ "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='HadoopServiceResourceType'] "
36
								+ "return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='%s']/@value/string())",
37
						key));
38
	}
39

  
40
	public void updateCountElement(final String jobName, final String element, final String delta) {
41
		final String xquery =
42
				"let $x := //RESOURCE_PROFILE[" + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName
43
						+ "'], $tot := $x//STATUS/" + element + "/@value/number() " + delta + " return update replace $x//STATUS/" + element + " with <"
44
						+ element + " value='{$tot}' />";
45

  
46
		executeXUpdate(xquery);
47
	}
48

  
49
	public void updateDate(final String jobName) {
50
		log.info("increment last submission date for job: " + jobName);
51
		executeXUpdate("for $x in collection('')/RESOURCE_PROFILE["
52
				+ ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='"
53
				+ jobName + "'] " + " return update value $x//LAST_SUBMISSION_DATE/@value with '" + DateUtils.now_ISO8601() + "' ");
54
	}
55

  
56
	private String getServiceConfigValue(final String xquery) throws ISLookUpException {
57
		log.debug("querying for service property: " + xquery);
58
		final List<String> urls = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
59
		if ((urls == null) || (urls.size() != 1))
60
			throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
61
		return Iterables.getOnlyElement(urls);
62
	}
63

  
64
	private boolean executeXUpdate(final String xupdate) {
65
		try {
66
			log.debug("running xupdate: " + xupdate);
67
			return serviceLocator.getService(ISRegistryService.class).executeXUpdate(xupdate);
68
		} catch (final ISRegistryException e) {
69
			log.error("unable to run xupdate: " + xupdate, e);
70
			return false;
71
		}
72
	}
73
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseAdminFactory.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import eu.dnetlib.data.hadoop.AbstractHadoopClient;
4
import eu.dnetlib.data.hadoop.config.ClusterName;
5
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.apache.hadoop.hbase.client.HBaseAdmin;
9

  
10
public class HBaseAdminFactory extends AbstractHadoopClient {
11

  
12
	private static final Log log = LogFactory.getLog(HBaseAdminFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
13

  
14
	public HBaseAdmin newInstance(final ClusterName clusterName) throws HadoopServiceException {
15
		try {
16
			log.info("init hbaseAdmin, cluster: " + clusterName.toString());
17
			setHadoopUser();
18
			return new HBaseAdmin(configurationEnumerator.get(clusterName));
19
		} catch (final Throwable e) {
20
			throw new HadoopServiceException("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
21
		}
22
	}
23

  
24
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HbaseTableFeeder.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.List;
6

  
7
import com.google.common.base.Predicates;
8
import com.google.common.collect.Iterables;
9
import com.google.common.collect.Lists;
10
import eu.dnetlib.data.hadoop.config.ClusterName;
11
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
12
import eu.dnetlib.data.transform.Row;
13
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
14
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.apache.hadoop.conf.Configuration;
18
import org.apache.hadoop.hbase.client.HTable;
19
import org.apache.hadoop.hbase.client.Mutation;
20
import org.springframework.beans.factory.annotation.Autowired;
21
import org.springframework.beans.factory.annotation.Required;
22

  
23
/**
24
 * The Class HbaseTableFeeder provides an abstraction to ship batch operations to an HBase table.
25
 */
26
public abstract class HbaseTableFeeder {
27

  
28
	/**
29
	 * The Constant log.
30
	 */
31
	private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
32
	/**
33
	 * The configuration enumerator.
34
	 */
35
	@Autowired
36
	protected ConfigurationEnumerator configurationEnumerator;
37
	/**
38
	 * The batch size.
39
	 */
40
	private int batchSize = 100;
41
	/**
42
	 * The result set client factory.
43
	 */
44
	private ResultSetClientFactory resultSetClientFactory;
45

  
46
	/**
47
	 * Adds the operation.
48
	 *
49
	 * @param buffer the buffer
50
	 * @param row    the row
51
	 */
52
	protected abstract void addOperation(final List<Mutation> buffer, final Row row);
53

  
54
	/**
55
	 * Feed.
56
	 *
57
	 * @param epr         the epr
58
	 * @param xsl         the xsl
59
	 * @param clusterName the cluster name
60
	 * @param tableName   the table name
61
	 * @param simulation  the simulation
62
	 * @return the int
63
	 * @throws IOException          Signals that an I/O exception has occurred.
64
	 * @throws InterruptedException the interrupted exception
65
	 */
66
	public int feed(final String epr, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
67
			throws IOException, InterruptedException {
68
		return doWrite(asRows(epr, xsl), getConf(clusterName), tableName, simulation);
69
	}
70

  
71
	/**
72
	 * Do writeOnHBase.
73
	 *
74
	 * @param rows          the rows
75
	 * @param configuration the configuration
76
	 * @param tableName     the table name
77
	 * @param simulation    the simulation
78
	 * @return the int
79
	 * @throws IOException          Signals that an I/O exception has occurred.
80
	 * @throws InterruptedException the interrupted exception
81
	 */
82
	private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
83
			throws IOException, InterruptedException {
84
		final List<Mutation> buffer = Lists.newArrayList();
85

  
86
		int count = 0;
87
		if (simulation) {
88
			log.info("running in simulation mode ...");
89
			log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
90
		} else {
91

  
92
			final HTable htable = new HTable(configuration, tableName);
93
			try {
94
				int i = 0;
95
				for (final Row row : rows) {
96
					addOperation(buffer, row);
97
					if ((++i % getBatchSize()) == 0) {
98
						flush(tableName, buffer, htable);
99
						count += buffer.size();
100
						buffer.clear();
101
					}
102
				}
103
			} finally {
104
				if (!buffer.isEmpty()) {
105
					flush(tableName, buffer, htable);
106
					count += buffer.size();
107
				}
108
				htable.flushCommits();
109
				htable.close();
110
			}
111
		}
112
		return count;
113
	}
114

  
115
	private void flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
116
		if (!checkOp(htable.batch(buffer), tableName)) throw new IOException("unable to flush operation on HBase table: " + tableName);
117
	}
118

  
119
	private boolean checkOp(final Object[] res, final String tableName) throws IOException {
120
		return Iterables.all(Arrays.asList(res), Predicates.notNull());
121
	}
122

  
123
	/**
124
	 * As rows.
125
	 *
126
	 * @param epr the epr
127
	 * @param xsl the xsl
128
	 * @return the iterable
129
	 */
130
	protected Iterable<Row> asRows(final String epr, final String xsl) {
131
		return Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(epr), XsltRowTransformerFactory.newInstance(xsl)));
132
	}
133

  
134
	/**
135
	 * Gets the conf.
136
	 *
137
	 * @param clusterName the cluster name
138
	 * @return the conf
139
	 */
140
	protected Configuration getConf(final ClusterName clusterName) {
141
		return configurationEnumerator.get(clusterName);
142
	}
143

  
144
	/**
145
	 * Gets the batch size.
146
	 *
147
	 * @return the batch size
148
	 */
149
	public int getBatchSize() {
150
		return batchSize;
151
	}
152

  
153
	/**
154
	 * Sets the batch size.
155
	 *
156
	 * @param batchSize the new batch size
157
	 */
158
	public void setBatchSize(final int batchSize) {
159
		this.batchSize = batchSize;
160
	}
161

  
162
	/**
163
	 * Gets the result set client factory.
164
	 *
165
	 * @return the result set client factory
166
	 */
167
	public ResultSetClientFactory getResultSetClientFactory() {
168
		return resultSetClientFactory;
169
	}
170

  
171
	/**
172
	 * Sets the result set client factory.
173
	 *
174
	 * @param resultSetClientFactory the new result set client factory
175
	 */
176
	@Required
177
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
178
		this.resultSetClientFactory = resultSetClientFactory;
179
	}
180

  
181
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseDeleteFeeder.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import java.util.List;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.hbase.client.Delete;
8
import org.apache.hadoop.hbase.client.Mutation;
9
import org.apache.hadoop.hbase.util.Bytes;
10

  
11
import eu.dnetlib.data.transform.Column;
12
import eu.dnetlib.data.transform.Row;
13

  
14
/**
15
 * The Class HBaseDeleteFeeder performs a batch of Delete operations.
16
 */
17
public class HBaseDeleteFeeder extends HbaseTableFeeder {
18

  
19
	/**
20
	 * Logger.
21
	 */
22
	private static final Log log = LogFactory.getLog(HBaseDeleteFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
23

  
24
	/*
25
	 * (non-Javadoc)
26
	 *
27
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
28
	 */
29
	@Override
30
	protected void addOperation(final List<Mutation> buffer, final Row row) {
31
		final Delete delete = new Delete(Bytes.toBytes(row.getKey()));
32
		delete.setWriteToWAL(true);
33

  
34
		for (final Column<String, byte[]> col : row) {
35
			log.debug(String.format("deleting K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
36
			delete.deleteColumns(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()));
37
		}
38

  
39
		buffer.add(delete);
40
	}
41

  
42
}
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HBasePutFeeder.java
1
package eu.dnetlib.data.hadoop.hbase;
2

  
3
import java.util.List;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.hbase.client.Mutation;
8
import org.apache.hadoop.hbase.client.Put;
9
import org.apache.hadoop.hbase.util.Bytes;
10

  
11
import eu.dnetlib.data.transform.Column;
12
import eu.dnetlib.data.transform.Row;
13

  
14
/**
15
 * The Class HBasePutFeeder performs a batch of Put operations.
16
 */
17
public class HBasePutFeeder extends HbaseTableFeeder {
18

  
19
	/**
20
	 * Logger.
21
	 */
22
	private static final Log log = LogFactory.getLog(HBasePutFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
23

  
24
	/*
25
	 * (non-Javadoc)
26
	 *
27
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
28
	 */
29
	@Override
30
	protected void addOperation(final List<Mutation> buffer, final Row row) {
31
		final Put put = new Put(Bytes.toBytes(row.getKey()));
32
		put.setWriteToWAL(true);
33

  
34
		for (final Column<String, byte[]> col : row) {
35
			log.debug(String.format("adding value to K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
36
			put.add(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()), col.getValue());
37
		}
38

  
39
		buffer.add(put);
40
	}
41

  
42
}
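
HBaseDeleteFeeder and HBasePutFeeder only differ in the Mutation they build per Row: a Delete of each column versus a Put of each column value, both keyed on the row key and written through the WAL. A condensed sketch of the Put path end to end, combining addOperation above with the write/flush/close sequence used by HadoopServiceCore.writeOnHBase further down; the configuration, table, family, qualifier and value are placeholders:

import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

public class HBasePutSketch {

	public static void main(final String[] args) throws IOException {
		// placeholder configuration; the service resolves it per cluster via the ConfigurationEnumerator
		final Configuration conf = HBaseConfiguration.create();

		final Put put = new Put(Bytes.toBytes("rowKey"));
		put.setWriteToWAL(true);
		// one cell per column, as in HBasePutFeeder.addOperation
		put.add(Bytes.toBytes("columnFamily"), Bytes.toBytes("qualifier"), Bytes.toBytes("value"));

		final List<Put> puts = Lists.newArrayList(put);

		// same write/flush/close sequence as HadoopServiceCore.writeOnHBase
		final HTable table = new HTable(conf, "tableName");
		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}
}
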
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java
1
package eu.dnetlib.data.hadoop;
2

  
3
import java.io.IOException;
4
import java.net.URI;
5
import java.util.*;
6
import java.util.Map.Entry;
7
import java.util.stream.Collectors;
8

  
9
import com.google.common.collect.Lists;
10
import com.google.common.collect.Maps;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.data.hadoop.config.ClusterName;
13
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
14
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
15
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
16
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
17
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
18
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
19
import org.apache.commons.lang.StringUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.hadoop.conf.Configuration;
23
import org.apache.hadoop.fs.FileSystem;
24
import org.apache.hadoop.fs.Path;
25
import org.apache.hadoop.hbase.HColumnDescriptor;
26
import org.apache.hadoop.hbase.HRegionInfo;
27
import org.apache.hadoop.hbase.HTableDescriptor;
28
import org.apache.hadoop.hbase.client.*;
29
import org.apache.hadoop.hbase.util.Bytes;
30
import org.springframework.beans.factory.annotation.Autowired;
31
import org.springframework.beans.factory.annotation.Required;
32

  
33
public class HadoopServiceCore {
34

  
35
	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
36
	@Autowired
37
	protected ConfigurationEnumerator configurationEnumerator;
38
	@Autowired
39
	private HadoopClientMap clients;
40
	private int maxVersions;
41

  
42
	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
43
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
44
			return Arrays.asList(admin.listTables())
45
					.stream()
46
					.map(HTableDescriptor::getNameAsString)
47
					.collect(Collectors.toList());
48
		}
49
	}
50

  
51
	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
52
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
53

  
54
			if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
55

  
56
			if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
57

  
58
			final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
59

  
60
			final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
61

  
62
			final Set<String> columns = Sets.newHashSet();
63

  
64
			for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
65
				columns.add(hColDesc.getNameAsString());
66
			}
67

  
68
			HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
69
			htDescriptor.setColumns(columns);
70

  
71
			List<HBaseTableRegionInfo> regions = Lists.newArrayList();
72

  
73
			for (HRegionInfo info : tableRegions) {
74
				regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
75
			}
76
			htDescriptor.setRegions(regions);
77

  
78
			if (log.isDebugEnabled()) {
79
				log.debug("got configuration for table '" + tableName + "': " + htDescriptor.toString());
80
			}
81

  
82
			return htDescriptor.toString();
83
		}
84
	}
85

  
86
	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
87
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
88
			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
89
			return desc.getFamilies().stream()
90
					.map(d -> d.getNameAsString())
91
					.collect(Collectors.toList());
92
		}
93
	}
94

  
95
	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
96
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
97

  
98
			if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate non-existing table");
99

  
100
			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
101

  
102
			log.info("disabling table: " + table);
103
			admin.disableTable(table);
104

  
105
			log.info("deleting table: " + table);
106
			admin.deleteTable(table);
107

  
108
			log.info("creating table: " + table);
109
			admin.createTable(desc);
110
		}
111
	}
112

  
113
	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
114
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
115

  
116
			return admin.tableExists(table);
117
		}
118
	}
119

  
120
	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
121
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
122

  
123
			if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop non-existing table: '" + table + "'");
124

  
125
			log.info("disabling table: " + table);
126
			admin.disableTable(table);
127

  
128
			log.info("deleting table: " + table);
129
			admin.deleteTable(table);
130
		}
131
	}
132

  
133
	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
134
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
135

  
136
			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
137

  
138
			if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
139

  
140
			final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
141

  
142
			doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
143
		}
144
	}
145

  
146
	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
147
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
148

  
149
			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
150

  
151
			doCreateTable(clusterName, table, columns, null);
152
		}
153
	}
154

  
155
	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
156
			throws IOException, HadoopServiceException {
157
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
158

  
159
			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
160

  
161
			final HTableDescriptor desc = new HTableDescriptor(table);
162
			for (final String column : columns) {
163
				final HColumnDescriptor hds = new HColumnDescriptor(column);
164
				hds.setMaxVersions(getMaxVersions());
165
				desc.addFamily(hds);
166
			}
167

  
168
			log.info("creating hbase table: " + table);
169

  
170
			if (regions != null && !regions.isEmpty()) {
171
				log.debug(String.format("create using %s regions: %s", regions.size(), regions));
172
				admin.createTable(desc, getSplitKeys(regions));
173
			} else {
174
				admin.createTable(desc);
175
			}
176

  
177
			log.info("created hbase table: [" + table + "]");
178
			log.debug("descriptor: [" + desc.toString() + "]");
179
		}
180
	}
181

  
182
	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
183
		byte[][] splits = new byte[regions.size() - 1][];
184
		for (int i = 0; i < regions.size() - 1; i++) {
185
			splits[i] = regions.get(i).getEndKey().getBytes();
186
		}
187
		return splits;
188
	}
189

  
190
	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
191
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
192

  
193
			if (!admin.tableExists(table)) {
194
				createTable(clusterName, table, columns);
195
			} else {
196
				final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
197

  
198
				final Set<String> foundColumns = desc.getFamilies().stream()
199
						.map(d -> d.getNameAsString())
200
						.collect(Collectors.toCollection(HashSet::new));
201

  
202
				log.info("ensuring columns on table " + table + ": " + columns);
203
				final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
204
				if (!missingColumns.isEmpty()) {
205

  
206
					if (admin.isTableEnabled(table)) {
207
						admin.disableTable(table);
208
					}
209

  
210
					for (final String column : missingColumns) {
211
						log.info("hbase table: '" + table + "', adding column: " + column);
212
						admin.addColumn(table, new HColumnDescriptor(column));
213
					}
214

  
215
					admin.enableTable(table);
216
				}
217
			}
218
		}
219
	}
220

  
221
	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
222
		final Configuration conf = configurationEnumerator.get(clusterName);
223
		final HTable table = new HTable(conf, tableName);
224

  
225
		try {
226
			table.put(puts);
227
		} finally {
228
			table.flushCommits();
229
			table.close();
230
		}
231
	}
232

  
233
	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
234
		final Configuration conf = configurationEnumerator.get(clusterName);
235
		final HTable table = new HTable(conf, tableName);
236
		try {
237
			table.delete(deletes);
238
		} finally {
239
			table.flushCommits();
240
			table.close();
241
		}
242
	}
243

  
244
	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
245
		final Configuration conf = configurationEnumerator.get(clusterName);
246
		final HTable table = new HTable(conf, tableName);
247
		try {
248
			for(HBaseRowDescriptor desc : columns) {
249

  
250
				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
251
				d.setWriteToWAL(true);
252
				for(Column c : desc.getColumns()) {
253
					for(String qualifier : c.getQualifier()) {
254
						log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
255
						d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
256
					}
257
				}
258
				table.delete(d);
259
			}
260
		} finally {
261
			table.flushCommits();
262
			table.close();
263
		}
264
	}
265

  
266
	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
267
		final Configuration conf = configurationEnumerator.get(clusterName);
268
		final HTable table = new HTable(conf, tableName);
269
		try {
270
			return table.get(new Get(id));
271
		} finally {
272
			table.close();
273
		}
274
	}
275

  
276
	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
277
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
278
		for(String rowKey : rowKeys) {
279
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
280
		}
281
		return map;
282
	}
283

  
284
	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
285
		final Configuration conf = configurationEnumerator.get(clusterName);
286
		final HTable table = new HTable(conf, tableName);
287

  
288
		final HBaseRowDescriptor desc = new HBaseRowDescriptor();
289

  
290
		try {
291
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));
292

  
293
			if (r.isEmpty()) {
294
				return desc;
295
			}
296

  
297
			final List<Column> columns = Lists.newArrayList();
298

  
299
			for(Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
300
				final Set<byte[]> qualifiers = e.getValue().keySet();
301
				final String family = new String(e.getKey());
302
				final Column col = new Column(family);
303

  
304
				for(byte[] q : qualifiers) {
305
					String qs = new String(q);
306
					col.getQualifier().add(qs);
307
				}
308
				columns.add(col);
309
			}
310
			desc.setColumns(columns);
311
			desc.setRowKey(rowKey);
312

  
313
			return desc;
314
		} finally {
315
			table.close();
316
		}
317
	}
318

  
319
	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
320
		final Configuration conf = configurationEnumerator.get(clusterName);
321
		try(final HTable table = new HTable(conf, tableName)) {
322
			final ResultScanner rs = table.getScanner(scan);
323
			try {
324
				return Lists.newArrayList(rs.iterator());
325
			} finally {
326
				rs.close();
327
			}
328
		}
329
	}
330

  
331
	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
332
		if (StringUtils.isBlank(path))
333
			throw new HadoopServiceException("Cannot delete an empty HDFS path.");
334

  
335
		final Configuration conf = configurationEnumerator.get(clusterName);
336

  
337
		try(final FileSystem hdfs = FileSystem.get(conf)) {
338
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
339

  
340
			if (hdfs.exists(absolutePath)) {
341
				log.debug("deleting HDFS path: " + absolutePath.toString());
342
				hdfs.delete(absolutePath, true);
343
				log.info("deleted path: " + absolutePath.toString());
344
				return true;
345
			} else {
346
				log.warn("cannot delete non-existing HDFS path: " + absolutePath.toString());
347
				return false;
348
			}
349
		} catch (IOException e) {
350
			throw new HadoopServiceException(e);
351
		}
352
	}
353

  
354
	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
355
		if (StringUtils.isBlank(path))
356
			throw new HadoopServiceException("Cannot create an empty HDFS path.");
357

  
358
		final Configuration conf = configurationEnumerator.get(clusterName);
359

  
360
		try(final FileSystem hdfs = FileSystem.get(conf)) {
361
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
362
			if (!hdfs.exists(absolutePath)) {
363
				hdfs.mkdirs(absolutePath);
364
				log.info("created path: " + absolutePath.toString());
365
				return true;
366
			} else if (force) {
367
				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
368
				hdfs.delete(absolutePath, true);
369

  
370
				hdfs.mkdirs(absolutePath);
371
				log.info("created path: " + absolutePath.toString());
372
				return true;
373
			} else {
374
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
375
				return false;
376
			}
377
		} catch (IOException e) {
378
			throw new HadoopServiceException(e);
379
		}
380
	}
381

  
382
	public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
383
		if (StringUtils.isBlank(path))
384
			throw new HadoopServiceException("invalid empty path");
385

  
386
		final Configuration conf = configurationEnumerator.get(clusterName);
387
		try(final FileSystem hdfs = FileSystem.get(conf)) {
388
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
389
			return hdfs.exists(absolutePath);
390
		} catch (IOException e) {
391
			throw new HadoopServiceException(e);
392
		}
393
	}
394

  
395
	public Configuration getClusterConiguration(final ClusterName clusterName) {
396
		return configurationEnumerator.get(clusterName);
397
	}
398

  
399
	public int getMaxVersions() {
400
		return maxVersions;
401
	}
402

  
403
	@Required
404
	public void setMaxVersions(final int maxVersions) {
405
		this.maxVersions = maxVersions;
406
	}
407

  
408
	public HadoopClientMap getClients() {
409
		return clients;
410
	}
411

  
412
}
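
HadoopServiceCore.getRows above wraps a plain HTable scan and materializes the ResultScanner into a list. The same read path in isolation could look roughly like the following sketch; the configuration, table name and column family are placeholders, where the service instead obtains them from the ConfigurationEnumerator and the caller:

import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.util.Bytes;

public class HBaseScanSketch {

	public static void main(final String[] args) throws IOException {
		// placeholder configuration; the service obtains it from the ConfigurationEnumerator
		final Configuration conf = HBaseConfiguration.create();

		final Scan scan = new Scan();
		scan.addFamily(Bytes.toBytes("columnFamily")); // optional: restrict the scan to one family

		final HTable table = new HTable(conf, "tableName");
		try {
			final ResultScanner rs = table.getScanner(scan);
			try {
				// same materialization used by HadoopServiceCore.getRows
				final List<Result> rows = Lists.newArrayList(rs.iterator());
				System.out.println("rows read: " + rows.size());
			} finally {
				rs.close();
			}
		} finally {
			table.close();
		}
	}
}
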
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/utils/ScanProperties.java
1
package eu.dnetlib.data.hadoop.utils;
2

  
... This diff was truncated because it exceeds the maximum size that can be displayed.
