Revision 45304

Codebase used to migrate the production system to Java 8.

modules/dnet-hadoop-service/trunk/deploy.info
{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-hadoop-service/trunk/", "deploy_repository": "dnet4-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", "name": "dnet-hadoop-service"}
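(Note: the "goal" field above corresponds to a Maven invocation along the lines of mvn -U -T 4C package source:jar run against the listed SVN trunk, with snapshot artifacts published to the dnet4-snapshots Nexus repository; the exact CI command is an assumption and not part of this revision.)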
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/utils/CopyTableTest.java
package eu.dnetlib.data.hadoop.utils;

import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;

public class CopyTableTest {

	@Test
	public void testCopyTable() throws IOException {

		Job job =
				CopyTable.createSubmittableJob(new Configuration(), new String[] { "--peer.adr=server1,server2,server3:2181:/hbase",
					"--families=myOldCf:myNewCf,cf2,cf3", "tableName" });
		assertNotNull(job);

		Configuration conf = job.getConfiguration();

		for (Entry<String, String> e : conf) {
			System.out.println(String.format("<PROPERTY key=\"%s\" value=\"%s\"/>", e.getKey(), e.getValue()));
		}

	}
}
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/utils/ReadSequenceFileTest.java
package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.math.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.hdfs.SequenceFileUtils;
import eu.dnetlib.miscutils.collections.Pair;

public class ReadSequenceFileTest {

	private static final Path SEQUENCE_FILE_PATH = new Path("hdfs://nmis-hadoop-cluster/tmp/indexrecords_db_openaireplus_sesam_SESAMALL.seq");
	private final String HADOOP_CONF_FILE = "/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties";

	private Configuration conf;

	@Before
	public void setUp() {
		final ConfigurationFactory confFactory = new ConfigurationFactory();
		confFactory.setDefaults(new ClassPathResource(HADOOP_CONF_FILE));
		conf = confFactory.getConfiguration();
	}

	@Test
	@Ignore
	public void testReadSequenceFile() throws Exception {
		final SummaryStatistics statsAll = new SummaryStatistics();

		final Map<String, SummaryStatistics> stats = Maps.newHashMap();

		int i = 0;
		for (Pair<Text, Text> pair : SequenceFileUtils.read(SEQUENCE_FILE_PATH, conf)) {
			final String id = pair.getKey().toString();
			final String record = pair.getValue().toString();
			final int length = record.getBytes().length;

			final String type = id.substring(0, 2);
			if (!stats.containsKey(type)) {
				stats.put(type, new SummaryStatistics());
			}
			statsAll.addValue(length);
			stats.get(type).addValue(length);

			if (++i % 10000 == 0) {
				System.out.println("Read " + i);
			}
		}

		printStats("ALL", statsAll);
		for (Entry<String, SummaryStatistics> e : stats.entrySet()) {
			printStats(e.getKey(), e.getValue());
		}
	}

	private void printStats(final String type, final SummaryStatistics stats) {
		System.out.println("************************************");
		System.out.println("Type: " + type);
		System.out.println(String.format("\tmin    : %.2f KBytes", stats.getMin() / 1024));
		System.out.println(String.format("\tmax    : %.2f KBytes", stats.getMax() / 1024));
		System.out.println(String.format("\tavg    : %.2f KBytes", stats.getMean() / 1024));
		System.out.println(String.format("\tstdDev : %.2f", stats.getStandardDeviation() / 1024));
	}
}
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTestContextConfiguration.java
package eu.dnetlib.data.hadoop.hbase;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
import eu.dnetlib.data.mapreduce.JobClientFactory;

@Configuration
@Profile(value = "test")
public class HBaseTestContextConfiguration {

	public static final String ENABLED_CLIENTS = "{"
			+ "\"DM\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"true\"},"
			+ "\"IIS\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"false\"}"
			+ "}";

	public static final int MAX_VERSIONS = 10;

	@Bean
	public HadoopServiceCore hadoopServiceCore() {
		final HadoopServiceCore core = new HadoopServiceCore();

		core.setMaxVersions(MAX_VERSIONS);

		System.out.println("using hbase max versions: " + MAX_VERSIONS);
		return core;
	}

	@Bean(initMethod = "init")
	public HadoopClientMap hadoopClientMap() throws InterruptedException {
		final HadoopClientMap clientMap = new HadoopClientMap();
		clientMap.setEnabledClients(ENABLED_CLIENTS);
		clientMap.setClientsInitTime(10);

		return clientMap;
	}

	@Bean
	public HBaseAdminFactory hBaseAdminFactory() {
		return new HBaseAdminFactory();
	}

	@Bean
	public OozieClientFactory oozieClientFactory() {
		return new OozieClientFactory();
	}

	@Bean
	public JobClientFactory jobClientFactory() {
		return new JobClientFactory();
	}

	@Bean
	public ConfigurationEnumerator configurationEnumerator() {
		return new ConfigurationEnumerator();
	}

	@Bean
	public ConfigurationFactory DM() {
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties"));
	}

	@Bean
	public ConfigurationFactory IIS() {
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.iis.icm.properties"));
	}

	protected ConfigurationFactory get(final Resource props) {
		final ConfigurationFactory configurationFactory = new ConfigurationFactory();
		configurationFactory.setDefaults(props);
		return configurationFactory;
	}

}
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTest.java
package eu.dnetlib.data.hadoop.hbase;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.miscutils.datetime.DateUtils;

@ActiveProfiles("test")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = HBaseTestContextConfiguration.class)
public class HBaseTest {

	protected static final String TEST_TABLE = "dnet_test_table";

	protected static final int NUM_VERSIONS = 10;

	@Autowired
	private HadoopServiceCore hadoopServiceCore;

	@Autowired
	private ConfigurationEnumerator configurationEnumerator;

	@Before
	public void setUp() throws HadoopServiceException, IOException, InterruptedException {
		assertNotNull(hadoopServiceCore);

		System.out.println("waiting for clients to be ready... timeout? " + !hadoopServiceCore.getClients().waitClients());

		ensureDropTable();
	}

	@After
	public void tearDown() throws HadoopServiceException, IOException {
		ensureDropTable();
	}

	@Test
	@Ignore
	// TODO allow testing on a dev cluster instance
	public void testReadWrite() throws HadoopServiceException, IOException, InterruptedException {

		hadoopServiceCore.createTable(ClusterName.DM, TEST_TABLE, testSchema());
		assertTrue(hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE));

		final HTable htable = new HTable(configurationEnumerator.get(ClusterName.DM), TEST_TABLE);

		final Put put = new Put(Bytes.toBytes("1"));

		for (int i = 0; i < NUM_VERSIONS; i++) {
			put.add(Bytes.toBytes("result"), Bytes.toBytes("body"), Bytes.toBytes(i + ""));
			htable.put(put);
			Thread.sleep(1000);
		}
		final Get get = new Get(Bytes.toBytes("1"));
		get.setMaxVersions(HBaseTestContextConfiguration.MAX_VERSIONS);

		final Result r = htable.get(get);

		// Map<family,Map<qualifier,Map<timestamp,value>>>
		final NavigableMap<Long, byte[]> versions = r.getMap().get(Bytes.toBytes("result")).get(Bytes.toBytes("body"));

		for (final Entry<Long, byte[]> e : versions.entrySet()) {
			System.out.println("t: " + DateUtils.calculate_ISO8601(e.getKey()) + ", v: " + Bytes.toString(e.getValue()));
		}

		htable.close();

	}

	protected void ensureDropTable() throws HadoopServiceException, IOException {
		if (hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE)) {
			hadoopServiceCore.dropTable(ClusterName.DM, TEST_TABLE);
		}
	}

	protected Set<String> testSchema() {
		final Set<String> schema = Sets.newHashSet();

		schema.add("result");

		return schema;
	}

}
modules/dnet-hadoop-service/trunk/src/test/resources/log4j.properties
### Root Level ###
log4j.rootLogger=WARN, LOGFILE, CONSOLE

### Configuration for the LOGFILE appender ###
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=25MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=logs/dnet.log
log4j.appender.LOGFILE.Append=true
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=[%-5p] %d %c - %m%n

### Configuration for the CONSOLE appender ###
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c - %m%n

org.apache.cxf.Logger=org.apache.cxf.common.logging.Log4jLogger

### Application Level ###
log4j.logger.eu.dnetlib=INFO
log4j.logger.eu.dnetlib.enabling.is.sn=INFO
log4j.logger.org.apache.cxf.interceptor=FATAL
log4j.logger.org.apache.cxf.ws.addressing.ContextUtils=FATAL
log4j.logger.eu.dnetlib.enabling.tools.AbstractBaseService=INFO
log4j.logger.eu.dnetlib.enabling.inspector=DEBUG
log4j.logger.eu.dnetlib.xml.database.LoggingTrigger=WARN
log4j.logger.eu.dnetlib.enabling.tools.registration.ServiceRegistrator=INFO
log4j.logger.eu.dnetlib.enabling.inspector=FATAL
log4j.logger.eu.dnetlib.enabling.inspector.SubscriptionController=DEBUG
log4j.logger.eu.dnetlib.springutils.stringtemplate.StringTemplateViewResolver=FATAL
log4j.logger.eu.dnetlib.enabling.is.sn.SynchronousNotificationSenderImpl=WARN
log4j.logger.eu.dnetlib.enabling.tools.LocalServiceResolverImpl=WARN
log4j.logger.eu.dnetlib.enabling.is.sn.NotificationInvokerImpl=WARN
log4j.logger.eu.dnetlib.data.collective=INFO
log4j.logger.eu.dnetlib.data.hadoop.utils.ScanFactory=DEBUG
log4j.logger.org.apache.xerces.parsers.SAXParser=OFF
log4j.logger.eu.dnetlib.conf.PropertyFetcher=WARN
#log4j.logger.eu.dnetlib.data.transform.XsltRowTransformerFactory=DEBUG

log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceImpl=OFF
log4j.logger.eu.dnetlib.enabling.datasources.DatasourceManagerClients=FATAL
log4j.logger.eu.dnetlib.data.mdstore.modular.mongodb.utils.MetadataCheckJob=DEBUG
log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceCore=WARN
log4j.logger.eu.dnetlib.xml.database.exist.ExistDatabase=WARN
log4j.logger.eu.dnetlib.enabling.is.store.AbstractContentInitializer=FATAL

log4j.logger.org.apache.hadoop.hbase.mapreduce.TableInputFormatBase=FATAL

### Spring ###
log4j.logger.org.springframework=ERROR
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/utils/ScanProperties.java
package eu.dnetlib.data.hadoop.utils;

import java.util.Set;

import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;

import com.google.common.collect.Sets;

public class ScanProperties {

	private static final Operator DEFAULT_OPERATOR = Operator.MUST_PASS_ALL;

	private int caching = 100;
	private FilterList filterList;
	private Set<String> families = Sets.newHashSet();

	public ScanProperties(final String op) {
		Operator operator = DEFAULT_OPERATOR;
		if ((op != null) && !op.isEmpty()) {
			operator = Operator.valueOf(op);
		}
		filterList = new FilterList(operator);
	}

	public FilterList getFilterList() {
		return filterList;
	}

	public void setFilterList(final FilterList filterList) {
		this.filterList = filterList;
	}

	public Set<String> getFamilies() {
		return families;
	}

	public void setFamilies(final Set<String> families) {
		this.families = families;
	}

	public int getCaching() {
		return caching;
	}

	public void setCaching(int caching) {
		this.caching = caching;
	}
}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/utils/HadoopUtils.java
package eu.dnetlib.data.hadoop.utils;

import java.util.Map.Entry;
import java.util.Set;

import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;

import com.google.common.base.Function;
import com.google.common.base.Predicate;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

public class HadoopUtils {

	public static Function<HTableDescriptor, String> tableName() {
		return new Function<HTableDescriptor, String>() {

			@Override
			public String apply(final HTableDescriptor d) {
				return d.getNameAsString();
			}
		};
	}

	public static Function<HColumnDescriptor, String> columnName() {
		return new Function<HColumnDescriptor, String>() {

			@Override
			public String apply(final HColumnDescriptor d) {
				return d.getNameAsString();
			}
		};
	}

	public static Predicate<String> columnPredicate(final Set<String> cols) {
		return new HadoopUtils().getSetPredicate(cols);
	}

	public SetPredicate getSetPredicate(final Set<String> set) {
		return new SetPredicate(set);
	}

	class SetPredicate implements Predicate<String> {

		private final Set<String> set;

		public SetPredicate(final Set<String> set) {
			this.set = set;
		}

		@Override
		public boolean apply(final String s) {
			return !set.contains(s);
		}
	}

	public static Function<Entry<String, HadoopJob>, HadoopJobDescriptor> hadoopJobDescriptor() {
		return new Function<Entry<String, HadoopJob>, HadoopJobDescriptor>() {

			@Override
			public HadoopJobDescriptor apply(final Entry<String, HadoopJob> e) {
				try {
					return e.getValue().asDescriptor();
				} catch (HadoopServiceException e1) {
					return null;
				}
			}
		};
	}

	public static Predicate<HadoopJob> filterByCluster(final ClusterName clusterName) {
		return new Predicate<HadoopJob>() {

			@Override
			public boolean apply(final HadoopJob job) {
				return job.getClusterName().equals(clusterName);
			}

		};
	}

}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/utils/JobProfile.java
package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Set;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.rmi.HadoopJobType;

public class JobProfile {

	private final Map<String, String> jobDefinition = Maps.newHashMap();
	private final Set<String> requiredParams = Sets.newHashSet();
	private ScanProperties scanProperties;

	private String name;

	private String description = "";

	private HadoopJobType jobType;

	public boolean isEmpty() {
		return getJobDefinition().isEmpty();
	}

	public Map<String, String> getJobDefinition() {
		return jobDefinition;
	}

	public Set<String> getRequiredParams() {
		return requiredParams;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getDescription() {
		return description;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	public ScanProperties getScanProperties() {
		return scanProperties;
	}

	public void setScanProperties(ScanProperties scanProperties) {
		this.scanProperties = scanProperties;
	}

	public HadoopJobType getJobType() {
		return jobType;
	}

	public void setJobType(HadoopJobType jobType) {
		this.jobType = jobType;
	}

}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/utils/ScanFactory.java
package eu.dnetlib.data.hadoop.utils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Base64;
import org.dom4j.Document;
import org.dom4j.Node;

public class ScanFactory {

	private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

	public static String getScan(final ScanProperties scanProperties) throws IOException {
		Scan scan = new Scan();

		scan.setCaching(scanProperties.getCaching());
		scan.setCacheBlocks(false); // don't set to true for MR jobs

		scan.setFilter(scanProperties.getFilterList());
		for (String family : scanProperties.getFamilies()) {
			scan.addFamily(family.getBytes());
		}

		log.debug("serializing scan");
		return convertScanToString(scan);
	}

	public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
		log.debug("setting job scanner");

		ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));

		String caching = doc.valueOf("//SCAN/@caching");
		if (!StringUtils.isBlank(caching)) {
			log.info("overriding default scan caching with: " + caching);
			scanProperties.setCaching(Integer.valueOf(caching));
		}

		for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
			Node node = (Node) o;
			String value = node.valueOf("./@value");
			if ((value == null) || value.isEmpty()) {
				value = bbParams.get(node.valueOf("./@param"));
			}
			log.debug("scanner family value: " + value);
			scanProperties.getFamilies().add(value);
		}
		for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
			Node node = (Node) o;
			String filterType = node.valueOf("./@type");

			String value = node.valueOf("./@value");
			if ((value == null) || value.isEmpty()) {
				value = bbParams.get(node.valueOf("./@param"));
			}

			if (filterType.equals("prefix")) {
				log.debug("scanner prefix filter, value: " + value);
				scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
			} // TODO add more filterType cases here
		}
		return scanProperties;
	}

	/**
	 * Writes the given scan into a Base64 encoded string.
	 *
	 * @param scan
	 *            The scan to write out.
	 * @return The scan saved in a Base64 encoded string.
	 * @throws IOException
	 *             When writing the scan fails.
	 */
	private static String convertScanToString(final Scan scan) throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(out);
		scan.write(dos);
		return Base64.encodeBytes(out.toByteArray());
	}

}
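As a usage sketch of ScanProperties and ScanFactory (not part of this revision; the column family, row-key prefix, and the idea of wiring the result into a MapReduce job property are illustrative assumptions), a caller would populate the properties the same way parseScanProperties does from the job profile XML, then serialize the scan:

package eu.dnetlib.data.hadoop.utils;

import org.apache.hadoop.hbase.filter.PrefixFilter;

// Hypothetical caller, for illustration only
public class ScanFactoryExample {

	public static void main(final String[] args) throws Exception {
		// a null or empty operator falls back to the default MUST_PASS_ALL
		final ScanProperties props = new ScanProperties("MUST_PASS_ALL");
		props.setCaching(500);
		props.getFamilies().add("result"); // illustrative column family
		props.getFilterList().addFilter(new PrefixFilter("50|".getBytes())); // illustrative row-key prefix

		// Base64-encoded Scan, ready to be handed to the MapReduce job configuration
		final String encodedScan = ScanFactory.getScan(props);
		System.out.println(encodedScan);
	}
}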
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseDeleteFeeder.java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBaseDeleteFeeder performs a batch of Delete operations.
 */
public class HBaseDeleteFeeder extends HbaseTableFeeder {

	/**
	 * Logger.
	 */
	private static final Log log = LogFactory.getLog(HBaseDeleteFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
	 */
	@Override
	protected void addOperation(final List<Mutation> buffer, final Row row) {
		final Delete delete = new Delete(Bytes.toBytes(row.getKey()));
		delete.setWriteToWAL(true);

		for (final Column<String, byte[]> col : row) {
			log.debug(String.format("deleting K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
			delete.deleteColumns(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()));
		}

		buffer.add(delete);
	}

}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseAdminFactory.java
package eu.dnetlib.data.hadoop.hbase;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.springframework.beans.factory.annotation.Autowired;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

public class HBaseAdminFactory {

	private static final Log log = LogFactory.getLog(HBaseAdminFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Autowired
	private ConfigurationEnumerator configurationEnumerator;

	public HBaseAdmin newInstance(final ClusterName clusterName) throws HadoopServiceException {
		try {
			log.info("init hbaseAdmin, cluster: " + clusterName.toString());
			return new HBaseAdmin(configurationEnumerator.get(clusterName));
		} catch (final Throwable e) {
			log.warn("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
			return null;
		}
	}
}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/hbase/HbaseTableFeeder.java
package eu.dnetlib.data.hadoop.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.transform.Row;
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HbaseTableFeeder provides an abstraction to ship batch operations to an HBase table.
 */
public abstract class HbaseTableFeeder {

	/**
	 * The Constant log.
	 */
	private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
	/**
	 * The configuration enumerator.
	 */
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;
	/**
	 * The batch size.
	 */
	private int batchSize = 100;
	/**
	 * The result set client factory.
	 */
	private ResultSetClientFactory resultSetClientFactory;

	/**
	 * Adds the operation.
	 *
	 * @param buffer the buffer
	 * @param row    the row
	 */
	protected abstract void addOperation(final List<Mutation> buffer, final Row row);

	/**
	 * Feed.
	 *
	 * @param epr         the epr
	 * @param xsl         the xsl
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param simulation  the simulation
	 * @return the int
	 * @throws IOException          Signals that an I/O exception has occurred.
	 * @throws InterruptedException the interrupted exception
	 */
	public int feed(final String epr, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		return doWrite(asRows(epr, xsl), getConf(clusterName), tableName, simulation);
	}

	/**
	 * Do writeOnHBase.
	 *
	 * @param rows          the rows
	 * @param configuration the configuration
	 * @param tableName     the table name
	 * @param simulation    the simulation
	 * @return the int
	 * @throws IOException          Signals that an I/O exception has occurred.
	 * @throws InterruptedException the interrupted exception
	 */
	private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		final List<Mutation> buffer = Lists.newArrayList();

		int count = 0;
		if (simulation) {
			log.info("running in simulation mode ...");
			log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
		} else {

			final HTable htable = new HTable(configuration, tableName);
			try {
				int i = 0;
				for (final Row row : rows) {
					addOperation(buffer, row);
					if ((++i % getBatchSize()) == 0) {
						flush(tableName, buffer, htable);
						count += buffer.size();
						buffer.clear();
					}
				}
			} finally {
				if (!buffer.isEmpty()) {
					flush(tableName, buffer, htable);
					count += buffer.size();
				}
				htable.flushCommits();
				htable.close();
			}
		}
		return count;
	}

	private void flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
		if (!checkOp(htable.batch(buffer), tableName)) throw new IOException("unable to flush operation on HBase table: " + tableName);
	}

	private boolean checkOp(final Object[] res, final String tableName) throws IOException {
		return Iterables.all(Arrays.asList(res), Predicates.notNull());
	}

	/**
	 * As rows.
	 *
	 * @param epr the epr
	 * @param xsl the xsl
	 * @return the iterable
	 */
	protected Iterable<Row> asRows(final String epr, final String xsl) {
		return Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(epr), XsltRowTransformerFactory.newInstance(xsl)));
	}

	/**
	 * Gets the conf.
	 *
	 * @param clusterName the cluster name
	 * @return the conf
	 */
	protected Configuration getConf(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	/**
	 * Gets the batch size.
	 *
	 * @return the batch size
	 */
	public int getBatchSize() {
		return batchSize;
	}

	/**
	 * Sets the batch size.
	 *
	 * @param batchSize the new batch size
	 */
	public void setBatchSize(final int batchSize) {
		this.batchSize = batchSize;
	}

	/**
	 * Gets the result set client factory.
	 *
	 * @return the result set client factory
	 */
	public ResultSetClientFactory getResultSetClientFactory() {
		return resultSetClientFactory;
	}

	/**
	 * Sets the result set client factory.
	 *
	 * @param resultSetClientFactory the new result set client factory
	 */
	@Required
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
		this.resultSetClientFactory = resultSetClientFactory;
	}

}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/hbase/HBasePutFeeder.java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBasePutFeeder performs a batch of Put operations.
 */
public class HBasePutFeeder extends HbaseTableFeeder {

	/**
	 * Logger.
	 */
	private static final Log log = LogFactory.getLog(HBasePutFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
	 */
	@Override
	protected void addOperation(final List<Mutation> buffer, final Row row) {
		final Put put = new Put(Bytes.toBytes(row.getKey()));
		put.setWriteToWAL(true);

		for (final Column<String, byte[]> col : row) {
			log.debug(String.format("adding value to K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
			put.add(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()), col.getValue());
		}

		buffer.add(put);
	}

}
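A wiring sketch for the feeders above (illustrative and not part of this revision: in the service the feeder, its ResultSetClientFactory and the ConfigurationEnumerator are normally assembled by Spring, and the caller class plus the table name shown here are placeholders):

package eu.dnetlib.data.hadoop.hbase;

import org.springframework.beans.factory.annotation.Autowired;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;

// Hypothetical caller, for illustration only
public class FeederUsageExample {

	@Autowired
	private ResultSetClientFactory resultSetClientFactory; // assumed to be defined in the application context

	@Autowired
	private HBasePutFeeder putFeeder; // or HBaseDeleteFeeder, to issue Deletes instead of Puts

	public int importRecords(final String epr, final String xsl) throws Exception {
		putFeeder.setResultSetClientFactory(resultSetClientFactory);
		putFeeder.setBatchSize(100);
		// epr: resultset end point reference to read records from; xsl: row transformation stylesheet
		// the target table name and the simulation=false flag are placeholders
		return putFeeder.feed(epr, xsl, ClusterName.DM, "db_openaireplus", false);
	}
}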
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/JobRegistry.java
package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.HadoopJob.Status;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;

public class JobRegistry {

	private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM

	private int maxJobs;

	private final BiMap<String, HadoopJob> jobs = HashBiMap.create();

	public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {

		if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }

		if (jobs.size() > getMaxJobs()) {
			removeOldestProcess();
		}

		jobs.put(hadoopJob.getId(), hadoopJob);
		log.info("Registered hadoop job " + hadoopJob.getId());
		hadoopJob.startMonitor();

		return hadoopJob.getId();
	}

	public Status getJobStatus(String id) {
		return findJob(id).getStatus();
	}

	public HadoopJob findJob(String id) {
		return jobs.get(id);
	}

	public void unregisterJob(String id) throws HadoopServiceException {

		if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }

		log.info("unregistering job: " + id);
		jobs.get(id).getJobMonitor().kill();
		jobs.remove(id);
	}

	private void removeOldestProcess() throws HadoopServiceException {
		Date oldDate = new Date();
		String oldId = null;

		for (Entry<String, HadoopJob> e : jobs.entrySet()) {
			final HadoopJob hadoopJob = e.getValue();

			if (hadoopJob.isComplete()) {
				final Date date = hadoopJob.getLastActivity();
				if (date.before(oldDate)) {
					oldDate = date;
					oldId = e.getKey();
				}
			}
		}

		if (oldId != null) {
			unregisterJob(oldId);
		}

	}

	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
		Map<String, HadoopJob> filtered = Maps.filterValues(jobs, HadoopUtils.filterByCluster(clusterName));
		return Lists.newArrayList(Iterables.transform(filtered.entrySet(), HadoopUtils.hadoopJobDescriptor()));
	}

	@Required
	public void setMaxJobs(final int maxJobs) {
		this.maxJobs = maxJobs;
	}

	public int getMaxJobs() {
		return maxJobs;
	}

}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java
package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

public class HadoopServiceCore {

	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;
	@Autowired
	private HadoopClientMap clients;
	private int maxVersions;

	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		return Lists.newArrayList(Iterables.transform(Lists.newArrayList(admin.listTables()), HadoopUtils.tableName()));
	}

	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		return admin;
	}

	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

		final Set<String> columns = Sets.newHashSet();

		for (HColumnDescriptor hColDesc : Lists.newArrayList(desc.getColumnFamilies())) {
			columns.add(hColDesc.getNameAsString());
		}

		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
		htDescriptor.setColumns(columns);

		List<HBaseTableRegionInfo> regions = Lists.newArrayList();

		for (HRegionInfo info : tableRegions) {
			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
		}
		htDescriptor.setRegions(regions);

		if (log.isDebugEnabled()) {
			log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
		}

		return htDescriptor.toString();
	}

	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		return Lists.newArrayList(Iterables.transform(desc.getFamilies(), new Function<HColumnDescriptor, String>() {

			@Override
			public String apply(final HColumnDescriptor desc) {
				return desc.getNameAsString();
			}
		}));
	}

	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");

		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);

		log.info("creating table: " + table);
		admin.createTable(desc);
	}

	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		return admin.tableExists(table);
	}

	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);
	}

	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
	}

	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		doCreateTable(clusterName, table, columns, null);
	}

	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
			throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		final HTableDescriptor desc = new HTableDescriptor(table);
		for (final String column : columns) {
			final HColumnDescriptor hds = new HColumnDescriptor(column);
			hds.setMaxVersions(getMaxVersions());
			desc.addFamily(hds);
		}

		log.info("creating hbase table: " + table);

		if (regions != null && !regions.isEmpty()) {
			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
			admin.createTable(desc, getSplitKeys(regions));
		} else {
			admin.createTable(desc);
		}

		log.info("created hbase table: [" + table + "]");
		log.debug("descriptor: [" + desc.toString() + "]");
	}

	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
		byte[][] splits = new byte[regions.size() - 1][];
		for (int i = 0; i < regions.size() - 1; i++) {
			splits[i] = regions.get(i).getEndKey().getBytes();
		}
		return splits;
	}

	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {

		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) {
			createTable(clusterName, table, columns);
		} else {
			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
			final Set<String> foundColumns = Sets.newHashSet(Iterables.transform(Lists.newArrayList(desc.getColumnFamilies()), HadoopUtils.columnName()));

			log.info("ensuring columns on table " + table + ": " + columns);
			final List<String> missingColumns = Lists.newArrayList(Iterables.filter(columns, HadoopUtils.columnPredicate(foundColumns)));
			if (!missingColumns.isEmpty()) {

				if (admin.isTableEnabled(table)) {
					admin.disableTable(table);
				}

				for (final String column : missingColumns) {
					log.info("hbase table: '" + table + "', adding column: " + column);
					admin.addColumn(table, new HColumnDescriptor(column));
				}

				admin.enableTable(table);
			}
		}
	}

	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			table.delete(deletes);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			for(HBaseRowDescriptor desc : columns) {

				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
				d.setWriteToWAL(true);
				for(Column c : desc.getColumns()) {
					for(String qualifier : c.getQualifier()) {
						log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
						d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
					}
				}
				table.delete(d);
			}
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			return table.get(new Get(id));
		} finally {
			table.close();
		}
	}

	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
		for(String rowKey : rowKeys) {
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
		}
		return map;
	}

	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		final HBaseRowDescriptor desc = new HBaseRowDescriptor();

		try {
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

			if (r.isEmpty()) {
				return desc;
			}

			final List<Column> columns = Lists.newArrayList();

			for(Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
				final Set<byte[]> qualifiers = e.getValue().keySet();
				final String family = new String(e.getKey());
				final Column col = new Column(family);

				for(byte[] q : qualifiers) {
					String qs = new String(q);
					col.getQualifier().add(qs);
				}
				columns.add(col);
			}
			desc.setColumns(columns);
			desc.setRowKey(rowKey);

			return desc;
		} finally {
			table.close();
		}
	}

	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			final ResultScanner rs = table.getScanner(scan);
			try {
				return Lists.newArrayList(rs.iterator());
			} finally {
				rs.close();
			}
		} finally {
			table.close();
		}
	}

	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot deleteFromHBase an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

			if (hdfs.exists(absolutePath)) {
				log.debug("deleteFromHBase path: " + absolutePath.toString());
				hdfs.delete(absolutePath, true);
				log.info("deleted path: " + absolutePath.toString());
				return true;
			} else {
				log.warn("cannot deleteFromHBase unexisting path: " + absolutePath.toString());
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot create an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			if (!hdfs.exists(absolutePath)) {
				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else if (force) {
				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
				hdfs.delete(absolutePath, true);

				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else {
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
(This diff is truncated here because it exceeds the maximum size that can be displayed.)