Revision 52124
Added by Claudio Atzori about 6 years ago

[maven-release-plugin] copy for tag dnet-hadoop-service-2.7.6
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <parent>
        <groupId>eu.dnetlib</groupId>
        <artifactId>dnet45-parent</artifactId>
        <version>1.0.0</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <groupId>eu.dnetlib</groupId>
    <artifactId>dnet-hadoop-service</artifactId>
    <packaging>jar</packaging>
    <version>2.7.6</version>
    <scm>
        <developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6</developerConnection>
    </scm>
    <repositories>
        <!-- Cloudera Repositories -->
        <repository>
            <snapshots>
                <enabled>false</enabled>
            </snapshots>
            <id>cloudera-central</id>
            <name>cloudera-libs-release</name>
            <url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-central</url>
        </repository>
        <repository>
            <id>cloudera-snapshots</id>
            <name>cloudera-libs-snapshot</name>
            <url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-snapshots</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-hadoop-service-rmi</artifactId>
            <version>[1.0.0,2.0.0)</version>
        </dependency>
        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${google.gson.version}</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>cnr-blackboard-common</artifactId>
            <version>[2.1.0,3.0.0)</version>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>cnr-resultset-client</artifactId>
            <version>[2.0.0,3.0.0)</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-hadoop-commons</artifactId>
            <version>[2.0.0,3.0.0)</version>
        </dependency>
        <!--
        <dependency>
            <groupId>eu.dnetlib</groupId>
            <artifactId>dnet-mapreduce-submitter</artifactId>
            <version>[3.0.0,4.0.0)</version>
        </dependency>
        -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase</artifactId>
            <version>${apache.hbase.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>jasper-compiler</artifactId>
                    <groupId>tomcat</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jasper-runtime</artifactId>
                    <groupId>tomcat</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jamon-runtime</artifactId>
                    <groupId>org.jamon</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jsp-api-2.1</artifactId>
                    <groupId>org.mortbay.jetty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jsp-2.1</artifactId>
                    <groupId>org.mortbay.jetty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>servlet-api-2.5</artifactId>
                    <groupId>org.mortbay.jetty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jetty</artifactId>
                    <groupId>org.mortbay.jetty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jetty-util</artifactId>
                    <groupId>org.mortbay.jetty</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-log4j12</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>stax-api</artifactId>
                    <groupId>stax</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>httpclient</artifactId>
                    <groupId>org.apache.httpcomponents</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>guava</artifactId>
                    <groupId>com.google.guava</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>asm</artifactId>
                    <groupId>asm</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jaxb-api</artifactId>
                    <groupId>javax.xml.bind</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>jaxb-impl</artifactId>
                    <groupId>com.sun.xml.bind</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-codec</artifactId>
                    <groupId>commons-codec</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>commons-logging</artifactId>
                    <groupId>commons-logging</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>org.apache.oozie</groupId>
            <artifactId>oozie-client</artifactId>
            <version>${apache.oozie.version}</version>
            <exclusions>
                <exclusion>
                    <artifactId>slf4j-simple</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>slf4j-api</artifactId>
                    <groupId>org.slf4j</groupId>
                </exclusion>
                <exclusion>
                    <artifactId>xercesImpl</artifactId>
                    <groupId>xerces</groupId>
                </exclusion>
            </exclusions>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>2.0.2</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.3.1</version>
        </dependency>
        <dependency>
            <groupId>commons-codec</groupId>
            <artifactId>commons-codec</artifactId>
            <version>${commons.codec.version}</version>
        </dependency>
        <dependency>
            <groupId>commons-net</groupId>
            <artifactId>commons-net</artifactId>
            <version>${commons.net.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-compress</artifactId>
            <version>${commons.compress.version}</version>
        </dependency>
    </dependencies>
</project>
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/resources/eu/dnetlib/data/hadoop/applicationContext-dnet-hadoop-service.properties

```properties
services.hadoop.clients={"DM":{"oozie":"true","mapred":"true","hbase":"true"},"IIS":{"oozie":"true","mapred":"false","hbase":"false"}}
services.hadoop.hbase.tablefeeder.batchsize=500
services.hadoop.jobregistry.size=100
services.hadoop.lib.path=/user/dnet/lib/dnet-mapreduce-jobs-assembly-0.0.6.2-SNAPSHOT.jar
services.hadoop.hbase.maxversions=1
services.hadoop.clients.init.timeout=30
services.hadoop.user=dnet
```
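The `services.hadoop.clients` value is the JSON blob that `HadoopClientMap.setEnabledClients(...)` (further down in this revision) deserializes with Gson. A minimal sketch of that round trip, runnable with only the `gson` dependency from the pom; the demo class name is mine:

```java
import java.util.Map;

import com.google.gson.Gson;

public class EnabledClientsDemo {

    public static void main(final String[] args) {
        // Same JSON shape as the services.hadoop.clients property above.
        final String json = "{\"DM\":{\"oozie\":\"true\",\"mapred\":\"true\",\"hbase\":\"true\"},"
                + "\"IIS\":{\"oozie\":\"true\",\"mapred\":\"false\",\"hbase\":\"false\"}}";

        // Mirrors HadoopClientMap.setEnabledClients: an unchecked Gson parse into nested maps.
        @SuppressWarnings("unchecked")
        final Map<String, Map<String, String>> enabled = new Gson().fromJson(json, Map.class);

        // Mirrors HadoopClientMap.isClientAvailable(name, "oozie").
        final boolean oozieOnDM = enabled.containsKey("DM") && "true".equals(enabled.get("DM").get("oozie"));
        System.out.println("oozie enabled on DM: " + oozieOnDM); // prints true
    }
}
```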
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java

```java
package eu.dnetlib.data.hadoop.oozie;

import java.io.IOException;
import java.util.*;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;
import org.apache.oozie.client.WorkflowJob.Status;

import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

import static java.lang.String.format;

public class OozieJobMonitor extends JobMonitor {

    private static final Log log = LogFactory.getLog(OozieJobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM

    private final OozieClient oozieClient;

    private final String jobId;

    public static final String ACTION_TYPE_SUBWORKFLOW = "sub-workflow";

    private Set<String> workflowActions = Sets.newHashSet();

    @Deprecated
    public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback) {
        super(callback);
        this.oozieClient = oozieClient;
        this.jobId = jobId;
    }

    public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback, final Set<String> workflowActions) {
        super(callback);
        this.oozieClient = oozieClient;
        this.jobId = jobId;
        this.workflowActions = workflowActions;
    }

    @Override
    public void run() {
        try {
            log.info("waiting for oozie job completion: " + getHadoopId());

            Status status = Status.PREP;
            while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) {
                Thread.sleep(monitorSleepTimeSecs * 1000);

                try {
                    final Status currentStatus = doGetStatus();

                    if (!currentStatus.equals(status)) {
                        status = currentStatus;
                        lastActivity = new Date();
                    }
                } catch (Throwable e) {
                    log.warn(format("error polling status for job %s", jobId), e);
                }
            }

            log.debug(format("job %s finished with status: %s", jobId, status));
            if (Status.SUCCEEDED.equals(status)) {
                // TODO set some content to return to the blackboard msg.

                log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
                final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions);
                if (report != null) {
                    final Map<String, String> map = Maps.newHashMap();
                    report.forEach((k, v) -> map.put(k.toString(), v.toString()));
                    log.info("found oozie job report, size: " + map.size());
                    getCallback().done(map);
                } else {
                    log.warn("cannot find oozie job report!");
                    getCallback().done(new HashMap<>());
                }
            } else {
                // TODO retrieve some failure information from the oozie client.
                String msg = format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId()));
                getCallback().failed(msg, new HadoopServiceException(msg));
            }
        } catch (Throwable e) {
            getCallback().failed(getHadoopId(), e);
        }
    }

    /**
     * Provides the report entries found for the given oozie job identifier. Returns null when no report is found.
     */
    private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
        WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId);
        for (WorkflowAction currentAction : oozieJob.getActions()) {
            log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName()));
            if (workflowActions.contains(currentAction.getName())) {
                log.info(String.format("found workflow action %s", currentAction.getName()));
                if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) {
                    log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId()));
                    Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
                    if (subworkflowProperties != null) {
                        return subworkflowProperties;
                    }
                } else if (StringUtils.isNotBlank(currentAction.getData())) {
                    Properties properties = new Properties();
                    properties.load(IOUtils.toInputStream(currentAction.getData()));
                    log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size()));
                    return properties;
                }
            } else {
                log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName()));
            }
        }
        return null;
    }

    @Override
    public String getHadoopId() {
        return jobId;
    }

    public OozieClient getOozieClient() {
        return oozieClient;
    }

    @Override
    public String getStatus() {
        try {
            return doGetStatus().toString();
        } catch (OozieClientException e) {
            log.error("error accessing job status", e);
            return "UNKNOWN";
        }
    }

    private Status doGetStatus() throws OozieClientException {
        return getOozieClient().getJobInfo(getHadoopId()).getStatus();
    }

    @Override
    public Date getLastActivity() {
        return lastActivity;
    }

    @Override
    public Date getStartTime() throws HadoopServiceException {
        try {
            return getOozieClient().getJobInfo(getHadoopId()).getStartTime();
        } catch (OozieClientException e) {
            throw new HadoopServiceException("unable to read job start time", e);
        }
    }

    @Override
    public String getTrackerUrl() {
        return getOozieClient().getOozieUrl();
    }

    @Override
    public void kill() {
        try {
            getOozieClient().kill(getHadoopId());
        } catch (OozieClientException e) {
            log.error("unable to kill job: " + getHadoopId(), e);
        }
    }

}
```
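For orientation, a sketch of how a monitor instance is driven. The `JobCompletion` callback shape (`done(Map)`, `failed(String, Throwable)`) is inferred from the calls in `run()` above and may differ from the real interface in `eu.dnetlib.data.hadoop.action`; the Oozie URL, job id, and action names are placeholders, not values from this revision:

```java
import java.util.Map;

import com.google.common.collect.Sets;
import org.apache.oozie.client.OozieClient;

import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.oozie.OozieJobMonitor;

public class MonitorDemo {

    public static void main(final String[] args) {
        final OozieClient client = new OozieClient("http://oozie.example.org:11000/oozie"); // placeholder URL

        // Inferred callback shape: done(Map) on success, failed(String, Throwable) otherwise.
        final JobCompletion callback = new JobCompletion() {

            @Override
            public void done(final Map<String, String> report) {
                System.out.println("workflow done, report entries: " + report.size());
            }

            @Override
            public void failed(final String msg, final Throwable e) {
                System.err.println("workflow failed: " + msg);
            }
        };

        // "cleanUpHdfs" and "collect" stand for the workflow action names whose
        // captured output should end up in the completion report.
        final OozieJobMonitor monitor = new OozieJobMonitor(client, "0000001-200101000000000-oozie-W",
                callback, Sets.newHashSet("cleanUpHdfs", "collect"));
        new Thread(monitor::run).start();
    }
}
```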
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieClientFactory.java

```java
package eu.dnetlib.data.hadoop.oozie;

import eu.dnetlib.data.hadoop.AbstractHadoopClient;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.client.OozieClient;

/**
 * Factory bean for Oozie client instances.
 *
 * The Oozie server may not be available in every Hadoop deployment, so every Oozie-specific operation should be defined as optional.
 *
 * @author claudio
 */
public class OozieClientFactory extends AbstractHadoopClient {

    private static final String ENV_ATTRIBUTE_OOZIE_SERVICE_LOC = "oozie.service.loc";

    private static final Log log = LogFactory.getLog(OozieClientFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

    public OozieClient newInstance(ClusterName clusterName) throws HadoopServiceException {
        final String oozieServiceLocation = configurationEnumerator.get(clusterName).get(ENV_ATTRIBUTE_OOZIE_SERVICE_LOC);
        log.info("init oozie client, cluster: " + clusterName.toString() + ", oozie server: " + oozieServiceLocation);
        setHadoopUser();
        try {
            return new OozieClient(oozieServiceLocation);
        } catch (Throwable e) {
            throw new HadoopServiceException("unable to initialize oozie client for cluster: " + clusterName.toString(), e);
        }
    }

}
```
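For context, the factory's product is used to submit and inspect workflows. A sketch against the stock `OozieClient` API (`createConfiguration`, `OozieClient.APP_PATH`, `run`); the URL and workflow path are placeholders, and in the service the client would come from `newInstance(clusterName)`, which also resolves `oozie.service.loc` and sets the hadoop user, rather than being built directly:

```java
import java.util.Properties;

import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;

public class SubmitDemo {

    public static void main(final String[] args) throws OozieClientException {
        // Stands in for OozieClientFactory.newInstance(...).
        final OozieClient client = new OozieClient("http://oozie.example.org:11000/oozie");

        final Properties conf = client.createConfiguration();
        conf.setProperty(OozieClient.APP_PATH, "hdfs://nmis-hadoop-cluster/user/dnet/wf/workflow.xml");
        conf.setProperty(OozieClient.USER_NAME, "dnet");

        final String jobId = client.run(conf);
        System.out.println("submitted: " + jobId + ", status: " + client.getJobInfo(jobId).getStatus());
    }
}
```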
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/test/java/eu/dnetlib/data/hadoop/utils/ReadSequenceFileTest.java

```java
package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.math.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.hdfs.SequenceFileUtils;
import eu.dnetlib.miscutils.collections.Pair;

public class ReadSequenceFileTest {

    private static final Path SEQUENCE_FILE_PATH = new Path("hdfs://nmis-hadoop-cluster/tmp/indexrecords_db_openaireplus_sesam_SESAMALL.seq");
    private final String HADOOP_CONF_FILE = "/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties";

    private Configuration conf;

    @Before
    public void setUp() {
        final ConfigurationFactory confFactory = new ConfigurationFactory();
        confFactory.setDefaults(new ClassPathResource(HADOOP_CONF_FILE));
        conf = confFactory.getConfiguration();
    }

    @Test
    @Ignore
    public void testReadSequenceFile() throws Exception {
        final SummaryStatistics statsAll = new SummaryStatistics();

        final Map<String, SummaryStatistics> stats = Maps.newHashMap();

        int i = 0;
        for (Pair<Text, Text> pair : SequenceFileUtils.read(SEQUENCE_FILE_PATH, conf)) {
            final String id = pair.getKey().toString();
            final String record = pair.getValue().toString();
            final int length = record.getBytes().length;

            final String type = id.substring(0, 2);
            if (!stats.containsKey(type)) {
                stats.put(type, new SummaryStatistics());
            }
            statsAll.addValue(length);
            stats.get(type).addValue(length);

            if (++i % 10000 == 0) {
                System.out.println("Read " + i);
            }
        }

        printStats("ALL", statsAll);
        for (Entry<String, SummaryStatistics> e : stats.entrySet()) {
            printStats(e.getKey(), e.getValue());
        }
    }

    private void printStats(final String type, final SummaryStatistics stats) {
        System.out.println("************************************");
        System.out.println("Type: " + type);
        System.out.println(String.format("\tmin    : %.2f KBytes", stats.getMin() / 1024));
        System.out.println(String.format("\tmax    : %.2f KBytes", stats.getMax() / 1024));
        System.out.println(String.format("\tavg    : %.2f KBytes", stats.getMean() / 1024));
        System.out.println(String.format("\tstdDev : %.2f KBytes", stats.getStandardDeviation() / 1024));
    }
}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/deploy.info

```json
{
  "type_source": "SVN",
  "goal": "package -U -T 4C source:jar",
  "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/trunk/",
  "deploy_repository": "dnet45-snapshots",
  "version": "4",
  "mail": "sandro.labruzzo@isti.cnr.it, michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it",
  "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots",
  "name": "dnet-hadoop-service"
}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/AbstractHadoopClient.java

```java
package eu.dnetlib.data.hadoop;

import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

public abstract class AbstractHadoopClient {

    private static final Log log = LogFactory.getLog(AbstractHadoopClient.class);

    @Autowired
    private ISClient isClient;

    @Autowired
    protected ConfigurationEnumerator configurationEnumerator;

    protected void setHadoopUser() throws HadoopServiceException {
        setHadoopUser(getDefaultUser());
    }

    private String getDefaultUser() throws HadoopServiceException {
        try {
            return isClient.queryForServiceProperty("default.hadoop.user");
        } catch (ISLookUpException e) {
            throw new HadoopServiceException(e);
        }
    }

    protected void setHadoopUser(final String userName) {
        log.info("setting HADOOP_USER_NAME as " + userName);
        System.setProperty("HADOOP_USER_NAME", userName);
    }

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/HadoopClientMap.java

```java
package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.util.Map;

import com.google.common.collect.Maps;
import com.google.gson.Gson;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory;
import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.mapred.JobClient;
import org.apache.oozie.client.OozieClient;
import org.springframework.beans.factory.annotation.Required;

public class HadoopClientMap {

    private static final Log log = LogFactory.getLog(HadoopClientMap.class); // NOPMD by marko on 11/24/08 5:02 PM

    private JobClientFactory jobClientFactory;

    private OozieClientFactory oozieClientFactory;

    private HBaseAdminFactory hbaseAdminFactory;

    private Map<String, Map<String, String>> enabledClients = Maps.newHashMap();

    public boolean isMapreduceAvailable(final ClusterName name) {
        return isClientAvailable(name, "mapred");
    }

    public boolean isOozieAvailable(final ClusterName name) {
        return isClientAvailable(name, "oozie");
    }

    public boolean isHbaseAvailable(final ClusterName name) {
        return isClientAvailable(name, "hbase");
    }

    private boolean isClientAvailable(final ClusterName name, final String clientName) {
        final String clusterName = name.toString();
        return enabledClients.containsKey(clusterName) && "true".equals(enabledClients.get(clusterName).get(clientName));
    }

    public JobClient getJtClient(final ClusterName clusterName, final String username) throws HadoopServiceException, IOException {
        if (!isMapreduceAvailable(clusterName)) {
            throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
        }
        return getJobClientFactory().newInstance(clusterName, username);
    }

    public JobClient getJtClient(final ClusterName clusterName) throws HadoopServiceException, IOException {
        if (!isMapreduceAvailable(clusterName)) {
            throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
        }
        return getJobClientFactory().newInstance(clusterName);
    }

    public OozieClient getOozieClient(final ClusterName name) throws HadoopServiceException {
        if (!isOozieAvailable(name)) {
            throw new HadoopServiceException("oozie is not available for cluster " + name.toString());
        }
        return getOozieClientFactory().newInstance(name);
    }

    public HBaseAdmin getHbaseAdmin(final ClusterName name) throws HadoopServiceException {
        if (!isHbaseAvailable(name)) {
            throw new HadoopServiceException("hbase is not available for cluster " + name.toString());
        }
        return getHbaseAdminFactory().newInstance(name);
    }

    // //////////

    public String getEnabledClients() {
        return new Gson().toJson(enabledClients);
    }

    @Required
    @SuppressWarnings("unchecked")
    public void setEnabledClients(final String enabledClients) {
        this.enabledClients = new Gson().fromJson(enabledClients, Map.class);
    }

    public JobClientFactory getJobClientFactory() {
        return jobClientFactory;
    }

    @Required
    public void setJobClientFactory(final JobClientFactory jobClientFactory) {
        this.jobClientFactory = jobClientFactory;
    }

    public OozieClientFactory getOozieClientFactory() {
        return oozieClientFactory;
    }

    @Required
    public void setOozieClientFactory(final OozieClientFactory oozieClientFactory) {
        this.oozieClientFactory = oozieClientFactory;
    }

    public HBaseAdminFactory getHbaseAdminFactory() {
        return hbaseAdminFactory;
    }

    @Required
    public void setHbaseAdminFactory(final HBaseAdminFactory hbaseAdminFactory) {
        this.hbaseAdminFactory = hbaseAdminFactory;
    }

}
```
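A minimal sketch tying the map back to the `services.hadoop.clients` property shown earlier. `ClusterName.DM` and `ClusterName.IIS` are assumptions: the enum is not part of this revision, but "DM" and "IIS" appear as cluster keys in the JSON. Since `@Required` is only enforced by Spring, the factories can stay unset when exercising just the availability checks:

```java
import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.config.ClusterName;

public class ClientMapDemo {

    public static void main(final String[] args) {
        final HadoopClientMap clients = new HadoopClientMap();

        // In the service this JSON comes from applicationContext-dnet-hadoop-service.properties.
        clients.setEnabledClients("{\"DM\":{\"oozie\":\"true\",\"mapred\":\"true\",\"hbase\":\"true\"}}");

        System.out.println(clients.isOozieAvailable(ClusterName.DM));  // true
        System.out.println(clients.isHbaseAvailable(ClusterName.IIS)); // false: no entry for IIS here
        System.out.println(clients.getEnabledClients());               // round-trips the JSON via Gson
    }
}
```

The point of the guard methods is fail-fast behavior: asking for a client on a cluster where it is disabled raises `HadoopServiceException` before any factory or network call is attempted.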
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/ISClient.java

```java
package eu.dnetlib.data.hadoop;

import java.util.List;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import com.google.common.collect.Iterables;

import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.datetime.DateUtils;

public class ISClient {

    private static final Log log = LogFactory.getLog(ISClient.class); // NOPMD by marko on 11/24/08 5:02 PM

    @Resource
    private UniqueServiceLocator serviceLocator;

    public String getJobProfile(final String jobName) throws ISLookUpException {
        return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(
                "/RESOURCE_PROFILE[.//RESOURCE_TYPE/@value = 'HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName + "']");
    }

    public String queryForServiceProperty(final String key) throws ISLookUpException {
        return getServiceConfigValue(
                String.format(
                        "distinct-values("
                                + "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='HadoopServiceResourceType'] "
                                + "return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='%s']/@value/string())",
                        key));
    }

    public void updateCountElement(final String jobName, final String element, final String delta) {
        final String xquery =
                "let $x := //RESOURCE_PROFILE[" + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='" + jobName
                        + "'], $tot := $x//STATUS/" + element + "/@value/number() " + delta + " return update replace $x//STATUS/" + element + " with <"
                        + element + " value='{$tot}' />";

        executeXUpdate(xquery);
    }

    public void updateDate(final String jobName) {
        log.info("increment last submission date for job: " + jobName);
        executeXUpdate("for $x in collection('')/RESOURCE_PROFILE["
                + ".//RESOURCE_TYPE/@value='HadoopJobConfigurationDSResourceType' and .//HADOOP_JOB/@name='"
                + jobName + "'] " + " return update value $x//LAST_SUBMISSION_DATE/@value with '" + DateUtils.now_ISO8601() + "' ");
    }

    private String getServiceConfigValue(final String xquery) throws ISLookUpException {
        log.debug("querying for service property: " + xquery);
        final List<String> urls = serviceLocator.getService(ISLookUpService.class).quickSearchProfile(xquery);
        if ((urls == null) || (urls.size() != 1))
            throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
        return Iterables.getOnlyElement(urls);
    }

    private boolean executeXUpdate(final String xupdate) {
        try {
            log.debug("running xupdate: " + xupdate);
            return serviceLocator.getService(ISRegistryService.class).executeXUpdate(xupdate);
        } catch (final ISRegistryException e) {
            log.error("unable to run xupdate: " + xupdate, e);
            return false;
        }
    }
}
```
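The query assembled by `queryForServiceProperty` is easier to read once expanded. For the `default.hadoop.user` key used by `AbstractHadoopClient` above, this sketch simply prints the exact string sent to the IS lookup service:

```java
public class QueryDemo {

    public static void main(final String[] args) {
        // Expansion of the format string in ISClient.queryForServiceProperty("default.hadoop.user").
        final String xquery = String.format(
                "distinct-values("
                        + "for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='HadoopServiceResourceType'] "
                        + "return $x//SERVICE_PROPERTIES/PROPERTY[./@ key='%s']/@value/string())",
                "default.hadoop.user");
        System.out.println(xquery);
    }
}
```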
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseAdminFactory.java

```java
package eu.dnetlib.data.hadoop.hbase;

import eu.dnetlib.data.hadoop.AbstractHadoopClient;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class HBaseAdminFactory extends AbstractHadoopClient {

    private static final Log log = LogFactory.getLog(HBaseAdminFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

    public HBaseAdmin newInstance(final ClusterName clusterName) throws HadoopServiceException {
        try {
            log.info("init hbaseAdmin, cluster: " + clusterName.toString());
            setHadoopUser();
            return new HBaseAdmin(configurationEnumerator.get(clusterName));
        } catch (final Throwable e) {
            throw new HadoopServiceException("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
        }
    }

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HbaseTableFeeder.java

```java
package eu.dnetlib.data.hadoop.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.transform.Row;
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HbaseTableFeeder provides an abstraction for shipping batches of operations to an HBase table.
 */
public abstract class HbaseTableFeeder {

    /**
     * The Constant log.
     */
    private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
    /**
     * The configuration enumerator.
     */
    @Autowired
    protected ConfigurationEnumerator configurationEnumerator;
    /**
     * The batch size.
     */
    private int batchSize = 100;
    /**
     * The result set client factory.
     */
    private ResultSetClientFactory resultSetClientFactory;

    /**
     * Adds the operation.
     *
     * @param buffer the buffer
     * @param row    the row
     */
    protected abstract void addOperation(final List<Mutation> buffer, final Row row);

    /**
     * Feed.
     *
     * @param epr         the epr
     * @param xsl         the xsl
     * @param clusterName the cluster name
     * @param tableName   the table name
     * @param simulation  the simulation
     * @return the int
     * @throws IOException          Signals that an I/O exception has occurred.
     * @throws InterruptedException the interrupted exception
     */
    public int feed(final String epr, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
            throws IOException, InterruptedException {
        return doWrite(asRows(epr, xsl), getConf(clusterName), tableName, simulation);
    }

    /**
     * Performs the write on HBase.
     *
     * @param rows          the rows
     * @param configuration the configuration
     * @param tableName     the table name
     * @param simulation    the simulation
     * @return the int
     * @throws IOException          Signals that an I/O exception has occurred.
     * @throws InterruptedException the interrupted exception
     */
    private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
            throws IOException, InterruptedException {
        final List<Mutation> buffer = Lists.newArrayList();

        int count = 0;
        if (simulation) {
            log.info("running in simulation mode ...");
            log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
        } else {

            final HTable htable = new HTable(configuration, tableName);
            try {
                int i = 0;
                for (final Row row : rows) {
                    addOperation(buffer, row);
                    if ((++i % getBatchSize()) == 0) {
                        flush(tableName, buffer, htable);
                        count += buffer.size();
                        buffer.clear();
                    }
                }
            } finally {
                if (!buffer.isEmpty()) {
                    flush(tableName, buffer, htable);
                    count += buffer.size();
                }
                htable.flushCommits();
                htable.close();
            }
        }
        return count;
    }

    private void flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
        if (!checkOp(htable.batch(buffer), tableName)) throw new IOException("unable to flush operation on HBase table: " + tableName);
    }

    private boolean checkOp(final Object[] res, final String tableName) throws IOException {
        return Iterables.all(Arrays.asList(res), Predicates.notNull());
    }

    /**
     * As rows.
     *
     * @param epr the epr
     * @param xsl the xsl
     * @return the iterable
     */
    protected Iterable<Row> asRows(final String epr, final String xsl) {
        return Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(epr), XsltRowTransformerFactory.newInstance(xsl)));
    }

    /**
     * Gets the conf.
     *
     * @param clusterName the cluster name
     * @return the conf
     */
    protected Configuration getConf(final ClusterName clusterName) {
        return configurationEnumerator.get(clusterName);
    }

    /**
     * Gets the batch size.
     *
     * @return the batch size
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Sets the batch size.
     *
     * @param batchSize the new batch size
     */
    public void setBatchSize(final int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Gets the result set client factory.
     *
     * @return the result set client factory
     */
    public ResultSetClientFactory getResultSetClientFactory() {
        return resultSetClientFactory;
    }

    /**
     * Sets the result set client factory.
     *
     * @param resultSetClientFactory the new result set client factory
     */
    @Required
    public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
        this.resultSetClientFactory = resultSetClientFactory;
    }

}
```
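A hypothetical invocation of `feed(...)`. In the service the feeder is a Spring bean (so `configurationEnumerator` is autowired) and the `epr`/`xslt` arguments arrive via blackboard messages; the table name and cluster below are illustrative only, and `HBasePutFeeder` is the concrete subclass shown next:

```java
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.hbase.HBasePutFeeder;
import eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;

public class FeedDemo {

    // epr: end-point reference of a result set of XML records; xslt: stylesheet that
    // maps each record to Row objects via XsltRowTransformerFactory.
    public int feedPuts(final String epr, final String xslt, final ResultSetClientFactory rsFactory) throws Exception {
        final HbaseTableFeeder feeder = new HBasePutFeeder();
        feeder.setResultSetClientFactory(rsFactory);
        feeder.setBatchSize(500); // matches services.hadoop.hbase.tablefeeder.batchsize

        // Streams the result set, turns each record into Put mutations, ships them in
        // batches of 500, and returns the number of mutations written.
        return feeder.feed(epr, xslt, ClusterName.DM, "db_openaire", false);
    }
}
```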
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseDeleteFeeder.java

```java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBaseDeleteFeeder performs a batch of Delete operations.
 */
public class HBaseDeleteFeeder extends HbaseTableFeeder {

    /**
     * Logger.
     */
    private static final Log log = LogFactory.getLog(HBaseDeleteFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

    /*
     * (non-Javadoc)
     *
     * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
     */
    @Override
    protected void addOperation(final List<Mutation> buffer, final Row row) {
        final Delete delete = new Delete(Bytes.toBytes(row.getKey()));
        delete.setWriteToWAL(true);

        for (final Column<String, byte[]> col : row) {
            log.debug(String.format("deleting K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
            delete.deleteColumns(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()));
        }

        buffer.add(delete);
    }

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/hbase/HBasePutFeeder.java

```java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBasePutFeeder performs a batch of Put operations.
 */
public class HBasePutFeeder extends HbaseTableFeeder {

    /**
     * Logger.
     */
    private static final Log log = LogFactory.getLog(HBasePutFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

    /*
     * (non-Javadoc)
     *
     * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
     */
    @Override
    protected void addOperation(final List<Mutation> buffer, final Row row) {
        final Put put = new Put(Bytes.toBytes(row.getKey()));
        put.setWriteToWAL(true);

        for (final Column<String, byte[]> col : row) {
            log.debug(String.format("adding value to K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
            put.add(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()), col.getValue());
        }

        buffer.add(put);
    }

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java

```java
package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

public class HadoopServiceCore {

    private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
    @Autowired
    protected ConfigurationEnumerator configurationEnumerator;
    @Autowired
    private HadoopClientMap clients;
    private int maxVersions;

    public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
            return Arrays.asList(admin.listTables())
                    .stream()
                    .map(HTableDescriptor::getNameAsString)
                    .collect(Collectors.toList());
        }
    }

    public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

            if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

            final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

            final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

            final Set<String> columns = Sets.newHashSet();

            for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
                columns.add(hColDesc.getNameAsString());
            }

            HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
            htDescriptor.setColumns(columns);

            List<HBaseTableRegionInfo> regions = Lists.newArrayList();

            for (HRegionInfo info : tableRegions) {
                regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
            }
            htDescriptor.setRegions(regions);

            if (log.isDebugEnabled()) {
                log.debug("got configuration for table '" + tableName + "': " + htDescriptor.toString());
            }

            return htDescriptor.toString();
        }
    }

    public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
            final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
            return desc.getFamilies().stream()
                    .map(d -> d.getNameAsString())
                    .collect(Collectors.toList());
        }
    }

    public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate nonexistent table");

            final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

            log.info("disabling table: " + table);
            admin.disableTable(table);

            log.info("deleting table: " + table);
            admin.deleteTable(table);

            log.info("creating table: " + table);
            admin.createTable(desc);
        }
    }

    public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            return admin.tableExists(table);
        }
    }

    public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop nonexistent table: '" + table + "'");

            log.info("disabling table: " + table);
            admin.disableTable(table);

            log.info("deleting table: " + table);
            admin.deleteTable(table);
        }
    }

    public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

            if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

            final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

            doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
        }
    }

    public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

            doCreateTable(clusterName, table, columns, null);
        }
    }

    public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
            throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

            final HTableDescriptor desc = new HTableDescriptor(table);
            for (final String column : columns) {
                final HColumnDescriptor hds = new HColumnDescriptor(column);
                hds.setMaxVersions(getMaxVersions());
                desc.addFamily(hds);
            }

            log.info("creating hbase table: " + table);

            if (regions != null && !regions.isEmpty()) {
                log.debug(String.format("create using %s regions: %s", regions.size(), regions));
                admin.createTable(desc, getSplitKeys(regions));
            } else {
                admin.createTable(desc);
            }

            log.info("created hbase table: [" + table + "]");
            log.debug("descriptor: [" + desc.toString() + "]");
        }
    }

    private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
        byte[][] splits = new byte[regions.size() - 1][];
        for (int i = 0; i < regions.size() - 1; i++) {
            splits[i] = regions.get(i).getEndKey().getBytes();
        }
        return splits;
    }

    public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
        try (final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

            if (!admin.tableExists(table)) {
                createTable(clusterName, table, columns);
            } else {
                final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));

                final Set<String> foundColumns = desc.getFamilies().stream()
                        .map(d -> d.getNameAsString())
                        .collect(Collectors.toCollection(HashSet::new));

                log.info("ensuring columns on table " + table + ": " + columns);
                final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
                if (!missingColumns.isEmpty()) {

                    if (admin.isTableEnabled(table)) {
                        admin.disableTable(table);
                    }

                    for (final String column : missingColumns) {
                        log.info("hbase table: '" + table + "', adding column: " + column);
                        admin.addColumn(table, new HColumnDescriptor(column));
                    }

                    admin.enableTable(table);
                }
            }
        }
    }

    public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);

        try {
            table.put(puts);
        } finally {
            table.flushCommits();
            table.close();
        }
    }

    public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);
        try {
            table.delete(deletes);
        } finally {
            table.flushCommits();
            table.close();
        }
    }

    public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);
        try {
            for (HBaseRowDescriptor desc : columns) {

                final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
                d.setWriteToWAL(true);
                for (Column c : desc.getColumns()) {
                    for (String qualifier : c.getQualifier()) {
                        log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
                        d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
                    }
                }
                table.delete(d);
            }
        } finally {
            table.flushCommits();
            table.close();
        }
    }

    public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);
        try {
            return table.get(new Get(id));
        } finally {
            table.close();
        }
    }

    public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
        final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
        for (String rowKey : rowKeys) {
            map.put(rowKey, describeRow(clusterName, tableName, rowKey));
        }
        return map;
    }

    public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);

        final HBaseRowDescriptor desc = new HBaseRowDescriptor();

        try {
            final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

            if (r.isEmpty()) {
                return desc;
            }

            final List<Column> columns = Lists.newArrayList();

            for (Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
                final Set<byte[]> qualifiers = e.getValue().keySet();
                final String family = new String(e.getKey());
                final Column col = new Column(family);

                for (byte[] q : qualifiers) {
                    String qs = new String(q);
                    col.getQualifier().add(qs);
                }
                columns.add(col);
            }
            desc.setColumns(columns);
            desc.setRowKey(rowKey);

            return desc;
        } finally {
            table.close();
        }
    }

    public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        try (final HTable table = new HTable(conf, tableName)) {
            final ResultScanner rs = table.getScanner(scan);
            try {
                return Lists.newArrayList(rs.iterator());
            } finally {
                rs.close();
            }
        }
    }

    public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
        if (StringUtils.isBlank(path))
            throw new HadoopServiceException("Cannot delete an empty HDFS path.");

        final Configuration conf = configurationEnumerator.get(clusterName);

        try (final FileSystem hdfs = FileSystem.get(conf)) {
            final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

            if (hdfs.exists(absolutePath)) {
                log.debug("deleting path: " + absolutePath.toString());
                hdfs.delete(absolutePath, true);
                log.info("deleted path: " + absolutePath.toString());
                return true;
            } else {
                log.warn("cannot delete nonexistent path: " + absolutePath.toString());
                return false;
            }
        } catch (IOException e) {
            throw new HadoopServiceException(e);
        }
    }

    public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
        if (StringUtils.isBlank(path))
            throw new HadoopServiceException("Cannot create an empty HDFS path.");

        final Configuration conf = configurationEnumerator.get(clusterName);

        try (final FileSystem hdfs = FileSystem.get(conf)) {
            final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
            if (!hdfs.exists(absolutePath)) {
                hdfs.mkdirs(absolutePath);
                log.info("created path: " + absolutePath.toString());
                return true;
            } else if (force) {
                log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
                hdfs.delete(absolutePath, true);

                hdfs.mkdirs(absolutePath);
                log.info("created path: " + absolutePath.toString());
                return true;
            } else {
                log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
                return false;
            }
        } catch (IOException e) {
            throw new HadoopServiceException(e);
        }
    }

    public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
        if (StringUtils.isBlank(path))
            throw new HadoopServiceException("invalid empty path");

        final Configuration conf = configurationEnumerator.get(clusterName);
        try (final FileSystem hdfs = FileSystem.get(conf)) {
            final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
            return hdfs.exists(absolutePath);
        } catch (IOException e) {
            throw new HadoopServiceException(e);
        }
    }

    public Configuration getClusterConfiguration(final ClusterName clusterName) {
        return configurationEnumerator.get(clusterName);
    }

    public int getMaxVersions() {
        return maxVersions;
    }

    @Required
    public void setMaxVersions(final int maxVersions) {
        this.maxVersions = maxVersions;
    }

    public HadoopClientMap getClients() {
        return clients;
    }

}
```
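To close the loop, a sketch of the typical create-then-write sequence against the core. `core` is assumed to be the Spring-managed `HadoopServiceCore` bean; the cluster, table, row key, and column values are invented for the example, and `Put.add(family, qualifier, value)` is the same pre-1.0 HBase API used by `HBasePutFeeder` above:

```java
import java.util.List;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;

public class CoreDemo {

    public void writeOne(final HadoopServiceCore core) throws Exception {
        // Create the table, or add any missing column families, before writing.
        final Set<String> columns = Sets.newHashSet("result", "metadata");
        core.ensureTable(ClusterName.DM, "db_demo", columns);

        final Put put = new Put(Bytes.toBytes("50|demo::0001"));
        put.add(Bytes.toBytes("metadata"), Bytes.toBytes("body"), Bytes.toBytes("<record/>"));

        final List<Put> puts = Lists.newArrayList(put);
        core.writeOnHBase(ClusterName.DM, "db_demo", puts);
    }
}
```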
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.6/src/main/java/eu/dnetlib/data/hadoop/utils/ScanProperties.java

```java
package eu.dnetlib.data.hadoop.utils;
```