Revision 49795
Added by Claudio Atzori over 6 years ago

Lazy init of the Hadoop clients: the user for the HBase admin and the jtClient can now be defined once, and is available as a service property.
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopClients.java (removed)

-package eu.dnetlib.data.hadoop;
-
-import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.apache.hadoop.mapred.JobClient;
-import org.apache.oozie.client.OozieClient;
-
-public class HadoopClients {
-
-	private final JobClient jtClient;
-
-	private final OozieClient oozieClient;
-
-	private final HBaseAdmin hbaseAdmin;
-
-	public HadoopClients(JobClient jtClient, OozieClient oozieClient, HBaseAdmin hbaseAdmin) {
-		super();
-		this.jtClient = jtClient;
-		this.oozieClient = oozieClient;
-		this.hbaseAdmin = hbaseAdmin;
-	}
-
-	public JobClient getJtClient() {
-		return jtClient;
-	}
-
-	public HBaseAdmin getHbaseAdmin() {
-		return hbaseAdmin;
-	}
-
-	public OozieClient getOozieClient() {
-		return oozieClient;
-	}
-
-	public boolean isOozieAvailable() {
-		return getOozieClient() != null;
-	}
-
-	public boolean isHbaseAvailable() {
-		return getHbaseAdmin() != null;
-	}
-
-	public boolean isMapredAvailable() {
-		return getJtClient() != null;
-	}
-
-}
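With the holder class removed, nothing caches live client instances any more: HadoopClientMap (further down in this changeset) builds a fresh client on every call, and the caller becomes responsible for its lifecycle. A minimal sketch of the new calling pattern, assuming the revised HadoopClientMap API shown below; the helper name and table-counting logic are illustrative:

    import java.io.IOException;

    import eu.dnetlib.data.hadoop.HadoopClientMap;
    import eu.dnetlib.data.hadoop.config.ClusterName;
    import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
    import org.apache.hadoop.hbase.client.HBaseAdmin;

    public class LazyClientSketch {

        // Illustrative helper: each invocation obtains a brand new HBaseAdmin
        // from the map (which delegates to HBaseAdminFactory) and closes it
        // when done, instead of reusing a shared instance from HadoopClients.
        public static int countTables(final HadoopClientMap clients, final ClusterName name)
                throws HadoopServiceException, IOException {
            try (HBaseAdmin admin = clients.getHbaseAdmin(name)) {
                return admin.listTables().length;
            }
        }
    }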
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTestContextConfiguration.java

 package eu.dnetlib.data.hadoop.hbase;
 
+import eu.dnetlib.data.hadoop.HadoopClientMap;
+import eu.dnetlib.data.hadoop.HadoopServiceCore;
+import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
+import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
+import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
+import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
 import org.springframework.context.annotation.Bean;
 import org.springframework.context.annotation.Configuration;
 import org.springframework.context.annotation.Profile;
 import org.springframework.core.io.ClassPathResource;
 import org.springframework.core.io.Resource;
 
-import eu.dnetlib.data.hadoop.HadoopClientMap;
-import eu.dnetlib.data.hadoop.HadoopServiceCore;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
-import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
-import eu.dnetlib.data.mapreduce.JobClientFactory;
-
 @Configuration
 @Profile(value = "test")
 public class HBaseTestContextConfiguration {
...
 		return core;
 	}
 
-	@Bean(initMethod = "init")
+	@Bean
 	public HadoopClientMap hadoopClientMap() throws InterruptedException {
 		final HadoopClientMap clientMap = new HadoopClientMap();
 		clientMap.setEnabledClients(ENABLED_CLIENTS);
-		clientMap.setClientsInitTime(10);
 
 		return clientMap;
 	}
modules/dnet-hadoop-service/trunk/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTest.java

 package eu.dnetlib.data.hadoop.hbase;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
 import java.io.IOException;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Set;
 
+import com.google.common.collect.Sets;
+import eu.dnetlib.data.hadoop.HadoopServiceCore;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import eu.dnetlib.miscutils.datetime.DateUtils;
 import org.apache.hadoop.hbase.client.Get;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.client.Put;
...
 import org.springframework.test.context.ContextConfiguration;
 import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
 
-import com.google.common.collect.Sets;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
-import eu.dnetlib.data.hadoop.HadoopServiceCore;
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
-import eu.dnetlib.miscutils.datetime.DateUtils;
-
 @ActiveProfiles("test")
 @RunWith(SpringJUnit4ClassRunner.class)
 @ContextConfiguration(classes = HBaseTestContextConfiguration.class)
...
 	public void setUp() throws HadoopServiceException, IOException, InterruptedException {
 		assertNotNull(hadoopServiceCore);
 
-		System.out.println("waiting for clients to be ready... timeout? " + !hadoopServiceCore.getClients().waitClients());
-
 		ensureDropTable();
 	}
 
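Both test-side changes follow from the same simplification: HadoopClientMap no longer has an init() method or a background initializer thread, so the @Bean(initMethod = "init") attribute and the clientsInitTime setting disappear from the test configuration, and setUp() no longer needs to poll waitClients() before touching the cluster.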
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseAdminFactory.java

 package eu.dnetlib.data.hadoop.hbase;
 
+import eu.dnetlib.data.hadoop.AbstractHadoopClient;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
-import org.springframework.beans.factory.annotation.Autowired;
 
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
-
-public class HBaseAdminFactory {
+public class HBaseAdminFactory extends AbstractHadoopClient {
 
 	private static final Log log = LogFactory.getLog(HBaseAdminFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-	@Autowired
-	private ConfigurationEnumerator configurationEnumerator;
-
 	public HBaseAdmin newInstance(final ClusterName clusterName) throws HadoopServiceException {
 		try {
 			log.info("init hbaseAdmin, cluster: " + clusterName.toString());
+			setHadoopUser();
 			return new HBaseAdmin(configurationEnumerator.get(clusterName));
 		} catch (final Throwable e) {
-			log.warn("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
-			return null;
+			throw new HadoopServiceException("unable to initialize hbase client for cluster: " + clusterName.toString(), e);
 		}
 	}
 
 }
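Two behavioral changes here: the factory now impersonates the configured Hadoop user before opening the connection, and a failed initialization surfaces immediately as a HadoopServiceException instead of a silent null. The old contract forced every caller to null-check; compare the private getHBaseAdmin helper removed from HadoopServiceCore below, whose only job was to convert that null back into an exception.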
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java

 	private int maxVersions;
 
 	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-		return Arrays.asList(admin.listTables())
-				.stream()
-				.map(HTableDescriptor::getNameAsString)
-				.collect(Collectors.toList());
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+			return Arrays.asList(admin.listTables())
+					.stream()
+					.map(HTableDescriptor::getNameAsString)
+					.collect(Collectors.toList());
+		}
 	}
 
-	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
-		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
-
-		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
-
-		return admin;
-	}
-
 	public String getHBaseTableDescriptor(final ClusterName clusterName, final String tableName) throws HadoopServiceException, IOException {
-		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);
-
-		if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
-
-		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
-
-		final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
-
-		final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
-
-		final Set<String> columns = Sets.newHashSet();
-
-		for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
-			columns.add(hColDesc.getNameAsString());
-		}
-
-		HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
-		htDescriptor.setColumns(columns);
-
-		List<HBaseTableRegionInfo> regions = Lists.newArrayList();
-
-		for (HRegionInfo info : tableRegions) {
-			regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
-		}
-		htDescriptor.setRegions(regions);
-
-		if (log.isDebugEnabled()) {
-			log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
-		}
-
-		return htDescriptor.toString();
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			if (StringUtils.isBlank(tableName)) throw new HadoopServiceException("Table name cannot be empty or null");
+
+			if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));
+
+			final List<HRegionInfo> tableRegions = admin.getTableRegions(tableName.getBytes());
+
+			final HTableDescriptor desc = admin.getTableDescriptor(tableName.getBytes());
+
+			final Set<String> columns = Sets.newHashSet();
+
+			for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
+				columns.add(hColDesc.getNameAsString());
+			}
+
+			HBaseTableDescriptor htDescriptor = new HBaseTableDescriptor();
+			htDescriptor.setColumns(columns);
+
+			List<HBaseTableRegionInfo> regions = Lists.newArrayList();
+
+			for (HRegionInfo info : tableRegions) {
+				regions.add(new HBaseTableRegionInfo(new String(info.getStartKey()), new String(info.getEndKey())));
+			}
+			htDescriptor.setRegions(regions);
+
+			if (log.isDebugEnabled()) {
+				log.info("got configuration for table '" + tableName + "': " + htDescriptor.toString());
+			}
+
+			return htDescriptor.toString();
+		}
 	}
 
 	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
-		return desc.getFamilies().stream()
-				.map(d -> d.getNameAsString())
-				.collect(Collectors.toList());
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
+			return desc.getFamilies().stream()
+					.map(d -> d.getNameAsString())
+					.collect(Collectors.toList());
+		}
 	}
 
 	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
-
-		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
-
-		log.info("disabling table: " + table);
-		admin.disableTable(table);
-
-		log.info("deleting table: " + table);
-		admin.deleteTable(table);
-
-		log.info("creating table: " + table);
-		admin.createTable(desc);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate unexisting table");
+
+			final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
+
+			log.info("disabling table: " + table);
+			admin.disableTable(table);
+
+			log.info("deleting table: " + table);
+			admin.deleteTable(table);
+
+			log.info("creating table: " + table);
+			admin.createTable(desc);
+		}
 	}
 
 	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		return admin.tableExists(table);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			return admin.tableExists(table);
+		}
 	}
 
 	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
-
-		log.info("disabling table: " + table);
-		admin.disableTable(table);
-
-		log.info("deleting table: " + table);
-		admin.deleteTable(table);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop unexisting table: '" + table + "'");
+
+			log.info("disabling table: " + table);
+			admin.disableTable(table);
+
+			log.info("deleting table: " + table);
+			admin.deleteTable(table);
+		}
 	}
 
 	public void createTable(final ClusterName clusterName, final String table, final String tableConfiguration) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
-
-		if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
-
-		final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
-
-		doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+
+			if (StringUtils.isBlank(tableConfiguration)) throw new HadoopServiceException("empty table configuration");
+
+			final HBaseTableDescriptor tableConf = HBaseTableDescriptor.fromJSON(tableConfiguration);
+
+			doCreateTable(clusterName, table, tableConf.getColumns(), tableConf.getRegions());
+		}
 	}
 
 	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
-
-		doCreateTable(clusterName, table, columns, null);
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+
+			doCreateTable(clusterName, table, columns, null);
+		}
 	}
 
 	public void doCreateTable(final ClusterName clusterName, final String table, final Set<String> columns, final List<HBaseTableRegionInfo> regions)
 			throws IOException, HadoopServiceException {
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
-
-		final HTableDescriptor desc = new HTableDescriptor(table);
-		for (final String column : columns) {
-			final HColumnDescriptor hds = new HColumnDescriptor(column);
-			hds.setMaxVersions(getMaxVersions());
-			desc.addFamily(hds);
-		}
-
-		log.info("creating hbase table: " + table);
-
-		if (regions != null && !regions.isEmpty()) {
-			log.debug(String.format("create using %s regions: %s", regions.size(), regions));
-			admin.createTable(desc, getSplitKeys(regions));
-		} else {
-			admin.createTable(desc);
-		}
-
-		log.info("created hbase table: [" + table + "]");
-		log.debug("descriptor: [" + desc.toString() + "]");
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			if (admin.tableExists(table)) throw new IllegalStateException("table already exists");
+
+			final HTableDescriptor desc = new HTableDescriptor(table);
+			for (final String column : columns) {
+				final HColumnDescriptor hds = new HColumnDescriptor(column);
+				hds.setMaxVersions(getMaxVersions());
+				desc.addFamily(hds);
+			}
+
+			log.info("creating hbase table: " + table);
+
+			if (regions != null && !regions.isEmpty()) {
+				log.debug(String.format("create using %s regions: %s", regions.size(), regions));
+				admin.createTable(desc, getSplitKeys(regions));
+			} else {
+				admin.createTable(desc);
+			}
+
+			log.info("created hbase table: [" + table + "]");
+			log.debug("descriptor: [" + desc.toString() + "]");
+		}
 	}
 
 	private byte[][] getSplitKeys(final List<HBaseTableRegionInfo> regions) {
...
 	}
 
 	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
-
-		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-
-		if (!admin.tableExists(table)) {
-			createTable(clusterName, table, columns);
-		} else {
-			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
-
-			final Set<String> foundColumns = desc.getFamilies().stream()
-					.map(d -> d.getNameAsString())
-					.collect(Collectors.toCollection(HashSet::new));
-
-			log.info("ensuring columns on table " + table + ": " + columns);
-			final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
-			if (!missingColumns.isEmpty()) {
-
-				if (admin.isTableEnabled(table)) {
-					admin.disableTable(table);
-				}
-
-				for (final String column : missingColumns) {
-					log.info("hbase table: '" + table + "', adding column: " + column);
-					admin.addColumn(table, new HColumnDescriptor(column));
-				}
-
-				admin.enableTable(table);
-			}
-		}
-	}
+		try(final HBaseAdmin admin = clients.getHbaseAdmin(clusterName)) {
+
+			if (!admin.tableExists(table)) {
+				createTable(clusterName, table, columns);
+			} else {
+				final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
+
+				final Set<String> foundColumns = desc.getFamilies().stream()
+						.map(d -> d.getNameAsString())
+						.collect(Collectors.toCollection(HashSet::new));
+
+				log.info("ensuring columns on table " + table + ": " + columns);
+				final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
+				if (!missingColumns.isEmpty()) {
+
+					if (admin.isTableEnabled(table)) {
+						admin.disableTable(table);
+					}
+
+					for (final String column : missingColumns) {
+						log.info("hbase table: '" + table + "', adding column: " + column);
+						admin.addColumn(table, new HColumnDescriptor(column));
+					}
+
+					admin.enableTable(table);
+				}
+			}
+		}
+	}
...
 
 	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
 		final Configuration conf = configurationEnumerator.get(clusterName);
-		final HTable table = new HTable(conf, tableName);
-		try {
+		try(final HTable table = new HTable(conf, tableName)) {
 			final ResultScanner rs = table.getScanner(scan);
 			try {
 				return Lists.newArrayList(rs.iterator());
 			} finally {
 				rs.close();
 			}
-		} finally {
-			table.close();
 		}
 	}
 
...
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
 
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 
 			if (hdfs.exists(absolutePath)) {
...
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
 
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 			if (!hdfs.exists(absolutePath)) {
 				hdfs.mkdirs(absolutePath);
...
 			throw new HadoopServiceException("invalid empty path");
 
 		final Configuration conf = configurationEnumerator.get(clusterName);
-		try {
-			final FileSystem hdfs = FileSystem.get(conf);
+		try(final FileSystem hdfs = FileSystem.get(conf)) {
 			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
 			return hdfs.exists(absolutePath);
 		} catch (IOException e) {
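One detail worth flagging in the HDFS methods: unlike HBaseAdmin and HTable, FileSystem.get(conf) does not hand out a private object. By default Hadoop returns a JVM-wide cached instance per scheme/authority/user, so closing it in a try-with-resources block also closes it under any other holder. If that ever bites, the cache can be bypassed per scheme; a sketch of the standard workaround, written as if inside HadoopServiceCore (the config key follows Hadoop's fs.<scheme>.impl.disable.cache pattern; this is an assumption about the default caching behavior, not something this changeset does):

    public boolean existsUncached(final ClusterName clusterName, final String path) throws IOException {
        final Configuration conf = configurationEnumerator.get(clusterName);
        // Disable the FileSystem cache for hdfs:// so get() returns a
        // private instance that is safe to close without side effects.
        conf.setBoolean("fs.hdfs.impl.disable.cache", true);
        try (FileSystem hdfs = FileSystem.get(conf)) {
            return hdfs.exists(new Path(URI.create(conf.get("fs.defaultFS") + path)));
        }
    }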
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieClientFactory.java

 package eu.dnetlib.data.hadoop.oozie;
 
+import eu.dnetlib.data.hadoop.AbstractHadoopClient;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.oozie.client.OozieClient;
-import org.springframework.beans.factory.annotation.Autowired;
 
-import eu.dnetlib.data.hadoop.config.ClusterName;
-import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
-
 /**
  * Factory bean for Oozie client instances.
  *
...
  * @author claudio
  *
  */
-public class OozieClientFactory {
+public class OozieClientFactory extends AbstractHadoopClient {
 
 	private static final String ENV_ATTRIBUTE_OOZIE_SERVICE_LOC = "oozie.service.loc";
 
 	private static final Log log = LogFactory.getLog(OozieClientFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
 
-	@Autowired
-	private ConfigurationEnumerator configurationEnumerator;
-
-	public OozieClient newInstance(ClusterName clusterName) {
+	public OozieClient newInstance(ClusterName clusterName) throws HadoopServiceException {
 		final String oozieServiceLocation = configurationEnumerator.get(clusterName).get(ENV_ATTRIBUTE_OOZIE_SERVICE_LOC);
 		log.info("init oozie client, cluster: " + clusterName.toString() + ", oozie server: " + oozieServiceLocation);
+		setHadoopUser();
 		try {
 			return new OozieClient(oozieServiceLocation);
 		} catch (Throwable e) {
-			log.warn("unable to initialize oozie client for cluster: " + clusterName.toString(), e);
-			return null;
+			throw new HadoopServiceException("unable to initialize oozie client for cluster: " + clusterName.toString(), e);
 		}
 	}
 
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopClientMap.java

 package eu.dnetlib.data.hadoop;
 
+import java.io.IOException;
 import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
 
+import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
...
 import eu.dnetlib.data.hadoop.config.ClusterName;
 import eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory;
 import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
-import eu.dnetlib.data.mapreduce.JobClientFactory;
 
 public class HadoopClientMap {
 
 	private static final Log log = LogFactory.getLog(HadoopClientMap.class); // NOPMD by marko on 11/24/08 5:02 PM
...
 	@Autowired
 	private HBaseAdminFactory hbaseAdminFactory;
 
-	private int clientsInitTime;
-
 	private Map<String, Map<String, String>> enabledClients = Maps.newHashMap();
 
-	private final Map<ClusterName, HadoopClients> clients = Maps.newHashMap();
-
-	private final ExecutorService executor = Executors.newSingleThreadExecutor();
-
-	public void init() {
-		log.info("clients conf: " + getEnabledClients());
-		executor.execute(new Runnable() {
-
-			@Override
-			public void run() {
-				for (final String name : enabledClients.keySet()) {
-					doInit(name);
-				}
-			}
-		});
-	}
-
-	private void doInit(final String name) {
-		try {
-			log.info("initializing clients for hadoop cluster: " + name);
-			final ClusterName clusterName = ClusterName.valueOf(name);
-
-			final Map<String, String> clientsConf = enabledClients.get(name);
-
-			final JobClient jobClient = Boolean.valueOf(clientsConf.get("mapred")) ? getJobClientFactory().newInstance(name) : null;
-			final OozieClient oozieClient = Boolean.valueOf(clientsConf.get("oozie")) ? getOozieClientFactory().newInstance(clusterName) : null;
-			final HBaseAdmin hbaseAdmin = Boolean.valueOf(clientsConf.get("hbase")) ? getHbaseAdminFactory().newInstance(clusterName) : null;
-
-			clients.put(clusterName, new HadoopClients(jobClient, oozieClient, hbaseAdmin));
-		} catch (final Throwable e) {
-			log.error("Error initializing hadoop client for cluster: " + name, e);
-			throw new RuntimeException(e);
-		}
-	}
-
-	public boolean waitClients() throws InterruptedException {
-		return executor.awaitTermination(getClientsInitTime(), TimeUnit.SECONDS);
-	}
-
-	public JobClient getJtClient(final ClusterName name) {
-		return getClients(name).getJtClient();
-	}
-
-	public boolean isMapreduceAvailable(final ClusterName name) {
-		return getClients(name).isMapredAvailable();
-	}
-
-	public OozieClient getOozieClient(final ClusterName name) {
-		return getClients(name).getOozieClient();
-	}
-
-	public boolean isOozieAvailable(final ClusterName name) {
-		return getClients(name).isOozieAvailable();
-	}
-
-	public HBaseAdmin getHbaseAdmin(final ClusterName name) {
-		return getClients(name).getHbaseAdmin();
-	}
-
-	public HadoopClients getClients(final ClusterName name) {
-		final HadoopClients hadoopClients = clients.get(name);
-		if (hadoopClients == null) throw new IllegalArgumentException("cluster " + name.toString() + " is currently disabled");
-		return hadoopClients;
-	}
-
+	public boolean isMapreduceAvailable(final ClusterName name) {
+		return isClientAvailable(name, "mapred");
+	}
+
+	public boolean isOozieAvailable(final ClusterName name) {
+		return isClientAvailable(name, "oozie");
+	}
+
+	public boolean isHbaseAvailable(final ClusterName name) {
+		return isClientAvailable(name, "hbase");
+	}
+
+	private boolean isClientAvailable(final ClusterName name, final String clientName) {
+		final String clusterName = name.toString();
+		return enabledClients.containsKey(clusterName) && "true".equals(enabledClients.get(clusterName).get(clientName));
+	}
+
+	public JobClient getJtClient(final ClusterName clusterName, final String username) throws HadoopServiceException, IOException {
+		if (!isMapreduceAvailable(clusterName)) {
+			throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
+		}
+		return getJobClientFactory().newInstance(clusterName, username);
+	}
+
+	public JobClient getJtClient(final ClusterName clusterName) throws HadoopServiceException, IOException {
+		if (!isMapreduceAvailable(clusterName)) {
+			throw new HadoopServiceException("jobtracker is not available for cluster " + clusterName.toString());
+		}
+		return getJobClientFactory().newInstance(clusterName);
+	}
+
+	public OozieClient getOozieClient(final ClusterName name) throws HadoopServiceException {
+		if (!isOozieAvailable(name)) {
+			throw new HadoopServiceException("oozie is not available for cluster " + name.toString());
+		}
+		return getOozieClientFactory().newInstance(name);
+	}
+
+	public HBaseAdmin getHbaseAdmin(final ClusterName name) throws HadoopServiceException {
+		if (!isHbaseAvailable(name)) {
+			throw new HadoopServiceException("hbase is not available for cluster " + name.toString());
+		}
+		return getHbaseAdminFactory().newInstance(name);
+	}
+
 	// //////////
 
 	public String getEnabledClients() {
...
 		this.hbaseAdminFactory = hbaseAdminFactory;
 	}
 
-	public int getClientsInitTime() {
-		return clientsInitTime;
-	}
-
-	@Required
-	public void setClientsInitTime(int clientsInitTime) {
-		this.clientsInitTime = clientsInitTime;
-	}
-
 }
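The availability checks no longer probe live client objects; they only read the static configuration map. isClientAvailable expects a nested structure keyed by cluster name and then by client type, with the literal string "true" marking an enabled client. A small sketch of a matching structure (the cluster name and flag values are illustrative):

    import java.util.Map;

    import com.google.common.collect.Maps;

    public class EnabledClientsSketch {

        // Illustrative shape of the map behind isClientAvailable(name, type):
        // outer key = cluster name, inner key = client type, value = "true"/"false".
        public static Map<String, Map<String, String>> example() {
            final Map<String, String> dm = Maps.newHashMap();
            dm.put("hbase", "true");
            dm.put("mapred", "true");
            dm.put("oozie", "false");

            final Map<String, Map<String, String>> enabled = Maps.newHashMap();
            enabled.put("DM", dm); // cluster name is illustrative
            return enabled;
        }
    }

With this structure, isHbaseAvailable would pass for cluster "DM" while isOozieAvailable would not.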
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/mapred/JobClientFactory.java (new)

+package eu.dnetlib.data.hadoop.mapred;
+
+import java.io.IOException;
+
+import eu.dnetlib.data.hadoop.AbstractHadoopClient;
+import eu.dnetlib.data.hadoop.config.ClusterName;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import org.apache.hadoop.mapred.JobClient;
+
+/**
+ * Factory bean for jobClient instances
+ *
+ * @author claudio
+ *
+ */
+public class JobClientFactory extends AbstractHadoopClient {
+
+	public JobClient newInstance(final ClusterName clusterName, final String userName) throws IOException {
+		setHadoopUser(userName);
+		return new JobClient(configurationEnumerator.get(clusterName));
+	}
+
+	public JobClient newInstance(final ClusterName clusterName) throws IOException, HadoopServiceException {
+		setHadoopUser();
+		return new JobClient(configurationEnumerator.get(clusterName));
+	}
+
+}
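The two overloads differ only in whose identity is stamped on the connection: the single-argument variant resolves the default user from the IS (see AbstractHadoopClient below), while the two-argument one impersonates an explicit user. Since instances are no longer cached, the caller must close them; a hedged usage sketch, with illustrative cluster and user names:

    import eu.dnetlib.data.hadoop.config.ClusterName;
    import eu.dnetlib.data.hadoop.mapred.JobClientFactory;
    import org.apache.hadoop.mapred.JobClient;

    public class JobClientUsageSketch {

        // Illustrative: instances are per-call now, so the caller closes them.
        public static int taskTrackers(final JobClientFactory factory) throws Exception {
            final JobClient jobClient = factory.newInstance(ClusterName.valueOf("DM"), "dnet");
            try {
                return jobClient.getClusterStatus().getTaskTrackers();
            } finally {
                jobClient.close();
            }
        }
    }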
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/mapred/MapreduceJobMonitor.java

 import org.apache.hadoop.mapred.Counters;
 import org.apache.hadoop.mapred.Counters.Counter;
 import org.apache.hadoop.mapred.Counters.Group;
+import org.apache.hadoop.mapred.JobClient;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.RunningJob;
 
...
 
 	private final RunningJob runningJob;
 
+	private final JobClient jobClient;
+
 	public MapreduceJobMonitor(final RunningJob runningJob, final JobCompletion callback) {
+		this(null, runningJob, callback);
+	}
+
+	public MapreduceJobMonitor(final JobClient jobClient, final RunningJob runningJob, final JobCompletion callback) {
 		super(callback);
 		this.runningJob = runningJob;
+		this.jobClient = jobClient;
 	}
 
 	@Override
...
 			}
 		} catch (final Throwable e) {
 			getCallback().failed(getHadoopId(), e);
+		} finally {
+			try {
+				if (jobClient != null) {
+					jobClient.close();
+				}
+			} catch (IOException e) {
+				throw new RuntimeException("unable to close jobClient", e);
+			}
 		}
 	}
 
...
 		}
 	}
 
+	public JobClient getJobClient() {
+		return jobClient;
+	}
+
 }
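Ownership note: the monitor now optionally owns the JobClient that produced its RunningJob. The old single-argument constructor delegates with a null client, so externally managed clients keep working, while the new two-argument form (used by SubmitMapreduceJobAction below) hands the client over, and the finally block guarantees it is closed once the monitored job reaches a terminal state.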
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/SubmitMapreduceJobAction.java

 import java.util.Map;
 import java.util.Map.Entry;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.RunningJob;
-
 import eu.dnetlib.data.hadoop.HadoopJob;
 import eu.dnetlib.data.hadoop.config.ClusterName;
 import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
...
 import eu.dnetlib.data.hadoop.utils.ScanProperties;
 import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
 import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.JobClient;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RunningJob;
 
 public class SubmitMapreduceJobAction extends AbstractSubmitAction {
 
...
 		try {
 			final JobConf jobConf = prepareJob(getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
 
-			if (!hadoopClientMap.isMapreduceAvailable(clusterName))
-				throw new HadoopServiceException("mapreduce not available for cluster: " + clusterName.toString());
-
 			logJobDetails(jobConf);
 
-			final RunningJob runningJob = hadoopClientMap.getJtClient(clusterName).submitJob(jobConf);
+			final JobClient jtClient = hadoopClientMap.getJtClient(clusterName);
+			final RunningJob runningJob = jtClient.submitJob(jobConf);
+
 			final String jobId = newJobId(clusterName, runningJob.getID().getId());
 
-			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
-					new MapreduceJobMonitor(runningJob, callback)));
+			jobRegistry.registerJob(
+					HadoopJob.newInstance(jobId, clusterName, jobProfile, new MapreduceJobMonitor(jtClient, runningJob, callback)));
 
 		} catch (final IOException e) {
 			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/AbstractHadoopClient.java (new)

+package eu.dnetlib.data.hadoop;
+
+import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
+import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+public abstract class AbstractHadoopClient {
+
+	private static final Log log = LogFactory.getLog(AbstractHadoopClient.class);
+
+	@Autowired
+	private ISClient isClient;
+
+	@Autowired
+	protected ConfigurationEnumerator configurationEnumerator;
+
+	protected void setHadoopUser() throws HadoopServiceException {
+		setHadoopUser(getDefaultUser());
+	}
+
+	private String getDefaultUser() throws HadoopServiceException {
+		try {
+			return isClient.queryForServiceProperty("default.hadoop.user");
+		} catch (ISLookUpException e) {
+			throw new HadoopServiceException(e);
+		}
+	}
+
+	protected void setHadoopUser(final String userName) {
+		log.info("setting HADOOP_USER_NAME as " + userName);
+		System.setProperty("HADOOP_USER_NAME", userName);
+	}
+
+}
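Setting a system property is enough here because of how Hadoop resolves the caller's identity under simple (non-Kerberos) authentication: UserGroupInformation checks the HADOOP_USER_NAME environment variable and falls back to the system property of the same name. A sketch of the mechanism, under the stated assumption of simple auth; note that UGI caches the resolved login user after the first lookup, so later changes to the property may not take effect in the same JVM:

    import java.io.IOException;

    import org.apache.hadoop.security.UserGroupInformation;

    public class HadoopUserSketch {

        public static void main(final String[] args) throws IOException {
            // Assumption: simple (non-Kerberos) authentication. UGI reads the
            // HADOOP_USER_NAME environment variable first, then this property.
            System.setProperty("HADOOP_USER_NAME", "dnet");

            // The login user is cached from here on in this JVM.
            System.out.println(UserGroupInformation.getLoginUser().getUserName()); // -> dnet
        }
    }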
modules/dnet-hadoop-service/trunk/src/main/resources/eu/dnetlib/data/hadoop/applicationContext-dnet-hadoop-service.properties

 services.hadoop.jobregistry.size=100
 services.hadoop.lib.path=/user/dnet/lib/dnet-mapreduce-jobs-assembly-0.0.6.2-SNAPSHOT.jar
 services.hadoop.hbase.maxversions=1
 services.hadoop.clients.init.timeout=30
+services.hadoop.user=dnet
modules/dnet-hadoop-service/trunk/src/main/resources/eu/dnetlib/data/hadoop/applicationContext-dnet-hadoop-service.xml

 	<bean id="isClient" class="eu.dnetlib.data.hadoop.ISClient" />
 
 	<bean id="hadoopClientMap" class="eu.dnetlib.data.hadoop.HadoopClientMap"
-		init-method="init" p:enabledClients="${services.hadoop.clients}"
-		p:clientsInitTime="${services.hadoop.clients.init.timeout}"/>
+		p:enabledClients="${services.hadoop.clients}" />
 
 	<bean id="oozieClientFactory" class="eu.dnetlib.data.hadoop.oozie.OozieClientFactory" />
 
 	<bean id="hbaseAdminFactory" class="eu.dnetlib.data.hadoop.hbase.HBaseAdminFactory" />
+
+	<bean id="jobClientFactory" class="eu.dnetlib.data.hadoop.mapred.JobClientFactory" />
 
 	<bean id="hadoopJobRegistry" class="eu.dnetlib.data.hadoop.JobRegistry"
 		p:maxJobs="${services.hadoop.jobregistry.size}"/>
...
 		<property name="serviceProperties">
 			<map>
 				<entry key="defaultLibPath" value="${services.hadoop.lib.path}" />
+				<entry key="default.hadoop.user" value="${services.hadoop.user}" />
 			</map>
 		</property>
 	</bean>
modules/dnet-hadoop-service/trunk/pom.xml

 	<groupId>eu.dnetlib</groupId>
 	<artifactId>dnet-hadoop-service</artifactId>
 	<packaging>jar</packaging>
-	<version>2.6.3-SNAPSHOT</version>
+	<version>2.7.3-SNAPSHOT</version>
 	<scm>
 		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-hadoop-service/trunk</developerConnection>
 	</scm>
...
 		</dependency>
 		<dependency>
 			<groupId>eu.dnetlib</groupId>
+			<artifactId>dnet-hadoop-commons</artifactId>
+			<version>[2.0.0,3.0.0)</version>
+		</dependency>
+		<!--
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
 			<artifactId>dnet-mapreduce-submitter</artifactId>
 			<version>[3.0.0,4.0.0)</version>
 		</dependency>
+		-->
 		<dependency>
 			<groupId>org.apache.hbase</groupId>
 			<artifactId>hbase</artifactId>