Revision 49795

Lazy init of the Hadoop clients: allows defining the user for the HBase admin and jtClient once. The user is available as a service property.
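In practice the change inverts the old model: instead of an init() thread eagerly building one shared HadoopClients holder per cluster, HadoopClientMap now builds a fresh client on every call, gated by the enabled-clients configuration, and the factories fail fast with HadoopServiceException instead of returning null. A minimal sketch of the resulting call pattern, mirroring the HadoopServiceCore hunks below (the injected clientMap field and the caller class are assumed for illustration):

	import org.apache.hadoop.hbase.client.HBaseAdmin;

	import eu.dnetlib.data.hadoop.HadoopClientMap;
	import eu.dnetlib.data.hadoop.config.ClusterName;

	public class LazyClientExample {

		private HadoopClientMap clientMap; // wired by Spring, as in HadoopServiceCore

		public boolean tableExists(final ClusterName clusterName, final String table) throws Exception {
			// getHbaseAdmin() now throws HadoopServiceException when hbase is not
			// enabled for the cluster, instead of returning null; the admin is a
			// fresh instance, so try-with-resources closes it after use.
			try (HBaseAdmin admin = clientMap.getHbaseAdmin(clusterName)) {
				return admin.tableExists(table);
			}
		}
	}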

View differences:

modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopClients.java
-package eu.dnetlib.data.hadoop;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.oozie.client.OozieClient;
-
-public class HadoopClients {
-
-	private final JobClient jtClient;
-
-	private final OozieClient oozieClient;
-
-	private final HBaseAdmin hbaseAdmin;
-
-	public HadoopClients(JobClient jtClient, OozieClient oozieClient, HBaseAdmin hbaseAdmin) {
-		super();
-		this.jtClient = jtClient;
-		this.oozieClient = oozieClient;
-		this.hbaseAdmin = hbaseAdmin;
-	}
-
-	public JobClient getJtClient() {
-		return jtClient;
-	}
-
-	public HBaseAdmin getHbaseAdmin() {
-		return hbaseAdmin;
-	}
-
-	public OozieClient getOozieClient() {
-		return oozieClient;
-	}
-
-	public boolean isOozieAvailable() {
-		return getOozieClient() != null;
-	}
-
-	public boolean isHbaseAvailable() {
-		return getHbaseAdmin() != null;
-	}
-
-	public boolean isMapredAvailable() {
-		return getJtClient() != null;
-	}
-
-}
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTestContextConfiguration.java
 package eu.dnetlib.data.hadoop.hbase;
 
+import eu.dnetlib.data.hadoop.HadoopClientMap;
+import eu.dnetlib.data.hadoop.HadoopServiceCore;
+import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
+import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
+import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
+import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.Resource;
 
-import eu.dnetlib.data.hadoop.HadoopClientMap;
-import eu.dnetlib.data.hadoop.HadoopServiceCore;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
-import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
-import eu.dnetlib.data.mapreduce.JobClientFactory;
-
 @Configuration
 @Profile(value = "test")
 public class HBaseTestContextConfiguration {
......
 		return core;
 	}
 
-	@Bean(initMethod = "init")
+	@Bean
 	public HadoopClientMap hadoopClientMap() throws InterruptedException {
 		final HadoopClientMap clientMap = new HadoopClientMap();
 		clientMap.setEnabledClients(ENABLED_CLIENTS);
-		clientMap.setClientsInitTime(10);
 
 		return clientMap;
 	}
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTest.java
 package eu.dnetlib.data.hadoop.hbase;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
+import eu.dnetlib.data.hadoop.HadoopServiceCore;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import eu.dnetlib.miscutils.datetime.DateUtils;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
......
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-import com.google.common.collect.Sets;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-import eu.dnetlib.data.hadoop.HadoopServiceCore;
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
-import eu.dnetlib.miscutils.datetime.DateUtils;
-
 @ActiveProfiles("test")
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(classes = HBaseTestContextConfiguration.class)
......
 	public void setUp() throws HadoopServiceException, IOException, InterruptedException {
 		assertNotNull(hadoopServiceCore);
 
-		System.out.println("waiting for clients to be ready... timeout? " + !hadoopServiceCore.getClients().waitClients());
-
 		ensureDropTable();
 	}
 
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseAdminFactory.java
 package eu.dnetlib.data.hadoop.hbase;
 
+import eu.dnetlib.data.hadoop.AbstractHadoopClient;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.springframework.beans.factory.annotation.Autowired;
 
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
-
-public class HBaseAdminFactory {
+public class HBaseAdminFactory extends AbstractHadoopClient {
 
 	private static final Log log = LogFactory.getLog(HBaseAdminFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-	@Autowired
-	private ConfigurationEnumerator configurationEnumerator;
-
 	public HBaseAdmin newInstance(final ClusterName clusterName) throws HadoopServiceException {
 		try {
 			log.info("init hbaseAdmin, cluster: " + clusterName.toString());
+			setHadoopUser();
 			return new HBaseAdmin(configurationEnumerator.get(clusterName));
 		} catch (final Throwable e) {
-			log.warn("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
-			return null;
+			throw new HadoopServiceException("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
 		}
 	}
+
 }
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java
 	private int maxVersions;
 
 	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-		return Arrays.asList(admin.listTables())
-				.stream()
-				.map(HTableDescriptor::getNameAsString)
-				.collect(Collectors.toList());
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+			return Arrays.asList(admin.listTables())
+					.stream()
+					.map(HTableDescriptor::getNameAsString)
+					.collect(Collectors.toList());
+		}
 	}
 
-	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
-		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
-
-		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
-
-		return admin;
-	}
-
 	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
-		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
+			if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
 
-		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
+			if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
 
-		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
+			final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
 
-		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
+			final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
 
-		final Set<String> columns = Sets.newHashSet();
+			final Set<String> columns = Sets.newHashSet();
 
-		for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
-			columns.add(hColDesc.getNameAsString());
-		}
+			for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
+				columns.add(hColDesc.getNameAsString());
+			}
 
-		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
-		htDescriptor.setColumns(columns);
+			HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
+			htDescriptor.setColumns(columns);
 
-		List<HBaseTableRegionInfo> regions = Lists.newArrayList();
+			List<HBaseTableRegionInfo> regions = Lists.newArrayList();
 
-		for (HRegionInfo info : tableRegions) {
-			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
-		}
-		htDescriptor.setRegions(regions);
+			for (HRegionInfo info : tableRegions) {
+				regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
+			}
+			htDescriptor.setRegions(regions);
 
-		if (log.isDebugEnabled()) {
-			log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
-		}
+			if (log.isDebugEnabled()) {
+				log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
+			}
 
-		return htDescriptor.toString();
+			return htDescriptor.toString();
+		}
 	}
 
 	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
-		return desc.getFamilies().stream()
-				.map(d -> d.getNameAsString())
-				.collect(Collectors.toList());
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
+			return desc.getFamilies().stream()
+					.map(d -> d.getNameAsString())
+					.collect(Collectors.toList());
+		}
 	}
 
 	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
+			if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
 
-		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
+			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
 
-		log.info("disabling table: " + table);
-		admin.disableTable(table);
+			log.info("disabling table: " + table);
+			admin.disableTable(table);
 
-		log.info("deleting table: " + table);
-		admin.deleteTable(table);
+			log.info("deleting table: " + table);
+			admin.deleteTable(table);
 
-		log.info("creating table: " + table);
-		admin.createTable(desc);
+			log.info("creating table: " + table);
+			admin.createTable(desc);
+		}
 	}
 
 	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		return admin.tableExists(table);
+			return admin.tableExists(table);
+		}
 	}
 
 	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
+			if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
 
-		log.info("disabling table: " + table);
-		admin.disableTable(table);
+			log.info("disabling table: " + table);
+			admin.disableTable(table);
 
-		log.info("deleting table: " + table);
-		admin.deleteTable(table);
+			log.info("deleting table: " + table);
+			admin.deleteTable(table);
+		}
 	}
 
 	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
 
-		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
+			if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
 
-		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
+			final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
 
-		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
+			doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
+		}
 	}
 
 	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
 
-		doCreateTable(clusterName, table, columns, null);
+			doCreateTable(clusterName, table, columns, null);
+		}
 	}
 
 	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
 			throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
 
-		final HTableDescriptor desc = new HTableDescriptor(table);
-		for (final String column : columns) {
-			final HColumnDescriptor hds = new HColumnDescriptor(column);
-			hds.setMaxVersions(getMaxVersions());
-			desc.addFamily(hds);
-		}
+			final HTableDescriptor desc = new HTableDescriptor(table);
+			for (final String column : columns) {
+				final HColumnDescriptor hds = new HColumnDescriptor(column);
+				hds.setMaxVersions(getMaxVersions());
+				desc.addFamily(hds);
+			}
 
-		log.info("creating hbase table: " + table);
+			log.info("creating hbase table: " + table);
 
-		if (regions != null && !regions.isEmpty()) {
-			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
-			admin.createTable(desc, getSplitKeys(regions));
-		} else {
-			admin.createTable(desc);
+			if (regions != null && !regions.isEmpty()) {
+				log.debug(String.format("create using %s regions: %s", regions.size(), regions));
+				admin.createTable(desc, getSplitKeys(regions));
+			} else {
+				admin.createTable(desc);
+			}
+
+			log.info("created hbase table: [" + table + "]");
+			log.debug("descriptor: [" + desc.toString() + "]");
 		}
-
-		log.info("created hbase table: [" + table + "]");
-		log.debug("descriptor: [" + desc.toString() + "]");
 	}
 
 	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
......
 	}
 
 	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
 
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		if (!admin.tableExists(table)) {
-			createTable(clusterName, table, columns);
-		} else {
-			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
+			if (!admin.tableExists(table)) {
+				createTable(clusterName, table, columns);
+			} else {
+				final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
 
-			final Set<String> foundColumns = desc.getFamilies().stream()
-					.map(d -> d.getNameAsString())
-					.collect(Collectors.toCollection(HashSet::new));
+				final Set<String> foundColumns = desc.getFamilies().stream()
+						.map(d -> d.getNameAsString())
+						.collect(Collectors.toCollection(HashSet::new));
 
-			log.info("ensuring columns on table " + table + ": " + columns);
-			final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
-			if (!missingColumns.isEmpty()) {
+				log.info("ensuring columns on table " + table + ": " + columns);
+				final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
+				if (!missingColumns.isEmpty()) {
 
-				if (admin.isTableEnabled(table)) {
-					admin.disableTable(table);
-				}
+					if (admin.isTableEnabled(table)) {
+						admin.disableTable(table);
+					}
 
-				for (final String column : missingColumns) {
-					log.info("hbase table: '" + table + "', adding column: " + column);
-					admin.addColumn(table, new HColumnDescriptor(column));
-				}
+					for (final String column : missingColumns) {
+						log.info("hbase table: '" + table + "', adding column: " + column);
+						admin.addColumn(table, new HColumnDescriptor(column));
+					}
 
-				admin.enableTable(table);
+					admin.enableTable(table);
+				}
 			}
 		}
 	}
......
 
 	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
 		final Configuration conf = configurationEnumerator.get(clusterName);
-		final HTable table = new HTable(conf, tableName);
-		try {
+		try(final HTable table = new HTable(conf, tableName)) {
 			final ResultScanner rs = table.getScanner(scan);
 			try {
 				return Lists.newArrayList(rs.iterator());
 			} finally {
 				rs.close();
 			}
-		} finally {
-			table.close();
 		}
 	}
 
......
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
 
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 
 			if (hdfs.exists(absolutePath)) {
......
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
 
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 			if (!hdfs.exists(absolutePath)) {
 				hdfs.mkdirs(absolutePath);
......
 			throw new HadoopServiceException("invalid empty path");
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 			return hdfs.exists(absolutePath);
 		} catch (IOException e) {
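One caveat worth flagging on the FileSystem hunks above (an observation, not part of this commit): FileSystem.get(conf) normally returns a JVM-wide cached instance keyed by scheme, authority and user, so the close() performed by try-with-resources also closes it for any other component holding the same cached handle. If that ever becomes a problem, a private instance can be requested instead; a hedged sketch, assuming the Hadoop version in use provides FileSystem.newInstance():

	import org.apache.hadoop.conf.Configuration;
	import org.apache.hadoop.fs.FileSystem;
	import org.apache.hadoop.fs.Path;

	public class PrivateFsExample {

		public static boolean exists(final Configuration conf, final String path) throws Exception {
			// newInstance() bypasses the shared FileSystem cache, so closing it
			// cannot disturb other holders of the cached instance.
			try (FileSystem hdfs = FileSystem.newInstance(conf)) {
				return hdfs.exists(new Path(path));
			}
		}
	}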
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieClientFactory.java
 package eu.dnetlib.data.hadoop.oozie;
 
+import eu.dnetlib.data.hadoop.AbstractHadoopClient;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.oozie.client.OozieClient;
-import org.springframework.beans.factory.annotation.Autowired;
 
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-
 /**
  * Factory bean for Oozie client instances.
  * 
......
  * @author claudio
  * 
  */
-public class OozieClientFactory {
+public class OozieClientFactory extends AbstractHadoopClient {
 
 	private static final String ENV_ATTRIBUTE_OOZIE_SERVICE_LOC = "oozie.service.loc";
 
 	private static final Log log = LogFactory.getLog(OozieClientFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-	@Autowired
-	private ConfigurationEnumerator configurationEnumerator;
-
-	public OozieClient newInstance(ClusterName clusterName) {
+	public OozieClient newInstance(ClusterName clusterName) throws HadoopServiceException {
 		final String oozieServiceLocation = configurationEnumerator.get(clusterName).get(ENV_ATTRIBUTE_OOZIE_SERVICE_LOC);
 		log.info("init oozie client, cluster: " + clusterName.toString() + ", oozie server: " + oozieServiceLocation);
+		setHadoopUser();
 		try {
 			return new OozieClient(oozieServiceLocation);
 		} catch (Throwable e) {
-			log.warn("unable to initialize oozie client for cluster: " + clusterName.toString(), e);
-			return null;
+			throw new HadoopServiceException("unable to initialize oozie client for cluster: " + clusterName.toString(), e);
 		}
 	}
 
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopClientMap.java
 package eu.dnetlib.data.hadoop;
 
+import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
+import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
......
 import eu.dnetlib.data.hadoop.config.ClusterName;
 import eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory;
 import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
-import eu.dnetlib.data.mapreduce.JobClientFactory;
 
+
 public class HadoopClientMap {
 
 	private static final Log log = LogFactory.getLog(HadoopClientMap.class); // NOPMD by marko on 11/24/08 5:02 PM
......
 	@Autowired
 	private HBaseAdminFactory hbaseAdminFactory;
 
-	private int clientsInitTime;
-
 	private Map<String, Map<String, String>> enabledClients = Maps.newHashMap();
 
-	private final Map<ClusterName, HadoopClients> clients = Maps.newHashMap();
-
-	private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
-	public void init() {
-		log.info("clients conf: " + getEnabledClients());
-		executor.execute(new Runnable() {
-
-			@Override
-			public void run() {
-				for (final String name : enabledClients.keySet()) {
-					doInit(name);
-				}
-			}
-		});
-	}
-
-	private void doInit(final String name) {
-		try {
-			log.info("initializing clients for hadoop cluster: " + name);
-			final ClusterName clusterName = ClusterName.valueOf(name);
-
-			final Map<String, String> clientsConf = enabledClients.get(name);
-
-			final JobClient jobClient = Boolean.valueOf(clientsConf.get("mapred")) ? getJobClientFactory().newInstance(name) : null;
-			final OozieClient oozieClient = Boolean.valueOf(clientsConf.get("oozie")) ? getOozieClientFactory().newInstance(clusterName) : null;
-			final HBaseAdmin hbaseAdmin = Boolean.valueOf(clientsConf.get("hbase")) ? getHbaseAdminFactory().newInstance(clusterName) : null;
-
-			clients.put(clusterName, new HadoopClients(jobClient, oozieClient, hbaseAdmin));
-		} catch (final Throwable e) {
-			log.error("Error initializing hadoop client for cluster: " + name, e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	public boolean waitClients() throws InterruptedException {
-		return executor.awaitTermination(getClientsInitTime(), TimeUnit.SECONDS);
-	}
-
-	public JobClient getJtClient(final ClusterName name) {
-		return getClients(name).getJtClient();
-	}
-
-	public boolean isMapreduceAvailable(final ClusterName name) {
-		return getClients(name).isMapredAvailable();
-	}
-
-	public OozieClient getOozieClient(final ClusterName name) {
-		return getClients(name).getOozieClient();
-	}
-
-	public boolean isOozieAvailable(final ClusterName name) {
-		return getClients(name).isOozieAvailable();
-	}
-
-	public HBaseAdmin getHbaseAdmin(final ClusterName name) {
-		return getClients(name).getHbaseAdmin();
-	}
-
-	public HadoopClients getClients(final ClusterName name) {
-		final HadoopClients hadoopClients = clients.get(name);
-		if (hadoopClients == null) throw new IllegalArgumentException("cluster " + name.toString() + " is currently disabled");
-		return hadoopClients;
-	}
-
+	public boolean isMapreduceAvailable(final ClusterName name) {
+		return isClientAvailable(name, "mapred");
+	}
+
+	public boolean isOozieAvailable(final ClusterName name) {
+		return isClientAvailable(name, "oozie");
+	}
+
+	public boolean isHbaseAvailable(final ClusterName name) {
+		return isClientAvailable(name, "hbase");
+	}
+
+	private boolean isClientAvailable(final ClusterName name, final String clientName) {
+		final String clusterName = name.toString();
+		return enabledClients.containsKey(clusterName) && "true".equals(enabledClients.get(clusterName).get(clientName));
+	}
+
+	public JobClient getJtClient(final ClusterName clusterName, final String username) throws HadoopServiceException, IOException {
+		if (!isMapreduceAvailable(clusterName)) {
+			throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
+		}
+		return getJobClientFactory().newInstance(clusterName, username);
+	}
+
+	public JobClient getJtClient(final ClusterName clusterName) throws HadoopServiceException, IOException {
+		if (!isMapreduceAvailable(clusterName)) {
+			throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
+		}
+		return getJobClientFactory().newInstance(clusterName);
+	}
+
+	public OozieClient getOozieClient(final ClusterName name) throws HadoopServiceException {
+		if (!isOozieAvailable(name)) {
+			throw new HadoopServiceException("oozie is not available for cluster " + name.toString());
+		}
+		return getOozieClientFactory().newInstance(name);
+	}
+
+	public HBaseAdmin getHbaseAdmin(final ClusterName name) throws HadoopServiceException {
+		if (!isHbaseAvailable(name)) {
+			throw new HadoopServiceException("hbase is not available for cluster " + name.toString());
+		}
+		return getHbaseAdminFactory().newInstance(name);
+	}
+
 	// //////////
 
 	public String getEnabledClients() {
......
 		this.hbaseAdminFactory = hbaseAdminFactory;
 	}
 
-	public int getClientsInitTime() {
-		return clientsInitTime;
-	}
-
-	@Required
-	public void setClientsInitTime(int clientsInitTime) {
-		this.clientsInitTime = clientsInitTime;
-	}
-
 }
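With the holder map gone, availability is decided purely by configuration: a client is considered available only when its flag is the literal string "true" for that cluster. A small, runnable illustration of isClientAvailable()'s semantics; the map contents and cluster name "DM" are hypothetical:

	import java.util.Map;

	import com.google.common.collect.ImmutableMap;
	import com.google.common.collect.Maps;

	public class EnabledClientsExample {

		public static void main(String[] args) {
			final Map<String, Map<String, String>> enabledClients = Maps.newHashMap();
			// Hypothetical cluster "DM" with hbase on and oozie off; "mapred" is absent.
			enabledClients.put("DM", ImmutableMap.of("hbase", "true", "oozie", "false"));

			System.out.println(isClientAvailable(enabledClients, "DM", "hbase"));  // true
			System.out.println(isClientAvailable(enabledClients, "DM", "oozie"));  // false
			System.out.println(isClientAvailable(enabledClients, "DM", "mapred")); // false: missing key
		}

		// Mirrors HadoopClientMap.isClientAvailable(), minus the ClusterName type.
		static boolean isClientAvailable(final Map<String, Map<String, String>> enabledClients,
				final String clusterName, final String clientName) {
			return enabledClients.containsKey(clusterName) && "true".equals(enabledClients.get(clusterName).get(clientName));
		}
	}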
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/mapred/JobClientFactory.java
+package eu.dnetlib.data.hadoop.mapred;
+
+import java.io.IOException;
+
+import eu.dnetlib.data.hadoop.AbstractHadoopClient;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import org.apache.hadoop.mapred.JobClient;
+
+/**
+ * Factory bean for jobClient instances
+ *
+ * @author claudio
+ *
+ */
+public class JobClientFactory extends AbstractHadoopClient {
+
+	public JobClient newInstance(final ClusterName clusterName, final String userName) throws IOException {
+		setHadoopUser(userName);
+		return new JobClient(configurationEnumerator.get(clusterName));
+	}
+
+	public JobClient newInstance(final ClusterName clusterName) throws IOException, HadoopServiceException {
+		setHadoopUser();
+		return new JobClient(configurationEnumerator.get(clusterName));
+	}
+
+}
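This new factory is what the commit message refers to for jtClient: the one-argument overload falls back to the default user resolved via setHadoopUser(), while the two-argument overload lets a caller submit as a specific user. A hedged usage sketch; the cluster id "DM" and the user name are illustrative, not taken from this revision:

	import org.apache.hadoop.mapred.JobClient;

	import eu.dnetlib.data.hadoop.config.ClusterName;
	import eu.dnetlib.data.hadoop.mapred.JobClientFactory;

	public class JobClientExample {

		private JobClientFactory jobClientFactory; // wired by Spring

		public void submitExamples() throws Exception {
			final ClusterName cluster = ClusterName.valueOf("DM"); // hypothetical cluster id

			// Submit as the default user resolved from the service property.
			final JobClient asDefault = jobClientFactory.newInstance(cluster);

			// Submit on behalf of a specific user; sets HADOOP_USER_NAME first.
			final JobClient asUser = jobClientFactory.newInstance(cluster, "someuser");

			asDefault.close();
			asUser.close();
		}
	}

One thing to keep in mind: setHadoopUser() writes a process-wide system property, so concurrent newInstance() calls for different users could in principle interleave; whether that matters depends on how the service serializes submissions.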
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/mapred/MapreduceJobMonitor.java
 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 
......
 
 	private final RunningJob runningJob;
 
+	private final JobClient jobClient;
+
 	public MapreduceJobMonitor(final RunningJob runningJob, final JobCompletion callback) {
+		this(null, runningJob, callback);
+	}
+
+	public MapreduceJobMonitor(final JobClient jobClient, final RunningJob runningJob, final JobCompletion callback) {
 		super(callback);
 		this.runningJob = runningJob;
+		this.jobClient = jobClient;
 	}
 
 	@Override
......
 			}
 		} catch (final Throwable e) {
 			getCallback().failed(getHadoopId(), e);
+		} finally {
+			try {
+				if (jobClient != null) {
+					jobClient.close();
+				}
+			} catch (IOException e) {
+				throw new RuntimeException("unable to close jobClient", e);
+			}
 		}
 	}
 
......
 		}
 	}
 
+	public JobClient getJobClient() {
+		return jobClient;
+	}
 }
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/SubmitMapreduceJobAction.java
 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-
 import eu.dnetlib.data.hadoop.HadoopJob;
 import eu.dnetlib.data.hadoop.config.ClusterName;
 import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
......
 import eu.dnetlib.data.hadoop.utils.ScanProperties;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
 import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
 
 public class SubmitMapreduceJobAction extends AbstractSubmitAction {
 
......
 		try {
 			final JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
 
-			if (!hadoopClientMap.isMapreduceAvailable(clusterName))
-				throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());
-
 			logJobDetails(jobConf);
 
-			final RunningJob runningJob = hadoopClientMap.getJtClient(clusterName).submitJob(jobConf);
+			final JobClient jtClient = hadoopClientMap.getJtClient(clusterName);
+			final RunningJob runningJob = jtClient.submitJob(jobConf);
+
 			final String jobId = newJobId(clusterName, runningJob.getID().getId());
 
-			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
-					new MapreduceJobMonitor(runningJob, callback)));
+			jobRegistry.registerJob(
+					HadoopJob.newInstance(jobId, clusterName, jobProfile, new MapreduceJobMonitor(jtClient, runningJob, callback)));
 
 		} catch (final IOException e) {
 			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/AbstractHadoopClient.java
+package eu.dnetlib.data.hadoop;
+
+import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public abstract class AbstractHadoopClient {
+
+	private static final Log log = LogFactory.getLog(AbstractHadoopClient.class);
+
+	@Autowired
+	private ISClient isClient;
+
+	@Autowired
+	protected ConfigurationEnumerator configurationEnumerator;
+
+	protected void setHadoopUser() throws HadoopServiceException {
+		setHadoopUser(getDefaultUser());
+	}
+
+	private String getDefaultUser() throws HadoopServiceException {
+		try {
+			return isClient.queryForServiceProperty("default.hadoop.user");
+		} catch (ISLookUpException e) {
+			throw new HadoopServiceException(e);
+		}
+	}
+
+	protected void setHadoopUser(final String userName) {
+		log.info("setting HADOOP_USER_NAME as " + userName);
+		System.setProperty("HADOOP_USER_NAME", userName);
+	}
+
+}
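For context on why setting a system property is enough: with simple (non-Kerberos) authentication, Hadoop's UserGroupInformation resolves the effective user from the HADOOP_USER_NAME environment variable or system property before falling back to the OS user. The default user itself comes from the default.hadoop.user service property wired up in the configuration hunks below, which getDefaultUser() reads through ISClient. A minimal sketch, assuming no login has already been cached in the JVM:

	import org.apache.hadoop.security.UserGroupInformation;

	public class HadoopUserExample {

		public static void main(String[] args) throws Exception {
			// Must run before the first UserGroupInformation login; afterwards the
			// login user is cached and the property change has no effect.
			System.setProperty("HADOOP_USER_NAME", "dnet");
			System.out.println(UserGroupInformation.getCurrentUser().getUserName()); // dnet
		}
	}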
modules/dnet-hadoop-service/trunk/src/main/resources/eu/dnetlib/data/hadoop/applicationContext-dnet-hadoop-service.properties
 services.hadoop.jobregistry.size=100
 services.hadoop.lib.path=/user/dnet/lib/dnet-mapreduce-jobs-assembly-0.0.6.2-SNAPSHOT.jar
 services.hadoop.hbase.maxversions=1
-services.hadoop.clients.init.timeout=30
+services.hadoop.clients.init.timeout=30
+services.hadoop.user=dnet
modules/dnet-hadoop-service/trunk/src/main/resources/eu/dnetlib/data/hadoop/applicationContext-dnet-hadoop-service.xml
     <bean id="isClient" class="eu.dnetlib.data.hadoop.ISClient" />
 
 	<bean id="hadoopClientMap" class="eu.dnetlib.data.hadoop.HadoopClientMap"
-		init-method="init" p:enabledClients="${services.hadoop.clients}" 
-		p:clientsInitTime="${services.hadoop.clients.init.timeout}"/>
+		p:enabledClients="${services.hadoop.clients}" />
 
 	<bean id="oozieClientFactory" class="eu.dnetlib.data.hadoop.oozie.OozieClientFactory" />
 
-	<bean id="hbaseAdminFactory" class="eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory" />	
+	<bean id="hbaseAdminFactory" class="eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory" />
+
+	<bean id="jobClientFactory" class="eu.dnetlib.data.hadoop.mapred.JobClientFactory" />
 		
 	<bean id="hadoopJobRegistry" class="eu.dnetlib.data.hadoop.JobRegistry" 
 		p:maxJobs="${services.hadoop.jobregistry.size}"/>
......
 		<property name="serviceProperties">
 			<map>
 				<entry key="defaultLibPath" value="${services.hadoop.lib.path}" />
+				<entry key="default.hadoop.user" value="${services.hadoop.user}" />
 			</map>
 		</property>
 	</bean>
modules/dnet-hadoop-service/trunk/pom.xml
 	<groupId>eu.dnetlib</groupId>
 	<artifactId>dnet-hadoop-service</artifactId>
 	<packaging>jar</packaging>
-	<version>2.6.3-SNAPSHOT</version>
+	<version>2.7.3-SNAPSHOT</version>
 	<scm>
 	    <developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/trunk</developerConnection>
 	</scm>
......
 		</dependency>
 		<dependency>
 			<groupId>eu.dnetlib</groupId>
+			<artifactId>dnet-hadoop-commons</artifactId>
+			<version>[2.0.0,3.0.0)</version>
+		</dependency>
+<!--
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
 			<artifactId>dnet-mapreduce-submitter</artifactId>
 			<version>[3.0.0,4.0.0)</version>
 		</dependency>
+-->
 		<dependency>
 			<groupId>org.apache.hbase</groupId>
 			<artifactId>hbase</artifactId>
