Revision 55136

[maven-release-plugin] copy for tag dnet-hadoop-service-2.7.7

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/deploy.info
{"type_source": "SVN", "goal": "package -U -T 4C source:jar", "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/trunk/", "deploy_repository": "dnet45-snapshots", "version": "4", "mail": "sandro.labruzzo@isti.cnr.it,michele.artini@isti.cnr.it, claudio.atzori@isti.cnr.it, alessia.bardi@isti.cnr.it", "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet45-snapshots", "name": "dnet-hadoop-service"}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/test/java/eu/dnetlib/data/hadoop/utils/CopyTableTest.java
package eu.dnetlib.data.hadoop.utils;

import static org.junit.Assert.assertNotNull;

import java.io.IOException;
import java.util.Map.Entry;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapreduce.Job;
import org.junit.Test;

public class CopyTableTest {

	@Test
	public void testCopyTable() throws IOException {

		Job job =
				CopyTable.createSubmittableJob(new Configuration(), new String[] { "--peer.adr=server1,server2,server3:2181:/hbase",
					"--families=myOldCf:myNewCf,cf2,cf3", "tableName" });
		assertNotNull(job);

		Configuration conf = job.getConfiguration();

		for (Entry<String, String> e : conf) {
			System.out.println(String.format("<PROPERTY key=\"%s\" value=\"%s\"/>", e.getKey(), e.getValue()));
		}

	}
}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/test/java/eu/dnetlib/data/hadoop/utils/ReadSequenceFileTest.java
package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.math.stat.descriptive.SummaryStatistics;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.springframework.core.io.ClassPathResource;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.hdfs.SequenceFileUtils;
import eu.dnetlib.miscutils.collections.Pair;

public class ReadSequenceFileTest {

	private static final Path SEQUENCE_FILE_PATH = new Path("hdfs://nmis-hadoop-cluster/tmp/indexrecords_db_openaireplus_sesam_SESAMALL.seq");
	private final String HADOOP_CONF_FILE = "/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties";

	private Configuration conf;

	@Before
	public void setUp() {
		final ConfigurationFactory confFactory = new ConfigurationFactory();
		confFactory.setDefaults(new ClassPathResource(HADOOP_CONF_FILE));
		conf = confFactory.getConfiguration();
	}

	@Test
	@Ignore
	public void testReadSequenceFile() throws Exception {
		final SummaryStatistics statsAll = new SummaryStatistics();

		final Map<String, SummaryStatistics> stats = Maps.newHashMap();

		int i = 0;
		for (Pair<Text, Text> pair : SequenceFileUtils.read(SEQUENCE_FILE_PATH, conf)) {
			final String id = pair.getKey().toString();
			final String record = pair.getValue().toString();
			final int length = record.getBytes().length;

			final String type = id.substring(0, 2);
			if (!stats.containsKey(type)) {
				stats.put(type, new SummaryStatistics());
			}
			statsAll.addValue(length);
			stats.get(type).addValue(length);

			if (++i % 10000 == 0) {
				System.out.println("Read " + i);
			}
		}

		printStats("ALL", statsAll);
		for (Entry<String, SummaryStatistics> e : stats.entrySet()) {
			printStats(e.getKey(), e.getValue());
		}
	}

	private void printStats(final String type, final SummaryStatistics stats) {
		System.out.println("************************************");
		System.out.println("Type: " + type);
		System.out.println(String.format("\tmin    : %.2f KBytes", stats.getMin() / 1024));
		System.out.println(String.format("\tmax    : %.2f KBytes", stats.getMax() / 1024));
		System.out.println(String.format("\tavg    : %.2f KBytes", stats.getMean() / 1024));
		System.out.println(String.format("\tstdDev : %.2f", stats.getStandardDeviation() / 1024));
	}
}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTestContextConfiguration.java
package eu.dnetlib.data.hadoop.hbase;

import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

@Configuration
@Profile(value = "test")
public class HBaseTestContextConfiguration {

	public static final String ENABLED_CLIENTS = "{"
			+ "\"DM\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"true\"},"
			+ "\"IIS\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"false\"}"
			+ "}";

	public static final int MAX_VERSIONS = 10;

	@Bean
	public HadoopServiceCore hadoopServiceCore() {
		final HadoopServiceCore core = new HadoopServiceCore();

		core.setMaxVersions(MAX_VERSIONS);

		System.out.println("using hbase max versions: " + MAX_VERSIONS);
		return core;
	}

	@Bean
	public HadoopClientMap hadoopClientMap() throws InterruptedException {
		final HadoopClientMap clientMap = new HadoopClientMap();
		clientMap.setEnabledClients(ENABLED_CLIENTS);

		return clientMap;
	}

	@Bean
	public HBaseAdminFactory hBaseAdminFactory() {
		return new HBaseAdminFactory();
	}

	@Bean
	public OozieClientFactory oozieClientFactory() {
		return new OozieClientFactory();
	}

	@Bean
	public JobClientFactory jobClientFactory() {
		return new JobClientFactory();
	}

	@Bean
	public ConfigurationEnumerator configurationEnumerator() {
		return new ConfigurationEnumerator();
	}

	@Bean
	public ConfigurationFactory DM() {
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties"));
	}

	@Bean
	public ConfigurationFactory IIS() {
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.iis.icm.properties"));
	}

	protected ConfigurationFactory get(final Resource props) {
		final ConfigurationFactory configurationFactory = new ConfigurationFactory();
		configurationFactory.setDefaults(props);
		return configurationFactory;
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTest.java
package eu.dnetlib.data.hadoop.hbase;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.miscutils.datetime.DateUtils;
import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.util.Bytes;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;

@ActiveProfiles("test")
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = HBaseTestContextConfiguration.class)
public class HBaseTest {

	protected static final String TEST_TABLE = "dnet_test_table";

	protected static final int NUM_VERSIONS = 10;

	@Autowired
	private HadoopServiceCore hadoopServiceCore;

	@Autowired
	private ConfigurationEnumerator configurationEnumerator;

	@Before
	public void setUp() throws HadoopServiceException, IOException, InterruptedException {
		assertNotNull(hadoopServiceCore);

		ensureDropTable();
	}

	@After
	public void tearDown() throws HadoopServiceException, IOException {
		ensureDropTable();
	}

	@Test
	@Ignore
	// TODO allow testing on a dev cluster instance
	public void testReadWrite() throws HadoopServiceException, IOException, InterruptedException {

		hadoopServiceCore.createTable(ClusterName.DM, TEST_TABLE, testSchema());
		assertTrue(hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE));

		final HTable htable = new HTable(configurationEnumerator.get(ClusterName.DM), TEST_TABLE);

		final Put put = new Put(Bytes.toBytes("1"));

		for (int i = 0; i < NUM_VERSIONS; i++) {
			put.add(Bytes.toBytes("result"), Bytes.toBytes("body"), Bytes.toBytes(i + ""));
			htable.put(put);
			Thread.sleep(1000);
		}
		final Get get = new Get(Bytes.toBytes("1"));
		get.setMaxVersions(HBaseTestContextConfiguration.MAX_VERSIONS);

		final Result r = htable.get(get);

		// Map<family,Map<qualifier,Map<timestamp,value>>>
		final NavigableMap<Long, byte[]> versions = r.getMap().get(Bytes.toBytes("result")).get(Bytes.toBytes("body"));

		for (final Entry<Long, byte[]> e : versions.entrySet()) {
			System.out.println("t: " + DateUtils.calculate_ISO8601(e.getKey()) + ", v: " + Bytes.toString(e.getValue()));
		}

		htable.close();

	}

	protected void ensureDropTable() throws HadoopServiceException, IOException {
		if (hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE)) {
			hadoopServiceCore.dropTable(ClusterName.DM, TEST_TABLE);
		}
	}

	protected Set<String> testSchema() {
		final Set<String> schema = Sets.newHashSet();

		schema.add("result");

		return schema;
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/test/resources/log4j.properties
### Root Level ###
log4j.rootLogger=WARN, LOGFILE, CONSOLE

### Configuration for the LOGFILE appender ###
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=25MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=logs/dnet.log
log4j.appender.LOGFILE.Append=true
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=[%-5p] %d %c - %m%n

### Configuration for the CONSOLE appender ###
log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=[%-5p] %d %c - %m%n

org.apache.cxf.Logger=org.apache.cxf.common.logging.Log4jLogger

### Application Level ###
log4j.logger.eu.dnetlib=INFO
log4j.logger.eu.dnetlib.enabling.is.sn=INFO
log4j.logger.org.apache.cxf.interceptor=FATAL
log4j.logger.org.apache.cxf.ws.addressing.ContextUtils=FATAL
log4j.logger.eu.dnetlib.enabling.tools.AbstractBaseService=INFO
log4j.logger.eu.dnetlib.enabling.inspector=DEBUG
log4j.logger.eu.dnetlib.xml.database.LoggingTrigger=WARN
log4j.logger.eu.dnetlib.enabling.tools.registration.ServiceRegistrator=INFO
log4j.logger.eu.dnetlib.enabling.inspector=FATAL
log4j.logger.eu.dnetlib.enabling.inspector.SubscriptionController=DEBUG
log4j.logger.eu.dnetlib.springutils.stringtemplate.StringTemplateViewResolver=FATAL
log4j.logger.eu.dnetlib.enabling.is.sn.SynchronousNotificationSenderImpl=WARN
log4j.logger.eu.dnetlib.enabling.tools.LocalServiceResolverImpl=WARN
log4j.logger.eu.dnetlib.enabling.is.sn.NotificationInvokerImpl=WARN
log4j.logger.eu.dnetlib.data.collective=INFO
log4j.logger.eu.dnetlib.data.hadoop.utils.ScanFactory=DEBUG
log4j.logger.org.apache.xerces.parsers.SAXParser=OFF
log4j.logger.eu.dnetlib.conf.PropertyFetcher=WARN
#log4j.logger.eu.dnetlib.data.transform.XsltRowTransformerFactory=DEBUG

log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceImpl=OFF
log4j.logger.eu.dnetlib.enabling.datasources.DatasourceManagerClients=FATAL
log4j.logger.eu.dnetlib.data.mdstore.modular.mongodb.utils.MetadataCheckJob=DEBUG
log4j.logger.eu.dnetlib.enabling.is.sn.ISSNServiceCore=WARN
log4j.logger.eu.dnetlib.xml.database.exist.ExistDatabase=WARN
log4j.logger.eu.dnetlib.enabling.is.store.AbstractContentInitializer=FATAL

log4j.logger.org.apache.hadoop.hbase.mapreduce.TableInputFormatBase=FATAL

### Spring ###
log4j.logger.org.springframework=ERROR

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/utils/ScanProperties.java
package eu.dnetlib.data.hadoop.utils;

import java.util.Set;

import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;

import com.google.common.collect.Sets;

public class ScanProperties {

	private static final Operator DEFAULT_OPERATOR = Operator.MUST_PASS_ALL;

	private int caching = 100;
	private FilterList filterList;
	private Set<String> families = Sets.newHashSet();

	public ScanProperties(final String op) {
		Operator operator = DEFAULT_OPERATOR;
		if ((op != null) && !op.isEmpty()) {
			operator = Operator.valueOf(op);
		}
		filterList = new FilterList(operator);
	}

	public FilterList getFilterList() {
		return filterList;
	}

	public void setFilterList(final FilterList filterList) {
		this.filterList = filterList;
	}

	public Set<String> getFamilies() {
		return families;
	}

	public void setFamilies(final Set<String> families) {
		this.families = families;
	}

	public int getCaching() {
		return caching;
	}

	public void setCaching(int caching) {
		this.caching = caching;
	}
}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/utils/HadoopUtils.java
package eu.dnetlib.data.hadoop.utils;

import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

public class HadoopUtils {

	private static final Log log = LogFactory.getLog(HadoopUtils.class);

	public static java.util.function.Function<HadoopJob, HadoopJobDescriptor> asDescriptor() {
		return d -> {
			try {
				return d.asDescriptor();
			} catch (HadoopServiceException e) {
				log.warn(e);
				return null;
			}
		};
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/utils/JobProfile.java
package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Set;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.rmi.HadoopJobType;

public class JobProfile {

	private final Map<String, String> jobDefinition = Maps.newHashMap();
	private final Set<String> requiredParams = Sets.newHashSet();
	private ScanProperties scanProperties;

	private String name;

	private String description = "";

	private HadoopJobType jobType;

	public boolean isEmpty() {
		return getJobDefinition().isEmpty();
	}

	public Map<String, String> getJobDefinition() {
		return jobDefinition;
	}

	public Set<String> getRequiredParams() {
		return requiredParams;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getDescription() {
		return description;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	public ScanProperties getScanProperties() {
		return scanProperties;
	}

	public void setScanProperties(ScanProperties scanProperties) {
		this.scanProperties = scanProperties;
	}

	public HadoopJobType getJobType() {
		return jobType;
	}

	public void setJobType(HadoopJobType jobType) {
		this.jobType = jobType;
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/utils/ScanFactory.java
package eu.dnetlib.data.hadoop.utils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Base64;
import org.dom4j.Document;
import org.dom4j.Node;

public class ScanFactory {

	private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

	public static String getScan(final ScanProperties scanProperties) throws IOException {
		Scan scan = new Scan();

		scan.setCaching(scanProperties.getCaching());
		scan.setCacheBlocks(false); // don't set to true for MR jobs

		scan.setFilter(scanProperties.getFilterList());
		for (String family : scanProperties.getFamilies()) {
			scan.addFamily(family.getBytes());
		}

		log.debug("serializing scan");
		return convertScanToString(scan);
	}

	public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
		log.debug("setting job scanner");

		ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));

		String caching = doc.valueOf("//SCAN/@caching");
		if (!StringUtils.isBlank(caching)) {
			log.info("overriding default scan caching with: " + caching);
			scanProperties.setCaching(Integer.valueOf(caching));
		}

		for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
			Node node = (Node) o;
			String value = node.valueOf("./@value");
			if ((value == null) || value.isEmpty()) {
				value = bbParams.get(node.valueOf("./@param"));
			}
			log.debug("scanner family value: " + value);
			scanProperties.getFamilies().add(value);
		}
		for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
			Node node = (Node) o;
			String filterType = node.valueOf("./@type");

			String value = node.valueOf("./@value");
			if ((value == null) || value.isEmpty()) {
				value = bbParams.get(node.valueOf("./@param"));
			}

			if (filterType.equals("prefix")) {
				log.debug("scanner prefix filter, value: " + value);
				scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
			} // TODO add more filterType cases here
		}
		return scanProperties;
	}

	/**
	 * Writes the given scan into a Base64 encoded string.
	 *
	 * @param scan
	 *            The scan to write out.
	 * @return The scan saved in a Base64 encoded string.
	 * @throws IOException
	 *             When writing the scan fails.
	 */
	private static String convertScanToString(final Scan scan) throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(out);
		scan.write(dos);
		return Base64.encodeBytes(out.toByteArray());
	}

}

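For orientation, a minimal usage sketch of the two classes above (not part of this revision; the column family, row-key prefix and caching value are illustrative assumptions):

import java.io.IOException;

import org.apache.hadoop.hbase.filter.PrefixFilter;

import eu.dnetlib.data.hadoop.utils.ScanFactory;
import eu.dnetlib.data.hadoop.utils.ScanProperties;

public class ScanFactoryUsageSketch {

	public static void main(final String[] args) throws IOException {
		// operator name must match an org.apache.hadoop.hbase.filter.FilterList.Operator constant
		final ScanProperties props = new ScanProperties("MUST_PASS_ALL");
		props.setCaching(500); // overrides the default caching of 100
		props.getFamilies().add("result"); // assumed column family
		props.getFilterList().addFilter(new PrefixFilter("50|".getBytes())); // assumed row-key prefix

		// Base64-encoded Scan string, suitable for passing as a job parameter
		final String serializedScan = ScanFactory.getScan(props);
		System.out.println(serializedScan);
	}
}
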
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseDeleteFeeder.java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBaseDeleteFeeder performs a batch of Delete operations.
 */
public class HBaseDeleteFeeder extends HbaseTableFeeder {

	/**
	 * Logger.
	 */
	private static final Log log = LogFactory.getLog(HBaseDeleteFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
	 */
	@Override
	protected void addOperation(final List<Mutation> buffer, final Row row) {
		final Delete delete = new Delete(Bytes.toBytes(row.getKey()));
		delete.setWriteToWAL(true);

		for (final Column<String, byte[]> col : row) {
			log.debug(String.format("deleting K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
			delete.deleteColumns(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()));
		}

		buffer.add(delete);
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseAdminFactory.java
package eu.dnetlib.data.hadoop.hbase;

import eu.dnetlib.data.hadoop.AbstractHadoopClient;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.HBaseAdmin;

public class HBaseAdminFactory extends AbstractHadoopClient {

	private static final Log log = LogFactory.getLog(HBaseAdminFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

	public HBaseAdmin newInstance(final ClusterName clusterName) throws HadoopServiceException {
		try {
			log.info("init hbaseAdmin, cluster: " + clusterName.toString());
			setHadoopUser();
			return new HBaseAdmin(configurationEnumerator.get(clusterName));
		} catch (final Throwable e) {
			throw new HadoopServiceException("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
		}
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/hbase/HbaseTableFeeder.java
package eu.dnetlib.data.hadoop.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.transform.Row;
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HbaseTableFeeder provides abstraction to ship batch operation on an HBase table
 */
public abstract class HbaseTableFeeder {

	/**
	 * The Constant log.
	 */
	private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
	/**
	 * The configuration enumerator.
	 */
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;
	/**
	 * The batch size.
	 */
	private int batchSize = 100;
	/**
	 * The result set client factory.
	 */
	private ResultSetClientFactory resultSetClientFactory;

	/**
	 * Adds the operation.
	 *
	 * @param buffer the buffer
	 * @param row    the row
	 */
	protected abstract void addOperation(final List<Mutation> buffer, final Row row);

	/**
	 * Feed.
	 *
	 * @param epr         the epr
	 * @param xsl         the xsl
	 * @param clusterName the cluster name
	 * @param tableName   the table name
	 * @param simulation  the simulation
	 * @return the int
	 * @throws IOException          Signals that an I/O exception has occurred.
	 * @throws InterruptedException the interrupted exception
	 */
	public int feed(final String epr, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		return doWrite(asRows(epr, xsl), getConf(clusterName), tableName, simulation);
	}

	/**
	 * Do writeOnHBase.
	 *
	 * @param rows          the rows
	 * @param configuration the configuration
	 * @param tableName     the table name
	 * @param simulation    the simulation
	 * @return the int
	 * @throws IOException          Signals that an I/O exception has occurred.
	 * @throws InterruptedException the interrupted exception
	 */
	private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		final List<Mutation> buffer = Lists.newArrayList();

		int count = 0;
		if (simulation) {
			log.info("running in simulation mode ...");
			log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
		} else {

			final HTable htable = new HTable(configuration, tableName);
			try {
				int i = 0;
				for (final Row row : rows) {
					addOperation(buffer, row);
					if ((++i % getBatchSize()) == 0) {
						flush(tableName, buffer, htable);
						count += buffer.size();
						buffer.clear();
					}
				}
			} finally {
				if (!buffer.isEmpty()) {
					flush(tableName, buffer, htable);
					count += buffer.size();
				}
				htable.flushCommits();
				htable.close();
			}
		}
		return count;
	}

	private void flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
		if (!checkOp(htable.batch(buffer), tableName)) throw new IOException("unable to flush operation on HBase table: " + tableName);
	}

	private boolean checkOp(final Object[] res, final String tableName) throws IOException {
		return Iterables.all(Arrays.asList(res), Predicates.notNull());
	}

	/**
	 * As rows.
	 *
	 * @param epr the epr
	 * @param xsl the xsl
	 * @return the iterable
	 */
	protected Iterable<Row> asRows(final String epr, final String xsl) {
		return Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(epr), XsltRowTransformerFactory.newInstance(xsl)));
	}

	/**
	 * Gets the conf.
	 *
	 * @param clusterName the cluster name
	 * @return the conf
	 */
	protected Configuration getConf(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	/**
	 * Gets the batch size.
	 *
	 * @return the batch size
	 */
	public int getBatchSize() {
		return batchSize;
	}

	/**
	 * Sets the batch size.
	 *
	 * @param batchSize the new batch size
	 */
	public void setBatchSize(final int batchSize) {
		this.batchSize = batchSize;
	}

	/**
	 * Gets the result set client factory.
	 *
	 * @return the result set client factory
	 */
	public ResultSetClientFactory getResultSetClientFactory() {
		return resultSetClientFactory;
	}

	/**
	 * Sets the result set client factory.
	 *
	 * @param resultSetClientFactory the new result set client factory
	 */
	@Required
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
		this.resultSetClientFactory = resultSetClientFactory;
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/hbase/HBasePutFeeder.java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBasePutFeeder performs a batch of Put operations.
 */
public class HBasePutFeeder extends HbaseTableFeeder {

	/**
	 * Logger.
	 */
	private static final Log log = LogFactory.getLog(HBasePutFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
	 */
	@Override
	protected void addOperation(final List<Mutation> buffer, final Row row) {
		final Put put = new Put(Bytes.toBytes(row.getKey()));
		put.setWriteToWAL(true);

		for (final Column<String, byte[]> col : row) {
			log.debug(String.format("adding value to K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
			put.add(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()), col.getValue());
		}

		buffer.add(put);
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/JobRegistry.java
package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import eu.dnetlib.data.hadoop.HadoopJob.Status;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

public class JobRegistry {

	private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM

	private int maxJobs;

	private final BiMap<String, HadoopJob> jobs = HashBiMap.create();

	public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {

		if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }

		if (jobs.size() > getMaxJobs()) {
			removeOldestProcess();
		}

		jobs.put(hadoopJob.getId(), hadoopJob);
		log.info("Registered hadoop job " + hadoopJob.getId());
		hadoopJob.startMonitor();

		return hadoopJob.getId();
	}

	public Status getJobStatus(String id) {
		return findJob(id).getStatus();
	}

	public HadoopJob findJob(String id) {
		return jobs.get(id);
	}

	public void unregisterJob(String id) throws HadoopServiceException {

		if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }

		log.info("unregistering job: " + id);
		jobs.get(id).getJobMonitor().kill();
		jobs.remove(id);
	}

	private void removeOldestProcess() throws HadoopServiceException {
		Date oldDate = new Date();
		String oldId = null;

		for (Entry<String, HadoopJob> e : jobs.entrySet()) {
			final HadoopJob hadoopJob = e.getValue();

			if (hadoopJob.isComplete()) {
				final Date date = hadoopJob.getLastActivity();
				if (date.before(oldDate)) {
					oldDate = date;
					oldId = e.getKey();
				}
			}
		}

		if (oldId != null) {
			unregisterJob(oldId);
		}

	}

	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
		return jobs.values().stream()
				.filter(j -> clusterName.equals(j.getClusterName()))
				.map(HadoopUtils.asDescriptor())
				.filter(Objects::nonNull)
				.collect(Collectors.toList());
	}

	@Required
	public void setMaxJobs(final int maxJobs) {
		this.maxJobs = maxJobs;
	}

	public int getMaxJobs() {
		return maxJobs;
	}

}

modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java
package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.*;
import java.util.Map.Entry;
import java.util.stream.Collectors;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

public class HadoopServiceCore {

	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;
	@Autowired
	private HadoopClientMap clients;
	private int maxVersions;

	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
			return Arrays.asList(admin.listTables())
					.stream()
					.map(HTableDescriptor::getNameAsString)
					.collect(Collectors.toList());
		}
	}

	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");

			if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

			final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());

			final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());

			final Set<String> columns = Sets.newHashSet();

			for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
				columns.add(hColDesc.getNameAsString());
			}

			HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
			htDescriptor.setColumns(columns);

			List<HBaseTableRegionInfo> regions = Lists.newArrayList();

			for (HRegionInfo info : tableRegions) {
				regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
			}
			htDescriptor.setRegions(regions);

			if (log.isDebugEnabled()) {
				log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
			}

			return htDescriptor.toString();
		}
	}

	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
			return desc.getFamilies().stream()
					.map(d -> d.getNameAsString())
					.collect(Collectors.toList());
		}
	}

	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");

			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

			log.info("disabling table: " + table);
			admin.disableTable(table);

			log.info("deleting table: " + table);
			admin.deleteTable(table);

			log.info("creating table: " + table);
			admin.createTable(desc);
		}
	}

	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			return admin.tableExists(table);
		}
	}

	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");

			log.info("disabling table: " + table);
			admin.disableTable(table);

			log.info("deleting table: " + table);
			admin.deleteTable(table);
		}
	}

	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

			if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");

			final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);

			doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
		}
	}

	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

			doCreateTable(clusterName, table, columns, null);
		}
	}

	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
			throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

			final HTableDescriptor desc = new HTableDescriptor(table);
			for (final String column : columns) {
				final HColumnDescriptor hds = new HColumnDescriptor(column);
				hds.setMaxVersions(getMaxVersions());
				desc.addFamily(hds);
			}

			log.info("creating hbase table: " + table);

			if (regions != null && !regions.isEmpty()) {
				log.debug(String.format("create using %s regions: %s", regions.size(), regions));
				admin.createTable(desc, getSplitKeys(regions));
			} else {
				admin.createTable(desc);
			}

			log.info("created hbase table: [" + table + "]");
			log.debug("descriptor: [" + desc.toString() + "]");
		}
	}

	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
		byte[][] splits = new byte[regions.size() - 1][];
		for (int i = 0; i < regions.size() - 1; i++) {
			splits[i] = regions.get(i).getEndKey().getBytes();
		}
		return splits;
	}

	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {

			if (!admin.tableExists(table)) {
				createTable(clusterName, table, columns);
			} else {
				final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));

				final Set<String> foundColumns = desc.getFamilies().stream()
						.map(d -> d.getNameAsString())
						.collect(Collectors.toCollection(HashSet::new));

				log.info("ensuring columns on table " + table + ": " + columns);
				final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
				if (!missingColumns.isEmpty()) {

					if (admin.isTableEnabled(table)) {
						admin.disableTable(table);
					}

					for (final String column : missingColumns) {
						log.info("hbase table: '" + table + "', adding column: " + column);
						admin.addColumn(table, new HColumnDescriptor(column));
					}

					admin.enableTable(table);
				}
			}
		}
	}

	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			table.delete(deletes);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			for(HBaseRowDescriptor desc : columns) {

				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));
				d.setWriteToWAL(true);
				for(Column c : desc.getColumns()) {
					for(String qualifier : c.getQualifier()) {
						log.info(String.format("delete from row '%s' cf '%s:%s'", desc.getRowKey(), c.getFamily(), qualifier));
						d.deleteColumns(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
					}
				}
				table.delete(d);
			}
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			return table.get(new Get(id));
		} finally {
			table.close();
		}
	}

	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
		for(String rowKey : rowKeys) {
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
		}
		return map;
	}

	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		final HBaseRowDescriptor desc = new HBaseRowDescriptor();

		try {
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

			if (r.isEmpty()) {
				return desc;
			}

			final List<Column> columns = Lists.newArrayList();

			for(Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
				final Set<byte[]> qualifiers = e.getValue().keySet();
				final String family = new String(e.getKey());
				final Column col = new Column(family);

				for(byte[] q : qualifiers) {
					String qs = new String(q);
					col.getQualifier().add(qs);
				}
				columns.add(col);
			}
			desc.setColumns(columns);
			desc.setRowKey(rowKey);

			return desc;
		} finally {
			table.close();
		}
	}

	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		try(final HTable table = new HTable(conf, tableName)) {
			final ResultScanner rs = table.getScanner(scan);
			try {
				return Lists.newArrayList(rs.iterator());
			} finally {
				rs.close();
			}
		}
	}

	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot deleteFromHBase an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try(final FileSystem hdfs = FileSystem.get(conf)) {
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

			if (hdfs.exists(absolutePath)) {
				log.debug("deleteFromHBase path: " + absolutePath.toString());
				hdfs.delete(absolutePath, true);
				log.info("deleted path: " + absolutePath.toString());
				return true;
			} else {
				log.warn("cannot deleteFromHBase unexisting path: " + absolutePath.toString());
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot create an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try(final FileSystem hdfs = FileSystem.get(conf)) {
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			if (!hdfs.exists(absolutePath)) {
				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else if (force) {
				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
				hdfs.delete(absolutePath, true);

				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else {
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean existHdfsPath(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("invalid empty path");

		final Configuration conf = configurationEnumerator.get(clusterName);
		try(final FileSystem hdfs = FileSystem.get(conf)) {
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			return hdfs.exists(absolutePath);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public Configuration getClusterConiguration(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	public int getMaxVersions() {
		return maxVersions;
	}

	@Required
	public void setMaxVersions(final int maxVersions) {
		this.maxVersions = maxVersions;
	}

	public HadoopClientMap getClients() {
		return clients;
	}

}

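For orientation, a minimal usage sketch of HadoopServiceCore (not part of this revision; it assumes a Spring context that wires HadoopClientMap and ConfigurationEnumerator, as in the test configuration shown earlier, and the table/family names are illustrative):

import java.io.IOException;
import java.util.List;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

public class HadoopServiceCoreUsageSketch {

	// hadoopServiceCore is assumed to be injected by Spring, as in HBaseTest
	public void writeOneRecord(final HadoopServiceCore hadoopServiceCore) throws HadoopServiceException, IOException {
		// make sure the table exists with the 'result' column family (assumed names)
		hadoopServiceCore.ensureTable(ClusterName.DM, "dnet_test_table", Sets.newHashSet("result"));

		// put a single cell: row "1", family "result", qualifier "body"
		final Put put = new Put(Bytes.toBytes("1"));
		put.add(Bytes.toBytes("result"), Bytes.toBytes("body"), Bytes.toBytes("<record/>"));

		final List<Put> puts = Lists.newArrayList(put);
		hadoopServiceCore.writeOnHBase(ClusterName.DM, "dnet_test_table", puts);
	}
}
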
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.7.7/src/main/java/eu/dnetlib/data/hadoop/HadoopJob.java
package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;

public class HadoopJob {

	private static final Log log = LogFactory.getLog(HadoopJob.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * Defines the possible stati of an hadoop job.
	 */
	public static enum Status {
		PREP, RUNNING, SUCCEEDED, KILLED, FAILED, SUSPENDED, UNKNOWN
	}

	private final Executor executor = Executors.newSingleThreadExecutor();

	private final JobMonitor jobMonitor;

	private final JobProfile jobProfile;

	private final ClusterName clusterName;

	private final String id;

	public static HadoopJob newInstance(String id, ClusterName clusterName, JobProfile profile, JobMonitor jobMonitor) {
		return new HadoopJob(id, clusterName, profile, jobMonitor);
	}

	private HadoopJob(String id, ClusterName clusterName, JobProfile jobProfile, JobMonitor jobMonitor) {
		super();
... This diff was truncated because it exceeds the maximum size that can be displayed.
