Revision 48802
Added by Claudio Atzori over 6 years ago

[maven-release-plugin] copy for tag dnet-hadoop-service-2.6.2

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/deploy.info

{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/trunk/", "deploy_repository": "dnet45-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots", "name": "dnet-hadoop-service"}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/test/java/eu/dnetlib/data/hadoop/utils/CopyTableTest.java

package eu.dnetlib.data.hadoop.utils;

import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;

public class CopyTableTest {

    @Test
    public void testCopyTable() throws IOException {

        Job job =
                CopyTable.createSubmittableJob(new Configuration(), new String[] { "--peer.adr=server1,server2,server3:2181:/hbase",
                        "--families=myOldCf:myNewCf,cf2,cf3", "tableName" });
        assertNotNull(job);

        Configuration conf = job.getConfiguration();

        for (Entry<String, String> e : conf) {
            System.out.println(String.format("<PROPERTY key=\"%s\" value=\"%s\"/>", e.getKey(), e.getValue()));
        }

    }
}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/test/java/eu/dnetlib/data/hadoop/utils/ReadSequenceFileTest.java

package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.math.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.hdfs.SequenceFileUtils;
import eu.dnetlib.miscutils.collections.Pair;

public class ReadSequenceFileTest {

    private static final Path SEQUENCE_FILE_PATH = new Path("hdfs://nmis-hadoop-cluster/tmp/indexrecords_db_openaireplus_sesam_SESAMALL.seq");
    private final String HADOOP_CONF_FILE = "/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties";

    private Configuration conf;

    @Before
    public void setUp() {
        final ConfigurationFactory confFactory = new ConfigurationFactory();
        confFactory.setDefaults(new ClassPathResource(HADOOP_CONF_FILE));
        conf = confFactory.getConfiguration();
    }

    @Test
    @Ignore
    public void testReadSequenceFile() throws Exception {
        final SummaryStatistics statsAll = new SummaryStatistics();

        final Map<String, SummaryStatistics> stats = Maps.newHashMap();

        int i = 0;
        for (Pair<Text, Text> pair : SequenceFileUtils.read(SEQUENCE_FILE_PATH, conf)) {
            final String id = pair.getKey().toString();
            final String record = pair.getValue().toString();
            final int length = record.getBytes().length;

            final String type = id.substring(0, 2);
            if (!stats.containsKey(type)) {
                stats.put(type, new SummaryStatistics());
            }
            statsAll.addValue(length);
            stats.get(type).addValue(length);

            if (++i % 10000 == 0) {
                System.out.println("Read " + i);
            }
        }

        printStats("ALL", statsAll);
        for (Entry<String, SummaryStatistics> e : stats.entrySet()) {
            printStats(e.getKey(), e.getValue());
        }
    }

    private void printStats(final String type, final SummaryStatistics stats) {
        System.out.println("************************************");
        System.out.println("Type: " + type);
        System.out.println(String.format("\tmin : %.2f KBytes", stats.getMin() / 1024));
        System.out.println(String.format("\tmax : %.2f KBytes", stats.getMax() / 1024));
        System.out.println(String.format("\tavg : %.2f KBytes", stats.getMean() / 1024));
        System.out.println(String.format("\tstdDev : %.2f", stats.getStandardDeviation() / 1024));
    }
}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTestContextConfiguration.java

package eu.dnetlib.data.hadoop.hbase;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
import eu.dnetlib.data.mapreduce.JobClientFactory;

@Configuration
@Profile(value = "test")
public class HBaseTestContextConfiguration {

    public static final String ENABLED_CLIENTS = "{"
            + "\"DM\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"true\"},"
            + "\"IIS\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"false\"}"
            + "}";

    public static final int MAX_VERSIONS = 10;

    @Bean
    public HadoopServiceCore hadoopServiceCore() {
        final HadoopServiceCore core = new HadoopServiceCore();

        core.setMaxVersions(MAX_VERSIONS);

        System.out.println("using hbase max versions: " + MAX_VERSIONS);
        return core;
    }

    @Bean(initMethod = "init")
    public HadoopClientMap hadoopClientMap() throws InterruptedException {
        final HadoopClientMap clientMap = new HadoopClientMap();
        clientMap.setEnabledClients(ENABLED_CLIENTS);
        clientMap.setClientsInitTime(10);

        return clientMap;
    }

    @Bean
    public HBaseAdminFactory hBaseAdminFactory() {
        return new HBaseAdminFactory();
    }

    @Bean
    public OozieClientFactory oozieClientFactory() {
        return new OozieClientFactory();
    }

    @Bean
    public JobClientFactory jobClientFactory() {
        return new JobClientFactory();
    }

    @Bean
    public ConfigurationEnumerator configurationEnumerator() {
        return new ConfigurationEnumerator();
    }

    @Bean
    public ConfigurationFactory DM() {
        return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties"));
    }

    @Bean
    public ConfigurationFactory IIS() {
        return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.iis.icm.properties"));
    }

    protected ConfigurationFactory get(final Resource props) {
        final ConfigurationFactory configurationFactory = new ConfigurationFactory();
        configurationFactory.setDefaults(props);
        return configurationFactory;
    }

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTest.java

package eu.dnetlib.data.hadoop.hbase;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.miscutils.datetime.DateUtils;

@ActiveProfiles("test")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = HBaseTestContextConfiguration.class)
public class HBaseTest {

    protected static final String TEST_TABLE = "dnet_test_table";

    protected static final int NUM_VERSIONS = 10;

    @Autowired
    private HadoopServiceCore hadoopServiceCore;

    @Autowired
    private ConfigurationEnumerator configurationEnumerator;

    @Before
    public void setUp() throws HadoopServiceException, IOException, InterruptedException {
        assertNotNull(hadoopServiceCore);

        System.out.println("waiting for clients to be ready... timeout? " + !hadoopServiceCore.getClients().waitClients());

        ensureDropTable();
    }

    @After
    public void tearDown() throws HadoopServiceException, IOException {
        ensureDropTable();
    }

    @Test
    @Ignore
    // TODO allow testing on a dev cluster instance
    public void testReadWrite() throws HadoopServiceException, IOException, InterruptedException {

        hadoopServiceCore.createTable(ClusterName.DM, TEST_TABLE, testSchema());
        assertTrue(hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE));

        final HTable htable = new HTable(configurationEnumerator.get(ClusterName.DM), TEST_TABLE);

        final Put put = new Put(Bytes.toBytes("1"));

        for (int i = 0; i < NUM_VERSIONS; i++) {
            put.add(Bytes.toBytes("result"), Bytes.toBytes("body"), Bytes.toBytes(i + ""));
            htable.put(put);
            Thread.sleep(1000);
        }
        final Get get = new Get(Bytes.toBytes("1"));
        get.setMaxVersions(HBaseTestContextConfiguration.MAX_VERSIONS);

        final Result r = htable.get(get);

        // Map<family,Map<qualifier,Map<timestamp,value>>>
        final NavigableMap<Long, byte[]> versions = r.getMap().get(Bytes.toBytes("result")).get(Bytes.toBytes("body"));

        for (final Entry<Long, byte[]> e : versions.entrySet()) {
            System.out.println("t: " + DateUtils.calculate_ISO8601(e.getKey()) + ", v: " + Bytes.toString(e.getValue()));
        }

        htable.close();

    }

    protected void ensureDropTable() throws HadoopServiceException, IOException {
        if (hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE)) {
            hadoopServiceCore.dropTable(ClusterName.DM, TEST_TABLE);
        }
    }

    protected Set<String> testSchema() {
        final Set<String> schema = Sets.newHashSet();

        schema.add("result");

        return schema;
    }

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/test/resources/log4j.properties

### Root Level ###
log4j.rootLogger=WARN, LOGFILE, CONSOLE

### Configuration for the LOGFILE appender ###
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=25MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=logs/dnet.log
log4j.appender.LOGFILE.Append=true
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=[%-5p] %d %c - %m%n

### Configuration for the CONSOLE appender ###
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c - %m%n

org.apache.cxf.Logger=org.apache.cxf.common.logging.Log4jLogger

### Application Level ###
log4j.logger.eu.dnetlib=INFO
log4j.logger.eu.dnetlib.enabling.is.sn=INFO
log4j.logger.org.apache.cxf.interceptor=FATAL
log4j.logger.org.apache.cxf.ws.addressing.ContextUtils=FATAL
log4j.logger.eu.dnetlib.enabling.tools.AbstractBaseService=INFO
log4j.logger.eu.dnetlib.enabling.inspector=DEBUG
log4j.logger.eu.dnetlib.xml.database.LoggingTrigger=WARN
log4j.logger.eu.dnetlib.enabling.tools.registration.ServiceRegistrator=INFO
log4j.logger.eu.dnetlib.enabling.inspector=FATAL
log4j.logger.eu.dnetlib.enabling.inspector.SubscriptionController=DEBUG
log4j.logger.eu.dnetlib.springutils.stringtemplate.StringTemplateViewResolver=FATAL
log4j.logger.eu.dnetlib.enabling.is.sn.SynchronousNotificationSenderImpl=WARN
log4j.logger.eu.dnetlib.enabling.tools.LocalServiceResolverImpl=WARN
log4j.logger.eu.dnetlib.enabling.is.sn.NotificationInvokerImpl=WARN
log4j.logger.eu.dnetlib.data.collective=INFO
log4j.logger.eu.dnetlib.data.hadoop.utils.ScanFactory=DEBUG
log4j.logger.org.apache.xerces.parsers.SAXParser=OFF
log4j.logger.eu.dnetlib.conf.PropertyFetcher=WARN
#log4j.logger.eu.dnetlib.data.transform.XsltRowTransformerFactory=DEBUG

log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceImpl=OFF
log4j.logger.eu.dnetlib.enabling.datasources.DatasourceManagerClients=FATAL
log4j.logger.eu.dnetlib.data.mdstore.modular.mongodb.utils.MetadataCheckJob=DEBUG
log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceCore=WARN
log4j.logger.eu.dnetlib.xml.database.exist.ExistDatabase=WARN
log4j.logger.eu.dnetlib.enabling.is.store.AbstractContentInitializer=FATAL

log4j.logger.org.apache.hadoop.hbase.mapreduce.TableInputFormatBase=FATAL

### Spring ###
log4j.logger.org.springframework=ERROR

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/utils/ScanProperties.java

package eu.dnetlib.data.hadoop.utils;

import java.util.Set;

import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;

import com.google.common.collect.Sets;

public class ScanProperties {

    private static final Operator DEFAULT_OPERATOR = Operator.MUST_PASS_ALL;

    private int caching = 100;
    private FilterList filterList;
    private Set<String> families = Sets.newHashSet();

    public ScanProperties(final String op) {
        Operator operator = DEFAULT_OPERATOR;
        if ((op != null) && !op.isEmpty()) {
            operator = Operator.valueOf(op);
        }
        filterList = new FilterList(operator);
    }

    public FilterList getFilterList() {
        return filterList;
    }

    public void setFilterList(final FilterList filterList) {
        this.filterList = filterList;
    }

    public Set<String> getFamilies() {
        return families;
    }

    public void setFamilies(final Set<String> families) {
        this.families = families;
    }

    public int getCaching() {
        return caching;
    }

    public void setCaching(int caching) {
        this.caching = caching;
    }
}
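
The constructor takes the filter-list operator as a string: a null or empty value falls back to MUST_PASS_ALL, anything else must spell a FilterList.Operator constant exactly, since it goes straight through Operator.valueOf. A minimal usage sketch follows; the family name and caching value are illustrative and not taken from this revision.

import eu.dnetlib.data.hadoop.utils.ScanProperties;

public class ScanPropertiesSketch {

    public static ScanProperties example() {
        // "MUST_PASS_ONE" ORs the filters together; any other string must match a
        // FilterList.Operator constant exactly or Operator.valueOf throws IllegalArgumentException.
        final ScanProperties props = new ScanProperties("MUST_PASS_ONE");
        props.getFamilies().add("result"); // family name borrowed from the tests above, arbitrary here
        props.setCaching(500);
        return props;
    }
}
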
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/utils/HadoopUtils.java

package eu.dnetlib.data.hadoop.utils;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class HadoopUtils {

    private static final Log log = LogFactory.getLog(HadoopUtils.class);

    public static java.util.function.Function<HadoopJob, HadoopJobDescriptor> asDescriptor() {
        return d -> {
            try {
                return d.asDescriptor();
            } catch (HadoopServiceException e) {
                log.warn(e);
                return null;
            }
        };
    }

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/utils/JobProfile.java

package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Set;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.rmi.HadoopJobType;

public class JobProfile {

    private final Map<String, String> jobDefinition = Maps.newHashMap();
    private final Set<String> requiredParams = Sets.newHashSet();
    private ScanProperties scanProperties;

    private String name;

    private String description = "";

    private HadoopJobType jobType;

    public boolean isEmpty() {
        return getJobDefinition().isEmpty();
    }

    public Map<String, String> getJobDefinition() {
        return jobDefinition;
    }

    public Set<String> getRequiredParams() {
        return requiredParams;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getDescription() {
        return description;
    }

    public void setDescription(String description) {
        this.description = description;
    }

    public ScanProperties getScanProperties() {
        return scanProperties;
    }

    public void setScanProperties(ScanProperties scanProperties) {
        this.scanProperties = scanProperties;
    }

    public HadoopJobType getJobType() {
        return jobType;
    }

    public void setJobType(HadoopJobType jobType) {
        this.jobType = jobType;
    }

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/utils/ScanFactory.java

package eu.dnetlib.data.hadoop.utils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Base64;
import org.dom4j.Document;
import org.dom4j.Node;

public class ScanFactory {

    private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

    public static String getScan(final ScanProperties scanProperties) throws IOException {
        Scan scan = new Scan();

        scan.setCaching(scanProperties.getCaching());
        scan.setCacheBlocks(false); // don't set to true for MR jobs

        scan.setFilter(scanProperties.getFilterList());
        for (String family : scanProperties.getFamilies()) {
            scan.addFamily(family.getBytes());
        }

        log.debug("serializing scan");
        return convertScanToString(scan);
    }

    public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
        log.debug("setting job scanner");

        ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));

        String caching = doc.valueOf("//SCAN/@caching");
        if (!StringUtils.isBlank(caching)) {
            log.info("overriding default scan caching with: " + caching);
            scanProperties.setCaching(Integer.valueOf(caching));
        }

        for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
            Node node = (Node) o;
            String value = node.valueOf("./@value");
            if ((value == null) || value.isEmpty()) {
                value = bbParams.get(node.valueOf("./@param"));
            }
            log.debug("scanner family value: " + value);
            scanProperties.getFamilies().add(value);
        }
        for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
            Node node = (Node) o;
            String filterType = node.valueOf("./@type");

            String value = node.valueOf("./@value");
            if ((value == null) || value.isEmpty()) {
                value = bbParams.get(node.valueOf("./@param"));
            }

            if (filterType.equals("prefix")) {
                log.debug("scanner prefix filter, value: " + value);
                scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
            } // TODO add more filterType cases here
        }
        return scanProperties;
    }

    /**
     * Writes the given scan into a Base64 encoded string.
     *
     * @param scan
     *            The scan to write out.
     * @return The scan saved in a Base64 encoded string.
     * @throws IOException
     *             When writing the scan fails.
     */
    private static String convertScanToString(final Scan scan) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        DataOutputStream dos = new DataOutputStream(out);
        scan.write(dos);
        return Base64.encodeBytes(out.toByteArray());
    }

}
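
parseScanProperties reads its settings from a dom4j document: the scan caching from //SCAN/@caching, the column families from //SCAN/FAMILIES/FAMILY, the filter operator from //FILTERS/@operator and the filters from //SCAN/FILTERS/FILTER, resolving a missing value attribute through the blackboard parameter map. The sketch below feeds it a hand-written fragment; the XML content, parameter names and values are hypothetical and only mirror the attribute names the XPath expressions look up.

import java.util.Map;

import org.dom4j.Document;
import org.dom4j.DocumentHelper;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.utils.ScanFactory;
import eu.dnetlib.data.hadoop.utils.ScanProperties;

public class ScanFactorySketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical job-profile fragment: attribute and element names mirror the
        // XPath expressions used by parseScanProperties, the values are invented.
        final String xml =
                "<SCAN caching=\"200\">"
                + "  <FAMILIES>"
                + "    <FAMILY value=\"result\"/>"
                + "    <FAMILY param=\"entityFamily\"/>" // resolved from the blackboard params below
                + "  </FAMILIES>"
                + "  <FILTERS operator=\"MUST_PASS_ALL\">"
                + "    <FILTER type=\"prefix\" value=\"50|\"/>"
                + "  </FILTERS>"
                + "</SCAN>";

        final Map<String, String> bbParams = Maps.newHashMap();
        bbParams.put("entityFamily", "person");

        final Document doc = DocumentHelper.parseText(xml);
        final ScanProperties props = ScanFactory.parseScanProperties(doc, bbParams);

        // getScan serializes the resulting Scan into the Base64 string handed to the MR job.
        System.out.println(ScanFactory.getScan(props));
    }
}
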
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseDeleteFeeder.java

package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBaseDeleteFeeder performs a batch of Delete operations.
 */
public class HBaseDeleteFeeder extends HbaseTableFeeder {

    /**
     * Logger.
     */
    private static final Log log = LogFactory.getLog(HBaseDeleteFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

    /*
     * (non-Javadoc)
     *
     * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
     */
    @Override
    protected void addOperation(final List<Mutation> buffer, final Row row) {
        final Delete delete = new Delete(Bytes.toBytes(row.getKey()));
        delete.setWriteToWAL(true);

        for (final Column<String, byte[]> col : row) {
            log.debug(String.format("deleting K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
            delete.deleteColumns(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()));
        }

        buffer.add(delete);
    }

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseAdminFactory.java

package eu.dnetlib.data.hadoop.hbase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.springframework.beans.factory.annotation.Autowired;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

public class HBaseAdminFactory {

    private static final Log log = LogFactory.getLog(HBaseAdminFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

    @Autowired
    private ConfigurationEnumerator configurationEnumerator;

    public HBaseAdmin newInstance(final ClusterName clusterName) throws HadoopServiceException {
        try {
            log.info("init hbaseAdmin, cluster: " + clusterName.toString());
            return new HBaseAdmin(configurationEnumerator.get(clusterName));
        } catch (final Throwable e) {
            log.warn("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
            return null;
        }
    }
}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/hbase/HbaseTableFeeder.java

package eu.dnetlib.data.hadoop.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.transform.Row;
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HbaseTableFeeder provides abstraction to ship batch operation on an HBase table
 */
public abstract class HbaseTableFeeder {

    /**
     * The Constant log.
     */
    private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
    /**
     * The configuration enumerator.
     */
    @Autowired
    protected ConfigurationEnumerator configurationEnumerator;
    /**
     * The batch size.
     */
    private int batchSize = 100;
    /**
     * The result set client factory.
     */
    private ResultSetClientFactory resultSetClientFactory;

    /**
     * Adds the operation.
     *
     * @param buffer the buffer
     * @param row    the row
     */
    protected abstract void addOperation(final List<Mutation> buffer, final Row row);

    /**
     * Feed.
     *
     * @param epr         the epr
     * @param xsl         the xsl
     * @param clusterName the cluster name
     * @param tableName   the table name
     * @param simulation  the simulation
     * @return the int
     * @throws IOException          Signals that an I/O exception has occurred.
     * @throws InterruptedException the interrupted exception
     */
    public int feed(final String epr, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
            throws IOException, InterruptedException {
        return doWrite(asRows(epr, xsl), getConf(clusterName), tableName, simulation);
    }

    /**
     * Do writeOnHBase.
     *
     * @param rows          the rows
     * @param configuration the configuration
     * @param tableName     the table name
     * @param simulation    the simulation
     * @return the int
     * @throws IOException          Signals that an I/O exception has occurred.
     * @throws InterruptedException the interrupted exception
     */
    private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
            throws IOException, InterruptedException {
        final List<Mutation> buffer = Lists.newArrayList();

        int count = 0;
        if (simulation) {
            log.info("running in simulation mode ...");
            log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
        } else {

            final HTable htable = new HTable(configuration, tableName);
            try {
                int i = 0;
                for (final Row row : rows) {
                    addOperation(buffer, row);
                    if ((++i % getBatchSize()) == 0) {
                        flush(tableName, buffer, htable);
                        count += buffer.size();
                        buffer.clear();
                    }
                }
            } finally {
                if (!buffer.isEmpty()) {
                    flush(tableName, buffer, htable);
                    count += buffer.size();
                }
                htable.flushCommits();
                htable.close();
            }
        }
        return count;
    }

    private void flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
        if (!checkOp(htable.batch(buffer), tableName)) throw new IOException("unable to flush operation on HBase table: " + tableName);
    }

    private boolean checkOp(final Object[] res, final String tableName) throws IOException {
        return Iterables.all(Arrays.asList(res), Predicates.notNull());
    }

    /**
     * As rows.
     *
     * @param epr the epr
     * @param xsl the xsl
     * @return the iterable
     */
    protected Iterable<Row> asRows(final String epr, final String xsl) {
        return Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(epr), XsltRowTransformerFactory.newInstance(xsl)));
    }

    /**
     * Gets the conf.
     *
     * @param clusterName the cluster name
     * @return the conf
     */
    protected Configuration getConf(final ClusterName clusterName) {
        return configurationEnumerator.get(clusterName);
    }

    /**
     * Gets the batch size.
     *
     * @return the batch size
     */
    public int getBatchSize() {
        return batchSize;
    }

    /**
     * Sets the batch size.
     *
     * @param batchSize the new batch size
     */
    public void setBatchSize(final int batchSize) {
        this.batchSize = batchSize;
    }

    /**
     * Gets the result set client factory.
     *
     * @return the result set client factory
     */
    public ResultSetClientFactory getResultSetClientFactory() {
        return resultSetClientFactory;
    }

    /**
     * Sets the result set client factory.
     *
     * @param resultSetClientFactory the new result set client factory
     */
    @Required
    public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
        this.resultSetClientFactory = resultSetClientFactory;
    }

}
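
The concrete feeders below only implement addOperation; batching, flushing and the simulation mode live in this base class. A rough usage sketch follows, assuming the feeder is defined as a Spring bean so that the configuration enumerator and result-set client factory are injected; the EPR, XSLT and table name are placeholders, not values from this revision.

import org.springframework.context.ApplicationContext;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.hbase.HBasePutFeeder;
import eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder;

public class FeederSketch {

    // Hypothetical driver: ctx is assumed to be a Spring context defining the feeder bean,
    // epr and xslt stand in for the result-set EPR and the row-transforming XSLT.
    public static int importRecords(final ApplicationContext ctx, final String epr, final String xslt) throws Exception {
        final HbaseTableFeeder feeder = ctx.getBean(HBasePutFeeder.class);
        feeder.setBatchSize(500);

        // simulation = false ships the mutations to the table in batches;
        // with simulation = true the feeder only counts the transformed records.
        return feeder.feed(epr, xslt, ClusterName.DM, "db_openaireplus", false);
    }
}
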
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/hbase/HBasePutFeeder.java

package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBasePutFeeder performs a batch of Put operations.
 */
public class HBasePutFeeder extends HbaseTableFeeder {

    /**
     * Logger.
     */
    private static final Log log = LogFactory.getLog(HBasePutFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

    /*
     * (non-Javadoc)
     *
     * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
     */
    @Override
    protected void addOperation(final List<Mutation> buffer, final Row row) {
        final Put put = new Put(Bytes.toBytes(row.getKey()));
        put.setWriteToWAL(true);

        for (final Column<String, byte[]> col : row) {
            log.debug(String.format("adding value to K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
            put.add(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()), col.getValue());
        }

        buffer.add(put);
    }

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/JobRegistry.java

package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import eu.dnetlib.data.hadoop.HadoopJob.Status;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

public class JobRegistry {

    private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM

    private int maxJobs;

    private final BiMap<String, HadoopJob> jobs = HashBiMap.create();

    public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {

        if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }

        if (jobs.size() > getMaxJobs()) {
            removeOldestProcess();
        }

        jobs.put(hadoopJob.getId(), hadoopJob);
        log.info("Registered hadoop job " + hadoopJob.getId());
        hadoopJob.startMonitor();

        return hadoopJob.getId();
    }

    public Status getJobStatus(String id) {
        return findJob(id).getStatus();
    }

    public HadoopJob findJob(String id) {
        return jobs.get(id);
    }

    public void unregisterJob(String id) throws HadoopServiceException {

        if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }

        log.info("unregistering job: " + id);
        jobs.get(id).getJobMonitor().kill();
        jobs.remove(id);
    }

    private void removeOldestProcess() throws HadoopServiceException {
        Date oldDate = new Date();
        String oldId = null;

        for (Entry<String, HadoopJob> e : jobs.entrySet()) {
            final HadoopJob hadoopJob = e.getValue();

            if (hadoopJob.isComplete()) {
                final Date date = hadoopJob.getLastActivity();
                if (date.before(oldDate)) {
                    oldDate = date;
                    oldId = e.getKey();
                }
            }
        }

        if (oldId != null) {
            unregisterJob(oldId);
        }

    }

    public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
        return jobs.values().stream()
                .filter(j -> clusterName.equals(j.getClusterName()))
                .map(HadoopUtils.asDescriptor())
                .filter(Objects::nonNull)
                .collect(Collectors.toList());
    }

    @Required
    public void setMaxJobs(final int maxJobs) {
        this.maxJobs = maxJobs;
    }

    public int getMaxJobs() {
        return maxJobs;
    }

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java

package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

public class HadoopServiceCore {

    private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
    @Autowired
    protected ConfigurationEnumerator configurationEnumerator;
    @Autowired
    private HadoopClientMap clients;
    private int maxVersions;

    public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);
        return Arrays.asList(admin.listTables())
                .stream()
                .map(HTableDescriptor::getNameAsString)
                .collect(Collectors.toList());
    }

    private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
        final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

        if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

        return admin;
    }

    public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
        final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

        if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

        if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

        final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

        final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

        final Set<String> columns = Sets.newHashSet();

        for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
            columns.add(hColDesc.getNameAsString());
        }

        HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
        htDescriptor.setColumns(columns);

        List<HBaseTableRegionInfo> regions = Lists.newArrayList();

        for (HRegionInfo info : tableRegions) {
            regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
        }
        htDescriptor.setRegions(regions);

        if (log.isDebugEnabled()) {
            log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
        }

        return htDescriptor.toString();
    }

    public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);
        final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
        return desc.getFamilies().stream()
                .map(d -> d.getNameAsString())
                .collect(Collectors.toList());
    }

    public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);

        if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");

        final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

        log.info("disabling table: " + table);
        admin.disableTable(table);

        log.info("deleting table: " + table);
        admin.deleteTable(table);

        log.info("creating table: " + table);
        admin.createTable(desc);
    }

    public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);

        return admin.tableExists(table);
    }

    public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);

        if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");

        log.info("disabling table: " + table);
        admin.disableTable(table);

        log.info("deleting table: " + table);
        admin.deleteTable(table);
    }

    public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);

        if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

        if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

        final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

        doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
    }

    public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);

        if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

        doCreateTable(clusterName, table, columns, null);
    }

    public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
            throws IOException, HadoopServiceException {
        final HBaseAdmin admin = getHBaseAdmin(clusterName);

        if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

        final HTableDescriptor desc = new HTableDescriptor(table);
        for (final String column : columns) {
            final HColumnDescriptor hds = new HColumnDescriptor(column);
            hds.setMaxVersions(getMaxVersions());
            desc.addFamily(hds);
        }

        log.info("creating hbase table: " + table);

        if (regions != null && !regions.isEmpty()) {
            log.debug(String.format("create using %s regions: %s", regions.size(), regions));
            admin.createTable(desc, getSplitKeys(regions));
        } else {
            admin.createTable(desc);
        }

        log.info("created hbase table: [" + table + "]");
        log.debug("descriptor: [" + desc.toString() + "]");
    }

    private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
        byte[][] splits = new byte[regions.size() - 1][];
        for (int i = 0; i < regions.size() - 1; i++) {
            splits[i] = regions.get(i).getEndKey().getBytes();
        }
        return splits;
    }

    public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {

        final HBaseAdmin admin = getHBaseAdmin(clusterName);

        if (!admin.tableExists(table)) {
            createTable(clusterName, table, columns);
        } else {
            final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));

            final Set<String> foundColumns = desc.getFamilies().stream()
                    .map(d -> d.getNameAsString())
                    .collect(Collectors.toCollection(HashSet::new));

            log.info("ensuring columns on table " + table + ": " + columns);
            final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
            if (!missingColumns.isEmpty()) {

                if (admin.isTableEnabled(table)) {
                    admin.disableTable(table);
                }

                for (final String column : missingColumns) {
                    log.info("hbase table: '" + table + "', adding column: " + column);
                    admin.addColumn(table, new HColumnDescriptor(column));
                }

                admin.enableTable(table);
            }
        }
    }

    public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);

        try {
            table.put(puts);
        } finally {
            table.flushCommits();
            table.close();
        }
    }

    public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);
        try {
            table.delete(deletes);
        } finally {
            table.flushCommits();
            table.close();
        }
    }

    public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);
        try {
            for (HBaseRowDescriptor desc : columns) {

                final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
                d.setWriteToWAL(true);
                for (Column c : desc.getColumns()) {
                    for (String qualifier : c.getQualifier()) {
                        log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
                        d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
                    }
                }
                table.delete(d);
            }
        } finally {
            table.flushCommits();
            table.close();
        }
    }

    public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);
        try {
            return table.get(new Get(id));
        } finally {
            table.close();
        }
    }

    public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
        final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
        for (String rowKey : rowKeys) {
            map.put(rowKey, describeRow(clusterName, tableName, rowKey));
        }
        return map;
    }

    public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);

        final HBaseRowDescriptor desc = new HBaseRowDescriptor();

        try {
            final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

            if (r.isEmpty()) {
                return desc;
            }

            final List<Column> columns = Lists.newArrayList();

            for (Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
                final Set<byte[]> qualifiers = e.getValue().keySet();
                final String family = new String(e.getKey());
                final Column col = new Column(family);

                for (byte[] q : qualifiers) {
                    String qs = new String(q);
                    col.getQualifier().add(qs);
                }
                columns.add(col);
            }
            desc.setColumns(columns);
            desc.setRowKey(rowKey);

            return desc;
        } finally {
            table.close();
        }
    }

    public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        final HTable table = new HTable(conf, tableName);
        try {
            final ResultScanner rs = table.getScanner(scan);
            try {
                return Lists.newArrayList(rs.iterator());
            } finally {
                rs.close();
            }
        } finally {
            table.close();
        }
    }

    public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
        if (StringUtils.isBlank(path))
            throw new HadoopServiceException("Cannot deleteFromHBase an empty HDFS path.");

        final Configuration conf = configurationEnumerator.get(clusterName);

        try {
            final FileSystem hdfs = FileSystem.get(conf);
            final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

            if (hdfs.exists(absolutePath)) {
                log.debug("deleteFromHBase path: " + absolutePath.toString());
                hdfs.delete(absolutePath, true);
                log.info("deleted path: " + absolutePath.toString());
                return true;
            } else {
                log.warn("cannot deleteFromHBase unexisting path: " + absolutePath.toString());
                return false;
            }
        } catch (IOException e) {
            throw new HadoopServiceException(e);
        }
    }

    public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
        if (StringUtils.isBlank(path))
            throw new HadoopServiceException("Cannot create an empty HDFS path.");

        final Configuration conf = configurationEnumerator.get(clusterName);

        try {
            final FileSystem hdfs = FileSystem.get(conf);
            final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
            if (!hdfs.exists(absolutePath)) {
                hdfs.mkdirs(absolutePath);
                log.info("created path: " + absolutePath.toString());
                return true;
            } else if (force) {
                log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
                hdfs.delete(absolutePath, true);

                hdfs.mkdirs(absolutePath);
                log.info("created path: " + absolutePath.toString());
                return true;
            } else {
                log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
                return false;
            }
        } catch (IOException e) {
            throw new HadoopServiceException(e);
        }
    }

    public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
        if (StringUtils.isBlank(path))
            throw new HadoopServiceException("invalid empty path");

        final Configuration conf = configurationEnumerator.get(clusterName);
        try {
            final FileSystem hdfs = FileSystem.get(conf);
            final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
            return hdfs.exists(absolutePath);
        } catch (IOException e) {
            throw new HadoopServiceException(e);
        }
    }

    public Configuration getClusterConiguration(final ClusterName clusterName) {
        return configurationEnumerator.get(clusterName);
    }

    public int getMaxVersions() {
        return maxVersions;
    }

    @Required
    public void setMaxVersions(final int maxVersions) {
        this.maxVersions = maxVersions;
    }

    public HadoopClientMap getClients() {
        return clients;
    }

}
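
doCreateTable derives the split keys from the region descriptors: with N regions it passes N-1 split keys to HBaseAdmin.createTable, taking the end key of every region except the last. A small sketch of requesting a pre-split table this way; the column families, table name and key boundaries are made up for illustration and only assume the HBaseTableRegionInfo(startKey, endKey) constructor used above.

import java.util.List;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;

public class CreateTableSketch {

    // Hypothetical pre-split table creation: three regions covering (-inf, "3"), ["3", "6"), ["6", +inf),
    // which doCreateTable turns into the two split keys "3" and "6".
    public static void createPreSplit(final HadoopServiceCore core) throws Exception {
        final Set<String> families = Sets.newHashSet("result", "person");

        final List<HBaseTableRegionInfo> regions = Lists.newArrayList(
                new HBaseTableRegionInfo("", "3"),
                new HBaseTableRegionInfo("3", "6"),
                new HBaseTableRegionInfo("6", ""));

        core.doCreateTable(ClusterName.DM, "dnet_test_table", families, regions);
    }
}
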
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.6.2/src/main/java/eu/dnetlib/data/hadoop/HadoopJob.java

package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;

public class HadoopJob {

    private static final Log log = LogFactory.getLog(HadoopJob.class); // NOPMD by marko on 11/24/08 5:02 PM

    /**
     * Defines the possible stati of an hadoop job.
     */
    public static enum Status {
        PREP, RUNNING, SUCCEEDED, KILLED, FAILED, SUSPENDED
    }

    private final Executor executor = Executors.newSingleThreadExecutor();