Project

General

Profile

1
package eu.dnetlib.data.hadoop.hbase;
2

    
3
import java.io.IOException;
4
import java.util.Map.Entry;
5
import java.util.NavigableMap;
6
import java.util.Set;
7

    
8
import com.google.common.collect.Sets;
9
import eu.dnetlib.data.hadoop.HadoopServiceCore;
10
import eu.dnetlib.data.hadoop.config.ClusterName;
11
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
12
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
13
import eu.dnetlib.miscutils.datetime.DateUtils;
14
import org.apache.hadoop.hbase.client.Get;
15
import org.apache.hadoop.hbase.client.HTable;
16
import org.apache.hadoop.hbase.client.Put;
17
import org.apache.hadoop.hbase.client.Result;
18
import org.apache.hadoop.hbase.util.Bytes;
19
import org.junit.After;
20
import org.junit.Before;
21
import org.junit.Ignore;
22
import org.junit.Test;
23
import org.junit.runner.RunWith;
24
import org.springframework.beans.factory.annotation.Autowired;
25
import org.springframework.test.context.ActiveProfiles;
26
import org.springframework.test.context.ContextConfiguration;
27
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
28

    
29
import static org.junit.Assert.assertNotNull;
30
import static org.junit.Assert.assertTrue;
31

    
32
@ActiveProfiles("test")
33
@RunWith(SpringJUnit4ClassRunner.class)
34
@ContextConfiguration(classes = HBaseTestContextConfiguration.class)
35
public class HBaseTest {
36

    
37
	protected static final String TEST_TABLE = "dnet_test_table";
38

    
39
	protected static final int NUM_VERSIONS = 10;
40

    
41
	@Autowired
42
	private HadoopServiceCore hadoopServiceCore;
43

    
44
	@Autowired
45
	private ConfigurationEnumerator configurationEnumerator;
46

    
47
	@Before
48
	public void setUp() throws HadoopServiceException, IOException, InterruptedException {
49
		assertNotNull(hadoopServiceCore);
50

    
51
		ensureDropTable();
52
	}
53

    
54
	@After
55
	public void tearDown() throws HadoopServiceException, IOException {
56
		ensureDropTable();
57
	}
58

    
59
	@Test
60
	@Ignore
61
	// TODO allow testing on a dev cluster instance
62
	public void testReadWrite() throws HadoopServiceException, IOException, InterruptedException {
63

    
64
		hadoopServiceCore.createTable(ClusterName.DM, TEST_TABLE, testSchema());
65
		assertTrue(hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE));
66

    
67
		final HTable htable = new HTable(configurationEnumerator.get(ClusterName.DM), TEST_TABLE);
68

    
69
		final Put put = new Put(Bytes.toBytes("1"));
70

    
71
		for (int i = 0; i < NUM_VERSIONS; i++) {
72
			put.add(Bytes.toBytes("result"), Bytes.toBytes("body"), Bytes.toBytes(i + ""));
73
			htable.put(put);
74
			Thread.sleep(1000);
75
		}
76
		final Get get = new Get(Bytes.toBytes("1"));
77
		get.setMaxVersions(HBaseTestContextConfiguration.MAX_VERSIONS);
78

    
79
		final Result r = htable.get(get);
80

    
81
		// Map<family,Map<qualifier,Map<timestamp,value>>>
82
		final NavigableMap<Long, byte[]> versions = r.getMap().get(Bytes.toBytes("result")).get(Bytes.toBytes("body"));
83

    
84
		for (final Entry<Long, byte[]> e : versions.entrySet()) {
85
			System.out.println("t: " + DateUtils.calculate_ISO8601(e.getKey()) + ", v: " + Bytes.toString(e.getValue()));
86
		}
87

    
88
		htable.close();
89

    
90
	}
91

    
92
	protected void ensureDropTable() throws HadoopServiceException, IOException {
93
		if (hadoopServiceCore.existTable(ClusterName.DM, TEST_TABLE)) {
94
			hadoopServiceCore.dropTable(ClusterName.DM, TEST_TABLE);
95
		}
96
	}
97

    
98
	protected Set<String> testSchema() {
99
		final Set<String> schema = Sets.newHashSet();
100

    
101
		schema.add("result");
102

    
103
		return schema;
104
	}
105

    
106
}
(1-1/2)