Revision 39732
Added by Claudio Atzori over 8 years ago

[maven-release-plugin] copy for tag dnet-hadoop-service-2.4.1
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/pom.xml

```xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
	<parent>
		<groupId>eu.dnetlib</groupId>
		<artifactId>dnet-parent</artifactId>
		<version>1.0.0</version>
	</parent>
	<modelVersion>4.0.0</modelVersion>
	<groupId>eu.dnetlib</groupId>
	<artifactId>dnet-hadoop-service</artifactId>
	<packaging>jar</packaging>
	<version>2.4.1</version>
	<scm>
		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1</developerConnection>
	</scm>
	<repositories>
		<!-- Cloudera Repositories -->
		<repository>
			<snapshots>
				<enabled>false</enabled>
			</snapshots>
			<id>cloudera-central</id>
			<name>cloundera-libs-release</name>
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-central</url>
		</repository>
		<repository>
			<id>cloudera-snapshots</id>
			<name>cloudera-libs-snapshot</name>
			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/cloudera-snapshots</url>
		</repository>
	</repositories>
	<dependencies>
		<dependency>
			<groupId>eu.dnetlib</groupId>
			<artifactId>dnet-hadoop-service-rmi</artifactId>
			<version>[1.0.0,2.0.0)</version>
		</dependency>
		<dependency>
			<groupId>com.google.code.gson</groupId>
			<artifactId>gson</artifactId>
			<version>${google.gson.version}</version>
		</dependency>
		<dependency>
			<groupId>eu.dnetlib</groupId>
			<artifactId>cnr-blackboard-common</artifactId>
			<version>[2.1.0,3.0.0)</version>
		</dependency>
		<dependency>
			<groupId>eu.dnetlib</groupId>
			<artifactId>cnr-resultset-client</artifactId>
			<version>[2.0.0,3.0.0)</version>
			<exclusions>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>eu.dnetlib</groupId>
			<artifactId>dnet-mapreduce-submitter</artifactId>
			<version>[2.0.0,3.0.0)</version>
		</dependency>
		<dependency>
			<groupId>org.apache.hbase</groupId>
			<artifactId>hbase</artifactId>
			<version>${apache.hbase.version}</version>
			<exclusions>
				<exclusion>
					<artifactId>jsp-api-2.1</artifactId>
					<groupId>org.mortbay.jetty</groupId>
				</exclusion>
				<exclusion>
					<artifactId>servlet-api-2.5</artifactId>
					<groupId>org.mortbay.jetty</groupId>
				</exclusion>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
				<exclusion>
					<artifactId>slf4j-log4j12</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
				<exclusion>
					<artifactId>stax-api</artifactId>
					<groupId>stax</groupId>
				</exclusion>
				<exclusion>
					<artifactId>httpclient</artifactId>
					<groupId>org.apache.httpcomponents</groupId>
				</exclusion>
				<exclusion>
					<artifactId>guava</artifactId>
					<groupId>com.google.guava</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>org.apache.oozie</groupId>
			<artifactId>oozie-client</artifactId>
			<version>${apache.oozie.version}</version>
			<exclusions>
				<exclusion>
					<artifactId>slf4j-simple</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
				<exclusion>
					<artifactId>slf4j-api</artifactId>
					<groupId>org.slf4j</groupId>
				</exclusion>
				<exclusion>
					<artifactId>xercesImpl</artifactId>
					<groupId>xerces</groupId>
				</exclusion>
			</exclusions>
		</dependency>
		<dependency>
			<groupId>junit</groupId>
			<artifactId>junit</artifactId>
			<version>${junit.version}</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.logging.log4j</groupId>
			<artifactId>log4j-slf4j-impl</artifactId>
			<version>2.0.2</version>
			<scope>test</scope>
		</dependency>
		<dependency>
			<groupId>org.apache.httpcomponents</groupId>
			<artifactId>httpclient</artifactId>
			<version>4.3.1</version>
		</dependency>
	</dependencies>
</project>
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java

```java
package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.net.URI;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Set;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.Column;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.util.Bytes;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

public class HadoopServiceCore {

	private static final Log log = LogFactory.getLog(HadoopServiceCore.class); // NOPMD by marko on 11/24/08 5:02 PM

	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;

	@Autowired
	private HadoopClientMap clients;

	private int maxVersions;

	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		return Lists.newArrayList(Iterables.transform(Lists.newArrayList(admin.listTables()), HadoopUtils.tableName()));
	}

	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
		final HBaseAdmin admin = clients.getHbaseAdmin(clusterName);

		if (admin == null) throw new HadoopServiceException(String.format("HBase admin not available for cluster: '%s'", clusterName.toString()));

		return admin;
	}

	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);
		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		return Lists.newArrayList(Iterables.transform(desc.getFamilies(), new Function<HColumnDescriptor, String>() {

			@Override
			public String apply(final HColumnDescriptor desc) {
				return desc.getNameAsString();
			}
		}));
	}

	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot truncate nonexistent table");

		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);

		log.info("creating table: " + table);
		admin.createTable(desc);
	}

	public boolean existTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		return admin.tableExists(table);
	}

	public void dropTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) throw new IllegalStateException("cannot drop nonexistent table: '" + table + "'");

		log.info("disabling table: " + table);
		admin.disableTable(table);

		log.info("deleting table: " + table);
		admin.deleteTable(table);
	}

	public void createTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {
		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (admin.tableExists(table)) throw new IllegalStateException("table already exists");

		final HTableDescriptor desc = new HTableDescriptor(table);
		for (final String column : columns) {
			final HColumnDescriptor hds = new HColumnDescriptor(column);
			hds.setMaxVersions(getMaxVersions());
			desc.addFamily(hds);
		}
		log.info("creating hbase table: " + table);

		admin.createTable(desc);

		log.info("created hbase table: [" + table + "]");
		log.debug("descriptor: [" + desc.toString() + "]");
	}

	public void ensureTable(final ClusterName clusterName, final String table, final Set<String> columns) throws IOException, HadoopServiceException {

		final HBaseAdmin admin = getHBaseAdmin(clusterName);

		if (!admin.tableExists(table)) {
			createTable(clusterName, table, columns);
		} else {
			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
			final Set<String> foundColumns = Sets.newHashSet(Iterables.transform(Lists.newArrayList(desc.getColumnFamilies()), HadoopUtils.columnName()));

			log.info("ensuring columns on table " + table + ": " + columns);
			final List<String> missingColumns = Lists.newArrayList(Iterables.filter(columns, HadoopUtils.columnPredicate(foundColumns)));
			if (!missingColumns.isEmpty()) {

				if (admin.isTableEnabled(table)) {
					admin.disableTable(table);
				}

				for (final String column : missingColumns) {
					log.info("hbase table: '" + table + "', adding column: " + column);
					admin.addColumn(table, new HColumnDescriptor(column));
				}

				admin.enableTable(table);
			}
		}
	}

	public void writeOnHBase(final ClusterName clusterName, final String tableName, final List<Put> puts) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		try {
			table.put(puts);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteFromHBase(final ClusterName clusterName, final String tableName, final List<Delete> deletes) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			table.delete(deletes);
		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public void deleteColumnsFromHBase(final ClusterName clusterName, final String tableName, final List<HBaseRowDescriptor> columns) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			final List<Delete> deletes = Lists.newArrayList();
			for (HBaseRowDescriptor desc : columns) {

				final Delete d = new Delete(Bytes.toBytes(desc.getRowKey()));

				for (Column c : desc.getColumns()) {
					for (String qualifier : c.getQualifier()) {
						log.info(String.format("delete from row '%s' cf '%s::%s'", desc.getRowKey(), c.getFamily(), qualifier));
						d.deleteColumn(Bytes.toBytes(c.getFamily()), Bytes.toBytes(qualifier));
					}
				}

				deletes.add(d);
			}
			table.delete(deletes);

		} finally {
			table.flushCommits();
			table.close();
		}
	}

	public Result getRow(final ClusterName clusterName, final String tableName, final byte[] id) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			return table.get(new Get(id));
		} finally {
			table.close();
		}
	}

	public Map<String, HBaseRowDescriptor> describeRows(final ClusterName clusterName, final String tableName, final List<String> rowKeys) throws IOException {
		final Map<String, HBaseRowDescriptor> map = Maps.newHashMap();
		for (String rowKey : rowKeys) {
			map.put(rowKey, describeRow(clusterName, tableName, rowKey));
		}
		return map;
	}

	public HBaseRowDescriptor describeRow(final ClusterName clusterName, final String tableName, final String rowKey) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);

		final HBaseRowDescriptor desc = new HBaseRowDescriptor();

		try {
			final Result r = table.get(new Get(Bytes.toBytes(rowKey)));

			if (r.isEmpty()) {
				return desc;
			}

			final List<Column> columns = Lists.newArrayList();

			for (Entry<byte[], NavigableMap<byte[], byte[]>> e : r.getNoVersionMap().entrySet()) {
				final Set<byte[]> qualifiers = e.getValue().keySet();
				final String family = new String(e.getKey());
				final Column col = new Column(family);

				for (byte[] q : qualifiers) {
					String qs = new String(q);
					col.getQualifier().add(qs);
				}
				columns.add(col);
			}
			desc.setColumns(columns);
			desc.setRowKey(rowKey);

			return desc;
		} finally {
			table.close();
		}
	}

	public List<Result> getRows(final ClusterName clusterName, final String tableName, final Scan scan) throws IOException {
		final Configuration conf = configurationEnumerator.get(clusterName);
		final HTable table = new HTable(conf, tableName);
		try {
			final ResultScanner rs = table.getScanner(scan);
			try {
				return Lists.newArrayList(rs.iterator());
			} finally {
				rs.close();
			}
		} finally {
			table.close();
		}
	}

	public boolean deleteFromHdfs(final ClusterName clusterName, final String path) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot delete an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));

			if (hdfs.exists(absolutePath)) {
				log.debug("deleting path: " + absolutePath.toString());
				hdfs.delete(absolutePath, true);
				log.info("deleted path: " + absolutePath.toString());
				return true;
			} else {
				log.warn("cannot delete nonexistent path: " + absolutePath.toString());
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public boolean createHdfsDir(final ClusterName clusterName, final String path, final boolean force) throws HadoopServiceException {
		if (StringUtils.isBlank(path))
			throw new HadoopServiceException("Cannot create an empty HDFS path.");

		final Configuration conf = configurationEnumerator.get(clusterName);

		try {
			final FileSystem hdfs = FileSystem.get(conf);
			final Path absolutePath = new Path(URI.create(conf.get("fs.defaultFS") + path));
			if (!hdfs.exists(absolutePath)) {
				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else if (force) {
				log.info(String.format("found directory '%s', force delete it", absolutePath.toString()));
				hdfs.delete(absolutePath, true);

				hdfs.mkdirs(absolutePath);
				log.info("created path: " + absolutePath.toString());
				return true;
			} else {
				log.info(String.format("directory already exists: '%s', nothing to do", absolutePath.toString()));
				return false;
			}
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	public Configuration getClusterConiguration(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	public int getMaxVersions() {
		return maxVersions;
	}

	@Required
	public void setMaxVersions(final int maxVersions) {
		this.maxVersions = maxVersions;
	}

	public HadoopClientMap getClients() {
		return clients;
	}

}
```
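For orientation, here is a minimal usage sketch of the core, which is normally wired by Spring (see the test configuration below). The table name, column families and row key are invented for illustration; the `DM` cluster constant is taken from the test configuration in this same revision:

```java
import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;

public class HadoopServiceCoreExample {

	// Hypothetical usage: 'core' would normally be injected by Spring.
	public static void demo(final HadoopServiceCore core) throws Exception {
		final ClusterName cluster = ClusterName.valueOf("DM"); // "DM" appears in the test configuration

		// Creates the table when missing, otherwise adds any missing column families.
		core.ensureTable(cluster, "db_demo", Sets.newHashSet("result", "person"));

		// Inspect one row: which column families and qualifiers are actually present.
		final HBaseRowDescriptor desc = core.describeRow(cluster, "db_demo", "someRowKey");
		System.out.println("row " + desc.getRowKey() + " spans " + desc.getColumns().size() + " column families");
	}
}
```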
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/test/java/eu/dnetlib/data/hadoop/hbase/HBaseTestContextConfiguration.java

```java
package eu.dnetlib.data.hadoop.hbase;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.core.io.ClassPathResource;
import org.springframework.core.io.Resource;

import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.HadoopServiceCore;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.hadoop.config.ConfigurationFactory;
import eu.dnetlib.data.hadoop.oozie.OozieClientFactory;
import eu.dnetlib.data.mapreduce.JobClientFactory;

@Configuration
@Profile(value = "test")
public class HBaseTestContextConfiguration {

	public static final String ENABLED_CLIENTS = "{"
			+ "\"DM\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"true\"},"
			+ "\"IIS\":{\"oozie\":\"false\",\"mapred\":\"false\",\"hbase\":\"false\"}"
			+ "}";

	public static final int MAX_VERSIONS = 10;

	@Bean
	public HadoopServiceCore hadoopServiceCore() {
		final HadoopServiceCore core = new HadoopServiceCore();

		core.setMaxVersions(MAX_VERSIONS);

		System.out.println("using hbase max versions: " + MAX_VERSIONS);
		return core;
	}

	@Bean(initMethod = "init")
	public HadoopClientMap hadoopClientMap() throws InterruptedException {
		final HadoopClientMap clientMap = new HadoopClientMap();
		clientMap.setEnabledClients(ENABLED_CLIENTS);
		clientMap.setClientsInitTime(10);

		return clientMap;
	}

	@Bean
	public HBaseAdminFactory hBaseAdminFactory() {
		return new HBaseAdminFactory();
	}

	@Bean
	public OozieClientFactory oozieClientFactory() {
		return new OozieClientFactory();
	}

	@Bean
	public JobClientFactory jobClientFactory() {
		return new JobClientFactory();
	}

	@Bean
	public ConfigurationEnumerator configurationEnumerator() {
		return new ConfigurationEnumerator();
	}

	@Bean
	public ConfigurationFactory DM() {
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.dm.cnr.properties"));
	}

	@Bean
	public ConfigurationFactory IIS() {
		return get(new ClassPathResource("/eu/dnetlib/data/hadoop/config/hadoop-default.iis.icm.properties"));
	}

	protected ConfigurationFactory get(final Resource props) {
		final ConfigurationFactory configurationFactory = new ConfigurationFactory();
		configurationFactory.setDefaults(props);
		return configurationFactory;
	}

}
```
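A sketch of how a test might consume this configuration. The test class itself is hypothetical and is not part of this revision; the annotations are the standard spring-test ones for activating the `test` profile declared above:

```java
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.test.context.ActiveProfiles;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import eu.dnetlib.data.hadoop.HadoopServiceCore;

// Hypothetical smoke test: activates the "test" profile so the beans above are created.
@RunWith(SpringJUnit4ClassRunner.class)
@ContextConfiguration(classes = HBaseTestContextConfiguration.class)
@ActiveProfiles("test")
public class HBaseSmokeTest {

	@Autowired
	private HadoopServiceCore core;

	@Test
	public void coreIsWired() {
		// MAX_VERSIONS is pushed onto the bean by hadoopServiceCore() above.
		Assert.assertEquals(HBaseTestContextConfiguration.MAX_VERSIONS, core.getMaxVersions());
	}
}
```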
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java

```java
package eu.dnetlib.data.hadoop.oozie;

import java.util.Date;
import java.util.HashMap;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowJob.Status;

import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

public class OozieJobMonitor extends JobMonitor {

	private static final Log log = LogFactory.getLog(JobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM

	private final OozieClient oozieClient;

	private final String jobId;

	public OozieJobMonitor(OozieClient oozieClient, String jobId, JobCompletion callback) {
		super(callback);
		this.oozieClient = oozieClient;
		this.jobId = jobId;
	}

	@Override
	public void run() {
		try {
			log.info("waiting for oozie job completion: " + getHadoopId());

			Status status = doGetStatus();
			while (status.equals(Status.RUNNING)) {
				Thread.sleep(monitorSleepTimeSecs * 1000);

				Status currentStatus = doGetStatus();
				if (!status.equals(currentStatus)) {
					status = currentStatus;
					lastActivity = new Date(System.currentTimeMillis());
				}
			}

			log.debug("job " + jobId + " finished with status: " + status);
			if (status == Status.SUCCEEDED) {
				// TODO set some content to return to the blackboard msg.
				getCallback().done(new HashMap<String, String>());
			} else {
				// TODO retrieve some failure information from the oozie client.
				String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus() + ", oozie log:\n "
						+ getOozieClient().getJobLog(getHadoopId()) + "\n\n";
				getCallback().failed(msg, new HadoopServiceException(msg));
			}
		} catch (Throwable e) {
			getCallback().failed(getHadoopId(), e);
		}
	}

	@Override
	public String getHadoopId() {
		return jobId;
	}

	public OozieClient getOozieClient() {
		return oozieClient;
	}

	@Override
	public String getStatus() {
		try {
			return doGetStatus().toString();
		} catch (OozieClientException e) {
			log.error("error accessing job status", e);
			return "UNKNOWN";
		}
	}

	private Status doGetStatus() throws OozieClientException {
		return getOozieClient().getJobInfo(getHadoopId()).getStatus();
	}

	@Override
	public Date getLastActivity() {
		return lastActivity;
	}

	@Override
	public Date getStartTime() throws HadoopServiceException {
		try {
			return getOozieClient().getJobInfo(getHadoopId()).getStartTime();
		} catch (OozieClientException e) {
			throw new HadoopServiceException("unable to read job start time", e);
		}
	}

	@Override
	public String getTrackerUrl() {
		return getOozieClient().getOozieUrl();
	}

	@Override
	public void kill() {
		try {
			getOozieClient().kill(getHadoopId());
		} catch (OozieClientException e) {
			log.error("unable to kill job: " + getHadoopId(), e);
		}
	}

}
```
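A sketch of how the monitor is typically driven. In the service it is started through `HadoopJob.startMonitor()` (see `JobRegistry` below); the direct thread handling here, and the assumption that `JobCompletion` is an interface whose `done`/`failed` signatures match the calls made in `run()` above, are illustrative only:

```java
import java.util.Map;

import org.apache.oozie.client.OozieClient;

import eu.dnetlib.data.hadoop.action.JobCompletion;

public class OozieMonitorExample {

	// Hypothetical wiring: poll an oozie workflow until it leaves the RUNNING state.
	public static void watch(final String oozieUrl, final String jobId) {
		final OozieClient client = new OozieClient(oozieUrl);

		final OozieJobMonitor monitor = new OozieJobMonitor(client, jobId, new JobCompletion() {

			@Override
			public void done(final Map<String, String> params) { // signature inferred from run() above
				System.out.println("oozie job " + jobId + " succeeded");
			}

			@Override
			public void failed(final String msg, final Throwable e) { // signature inferred from run() above
				System.err.println("oozie job " + jobId + " failed: " + msg);
			}
		});

		// JobMonitor overrides run(), so it is assumed to be a Runnable.
		new Thread(monitor).start();
	}
}
```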
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceImpl.java

```java
package eu.dnetlib.data.hadoop;

import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
import eu.dnetlib.enabling.tools.AbstractBaseService;
import eu.dnetlib.enabling.tools.blackboard.NotificationHandler;
import org.apache.hadoop.conf.Configuration;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HadoopServiceImpl.
 */
public class HadoopServiceImpl extends AbstractBaseService implements HadoopService {

	/**
	 * notification handler.
	 */
	private NotificationHandler notificationHandler;

	/** The hadoop service core. */
	@Autowired
	private HadoopServiceCore hadoopServiceCore;

	/** The job registry. */
	@Autowired
	private JobRegistry jobRegistry;

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listAvailableJobs()
	 */
	@Override
	public List<String> listAvailableJobs() throws HadoopServiceException {
		List<String> res = Lists.newArrayList();
		return res;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listJobs(java.lang.String)
	 */
	@Override
	public List<HadoopJobDescriptor> listJobs(final String clusterName) throws HadoopServiceException {
		return jobRegistry.listJobs(checkExists(clusterName));
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#killJob(java.lang.String)
	 */
	@Override
	public boolean killJob(final String jobId) throws HadoopServiceException {
		jobRegistry.unregisterJob(jobId);
		return true;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.enabling.tools.AbstractBaseService#notify(java.lang.String, java.lang.String, java.lang.String, java.lang.String)
	 */
	@Override
	public void notify(final String subscriptionId, final String topic, final String isId, final String message) {
		getNotificationHandler().notified(subscriptionId, topic, isId, message);
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#listHbaseTables(java.lang.String)
	 */
	@Override
	public List<String> listHbaseTables(final String clusterName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.listTables(checkExists(clusterName));
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#ensureHbaseTable(java.lang.String, java.lang.String, java.util.Set)
	 */
	@Override
	public boolean ensureHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
		try {
			hadoopServiceCore.ensureTable(checkExists(clusterName), tableName, columns);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#createHbaseTable(java.lang.String, java.lang.String, java.util.Set)
	 */
	@Override
	public boolean createHbaseTable(final String clusterName, final String tableName, final Set<String> columns) throws HadoopServiceException {
		try {
			hadoopServiceCore.createTable(checkExists(clusterName), tableName, columns);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#truncateHbaseTable(java.lang.String, java.lang.String)
	 */
	@Override
	public boolean truncateHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			hadoopServiceCore.truncateTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
		return true;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#dropHbaseTable(java.lang.String, java.lang.String)
	 */
	@Override
	public boolean dropHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			hadoopServiceCore.dropTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
		return true;
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#existHbaseTable(java.lang.String, java.lang.String)
	 */
	@Override
	public boolean existHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.existTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.rmi.HadoopService#getClusterConfiguration(java.lang.String)
	 */
	@Override
	public Map<String, String> getClusterConfiguration(final String clusterName) throws HadoopServiceException {

		final Configuration conf = hadoopServiceCore.getClusterConiguration(checkExists(clusterName));
		final Map<String, String> res = Maps.newHashMap();
		for (Entry<String, String> e : conf) {
			res.put(e.getKey(), e.getValue());
		}

		return res;
	}

	@Override
	public boolean deleteHdfsPath(final String clusterName, final String path) throws HadoopServiceException {
		return hadoopServiceCore.deleteFromHdfs(checkExists(clusterName), path);
	}

	@Override
	public boolean createHdfsDirectory(final String clusterName, final String path, final boolean force) throws HadoopServiceException {
		return hadoopServiceCore.createHdfsDir(checkExists(clusterName), path, force);
	}

	@Override
	public List<String> listClusters() throws HadoopServiceException {
		try {
			return ClusterName.asStringList();
		} catch (Throwable e) {
			throw new HadoopServiceException(e);
		}
	}

	@Override
	public List<String> describeHbaseTable(final String clusterName, final String tableName) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeTable(checkExists(clusterName), tableName);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	@Override
	public HBaseRowDescriptor describeHBaseColumn(final String clusterName, final String tableName, final String rowKey) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeRow(checkExists(clusterName), tableName, rowKey);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	@Override
	public Map<String, HBaseRowDescriptor> describeHBaseColumns(final String clusterName, final String tableName, final List<String> rowKeys) throws HadoopServiceException {
		try {
			return hadoopServiceCore.describeRows(checkExists(clusterName), tableName, rowKeys);
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	@Override
	public boolean deleteHBaseColumn(final String clusterName, final String tableName, final HBaseRowDescriptor column) throws HadoopServiceException {
		try {
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, Lists.newArrayList(column));
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	@Override
	public boolean deleteHBaseColumns(final String clusterName, final String tableName, final List<HBaseRowDescriptor> column) throws HadoopServiceException {
		try {
			hadoopServiceCore.deleteColumnsFromHBase(checkExists(clusterName), tableName, column);
			return true;
		} catch (IOException e) {
			throw new HadoopServiceException(e);
		}
	}

	///////////////////

	/**
	 * Check exists.
	 *
	 * @param clusterName
	 *            the cluster name
	 * @return the cluster name
	 * @throws HadoopServiceException
	 *             the hadoop service exception
	 */
	private ClusterName checkExists(final String clusterName) throws HadoopServiceException {
		try {
			return ClusterName.valueOf(clusterName);
		} catch (final IllegalArgumentException e) {
			throw new HadoopServiceException("Invalid cluster name: " + clusterName);
		}
	}

	/**
	 * Gets the notification handler.
	 *
	 * @return the notification handler
	 */
	public NotificationHandler getNotificationHandler() {
		return notificationHandler;
	}

	/**
	 * Sets the notification handler.
	 *
	 * @param notificationHandler
	 *            the new notification handler
	 */
	@Required
	public void setNotificationHandler(final NotificationHandler notificationHandler) {
		this.notificationHandler = notificationHandler;
	}

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/utils/ScanFactory.java

```java
package eu.dnetlib.data.hadoop.utils;

import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.util.Base64;
import org.dom4j.Document;
import org.dom4j.Node;

public class ScanFactory {

	private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM

	public static String getScan(final ScanProperties scanProperties) throws IOException {
		Scan scan = new Scan();

		scan.setCaching(scanProperties.getCaching());
		scan.setCacheBlocks(false); // don't set to true for MR jobs

		scan.setFilter(scanProperties.getFilterList());
		for (String family : scanProperties.getFamilies()) {
			scan.addFamily(family.getBytes());
		}

		log.debug("serializing scan");
		return convertScanToString(scan);
	}

	public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
		log.debug("setting job scanner");

		ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));

		String caching = doc.valueOf("//SCAN/@caching");
		if (!StringUtils.isBlank(caching)) {
			log.info("overriding default scan caching with: " + caching);
			scanProperties.setCaching(Integer.valueOf(caching));
		}

		for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
			Node node = (Node) o;
			String value = node.valueOf("./@value");
			if ((value == null) || value.isEmpty()) {
				value = bbParams.get(node.valueOf("./@param"));
			}
			log.debug("scanner family value: " + value);
			scanProperties.getFamilies().add(value);
		}
		for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
			Node node = (Node) o;
			String filterType = node.valueOf("./@type");

			String value = node.valueOf("./@value");
			if ((value == null) || value.isEmpty()) {
				value = bbParams.get(node.valueOf("./@param"));
			}

			if (filterType.equals("prefix")) {
				log.debug("scanner prefix filter, value: " + value);
				scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
			} // TODO add more filterType cases here
		}
		return scanProperties;
	}

	/**
	 * Writes the given scan into a Base64 encoded string.
	 *
	 * @param scan
	 *            The scan to write out.
	 * @return The scan saved in a Base64 encoded string.
	 * @throws IOException
	 *             When writing the scan fails.
	 */
	private static String convertScanToString(final Scan scan) throws IOException {
		ByteArrayOutputStream out = new ByteArrayOutputStream();
		DataOutputStream dos = new DataOutputStream(out);
		scan.write(dos);
		return Base64.encodeBytes(out.toByteArray());
	}

}
```
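The XPath expressions in `parseScanProperties` imply a job-profile fragment shaped roughly like the string below. This is a hypothetical sketch: the attribute values (caching, family, prefix, and the `MUST_PASS_ALL` operator name) are invented, and what `ScanProperties` does with the operator string is assumed, not shown in this revision:

```java
import java.io.StringReader;
import java.util.HashMap;

import org.dom4j.Document;
import org.dom4j.io.SAXReader;

public class ScanFactoryExample {

	public static void main(final String[] args) throws Exception {
		// Hypothetical profile fragment; element and attribute names follow the
		// XPath expressions in parseScanProperties, the values are invented.
		final String xml =
				"<SCAN caching='500'>"
				+ "<FAMILIES><FAMILY value='result'/></FAMILIES>"
				+ "<FILTERS operator='MUST_PASS_ALL'>"
				+ "<FILTER type='prefix' value='50|'/>"
				+ "</FILTERS>"
				+ "</SCAN>";

		final Document doc = new SAXReader().read(new StringReader(xml));

		// No @param attributes are used here, so the blackboard map can stay empty.
		final ScanProperties props = ScanFactory.parseScanProperties(doc, new HashMap<String, String>());

		// Serializes the configured Scan into the Base64 form used for MR job setup.
		System.out.println(ScanFactory.getScan(props));
	}
}
```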
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/hbase/HbaseTableFeeder.java

```java
package eu.dnetlib.data.hadoop.hbase;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;

import com.google.common.base.Predicates;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.transform.Row;
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Mutation;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Required;

/**
 * The Class HbaseTableFeeder provides an abstraction for shipping batch operations to an HBase table.
 */
public abstract class HbaseTableFeeder {

	/**
	 * The Constant log.
	 */
	private static final Log log = LogFactory.getLog(HbaseTableFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/**
	 * The configuration enumerator.
	 */
	@Autowired
	protected ConfigurationEnumerator configurationEnumerator;

	/**
	 * The batch size.
	 */
	private int batchSize = 100;

	/**
	 * The result set client factory.
	 */
	private ResultSetClientFactory resultSetClientFactory;

	/**
	 * Adds the operation.
	 *
	 * @param buffer the buffer
	 * @param row the row
	 */
	protected abstract void addOperation(final List<Mutation> buffer, final Row row);

	/**
	 * Feed.
	 *
	 * @param epr the epr
	 * @param xsl the xsl
	 * @param clusterName the cluster name
	 * @param tableName the table name
	 * @param simulation the simulation
	 * @return the int
	 * @throws IOException Signals that an I/O exception has occurred.
	 * @throws InterruptedException the interrupted exception
	 */
	public int feed(final String epr, final String xsl, final ClusterName clusterName, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		return doWrite(asRows(epr, xsl), getConf(clusterName), tableName, simulation);
	}

	/**
	 * Do writeOnHBase.
	 *
	 * @param rows the rows
	 * @param configuration the configuration
	 * @param tableName the table name
	 * @param simulation the simulation
	 * @return the int
	 * @throws IOException Signals that an I/O exception has occurred.
	 * @throws InterruptedException the interrupted exception
	 */
	private int doWrite(final Iterable<Row> rows, final Configuration configuration, final String tableName, final boolean simulation)
			throws IOException, InterruptedException {
		final List<Mutation> buffer = Lists.newArrayList();

		int count = 0;
		if (simulation) {
			log.info("running in simulation mode ...");
			log.info(String.format("... simulated import of %d records", Iterables.size(rows)));
		} else {

			final HTable htable = new HTable(configuration, tableName);
			try {
				int i = 0;
				for (final Row row : rows) {
					addOperation(buffer, row);
					if ((++i % getBatchSize()) == 0) {
						flush(tableName, buffer, htable);
						count += buffer.size();
						buffer.clear();
					}
				}
			} finally {
				if (!buffer.isEmpty()) {
					flush(tableName, buffer, htable);
					count += buffer.size();
				}
				htable.flushCommits();
				htable.close();
			}
		}
		return count;
	}

	private void flush(final String tableName, final List<Mutation> buffer, final HTable htable) throws IOException, InterruptedException {
		if (!checkOp(htable.batch(buffer), tableName)) throw new IOException("unable to flush operation on HBase table: " + tableName);
	}

	private boolean checkOp(final Object[] res, final String tableName) throws IOException {
		return Iterables.all(Arrays.asList(res), Predicates.notNull());
	}

	/**
	 * As rows.
	 *
	 * @param epr the epr
	 * @param xsl the xsl
	 * @return the iterable
	 */
	protected Iterable<Row> asRows(final String epr, final String xsl) {
		return Iterables.concat(Iterables.transform(getResultSetClientFactory().getClient(epr), XsltRowTransformerFactory.newInstance(xsl)));
	}

	/**
	 * Gets the conf.
	 *
	 * @param clusterName the cluster name
	 * @return the conf
	 */
	protected Configuration getConf(final ClusterName clusterName) {
		return configurationEnumerator.get(clusterName);
	}

	/**
	 * Gets the batch size.
	 *
	 * @return the batch size
	 */
	public int getBatchSize() {
		return batchSize;
	}

	/**
	 * Sets the batch size.
	 *
	 * @param batchSize the new batch size
	 */
	public void setBatchSize(final int batchSize) {
		this.batchSize = batchSize;
	}

	/**
	 * Gets the result set client factory.
	 *
	 * @return the result set client factory
	 */
	public ResultSetClientFactory getResultSetClientFactory() {
		return resultSetClientFactory;
	}

	/**
	 * Sets the result set client factory.
	 *
	 * @param resultSetClientFactory the new result set client factory
	 */
	@Required
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
		this.resultSetClientFactory = resultSetClientFactory;
	}

}
```
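A hypothetical invocation of a concrete feeder (such as the `HBaseDeleteFeeder` below). The epr/XSLT contents, cluster name and table name are invented; the feeder instance is assumed to have been wired by Spring with a `ResultSetClientFactory`:

```java
import eu.dnetlib.data.hadoop.config.ClusterName;

public class FeederExample {

	// Sketch only: resultset records are transformed to Rows via the XSLT, the
	// resulting mutations are buffered in batches of getBatchSize() and flushed
	// with HTable.batch() inside doWrite().
	public static void run(final HbaseTableFeeder feeder, final String epr, final String xslt) throws Exception {
		final int count = feeder.feed(epr, xslt, ClusterName.valueOf("DM"), "db_demo", false);
		System.out.println("applied " + count + " mutations");
	}
}
```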
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/utils/JobProfile.java

```java
package eu.dnetlib.data.hadoop.utils;

import java.util.Map;
import java.util.Set;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

import eu.dnetlib.data.hadoop.rmi.HadoopJobType;

public class JobProfile {

	private final Map<String, String> jobDefinition = Maps.newHashMap();
	private final Set<String> requiredParams = Sets.newHashSet();
	private ScanProperties scanProperties;

	private String name;

	private String description = "";

	private HadoopJobType jobType;

	public boolean isEmpty() {
		return getJobDefinition().isEmpty();
	}

	public Map<String, String> getJobDefinition() {
		return jobDefinition;
	}

	public Set<String> getRequiredParams() {
		return requiredParams;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getDescription() {
		return description;
	}

	public void setDescription(String description) {
		this.description = description;
	}

	public ScanProperties getScanProperties() {
		return scanProperties;
	}

	public void setScanProperties(ScanProperties scanProperties) {
		this.scanProperties = scanProperties;
	}

	public HadoopJobType getJobType() {
		return jobType;
	}

	public void setJobType(HadoopJobType jobType) {
		this.jobType = jobType;
	}

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/hbase/HBaseDeleteFeeder.java

```java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

/**
 * The Class HBaseDeleteFeeder performs a batch of Delete operations.
 */
public class HBaseDeleteFeeder extends HbaseTableFeeder {

	/**
	 * Logger.
	 */
	private static final Log log = LogFactory.getLog(HBaseDeleteFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM

	/*
	 * (non-Javadoc)
	 *
	 * @see eu.dnetlib.data.hadoop.hbase.HbaseTableFeeder#addOperation(java.util.List, eu.dnetlib.data.transform.Row)
	 */
	@Override
	protected void addOperation(final List<Mutation> buffer, final Row row) {
		final Delete delete = new Delete(Bytes.toBytes(row.getKey()));
		delete.setWriteToWAL(true);

		for (final Column<String, byte[]> col : row) {
			log.debug(String.format("deleting K: '%s' CF:'%s' Q:'%s'", row.getKey(), row.getColumnFamily(), col.getName()));
			delete.deleteColumns(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()));
		}

		buffer.add(delete);
	}

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/JobRegistry.java

```java
package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.HadoopJob.Status;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;

public class JobRegistry {

	private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM

	private int maxJobs;

	private final BiMap<String, HadoopJob> jobs = HashBiMap.create();

	public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {

		if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }

		if (jobs.size() > getMaxJobs()) {
			removeOldestProcess();
		}

		jobs.put(hadoopJob.getId(), hadoopJob);
		log.info("Registered hadoop job " + hadoopJob.getId());
		hadoopJob.startMonitor();

		return hadoopJob.getId();
	}

	public Status getJobStatus(String id) {
		return findJob(id).getStatus();
	}

	public HadoopJob findJob(String id) {
		return jobs.get(id);
	}

	public void unregisterJob(String id) throws HadoopServiceException {

		if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }

		log.info("unregistering job: " + id);
		jobs.get(id).getJobMonitor().kill();
		jobs.remove(id);
	}

	private void removeOldestProcess() throws HadoopServiceException {
		Date oldDate = new Date();
		String oldId = null;

		for (Entry<String, HadoopJob> e : jobs.entrySet()) {
			final HadoopJob hadoopJob = e.getValue();

			if (hadoopJob.isComplete()) {
				final Date date = hadoopJob.getLastActivity();
				if (date.before(oldDate)) {
					oldDate = date;
					oldId = e.getKey();
				}
			}
		}

		if (oldId != null) {
			unregisterJob(oldId);
		}

	}

	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
		Map<String, HadoopJob> filtered = Maps.filterValues(jobs, HadoopUtils.filterByCluster(clusterName));
		return Lists.newArrayList(Iterables.transform(filtered.entrySet(), HadoopUtils.hadoopJobDescriptor()));
	}

	@Required
	public void setMaxJobs(final int maxJobs) {
		this.maxJobs = maxJobs;
	}

	public int getMaxJobs() {
		return maxJobs;
	}

}
```
modules/dnet-hadoop-service/tags/dnet-hadoop-service-2.4.1/src/main/java/eu/dnetlib/data/hadoop/hbase/HBasePutFeeder.java

```java
package eu.dnetlib.data.hadoop.hbase;

import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
```
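The `HBasePutFeeder` listing is cut off at this point in the source page. For illustration only, and not the original source, a Put-based `addOperation` would plausibly mirror the `HBaseDeleteFeeder` above, along these lines (the `Column.getValue()` accessor is an assumption, symmetric with `getName()`):

```java
import java.util.List;

import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

import eu.dnetlib.data.transform.Column;
import eu.dnetlib.data.transform.Row;

// Hypothetical reconstruction for illustration only; the real file is truncated above.
public class HBasePutFeederSketch extends HbaseTableFeeder {

	@Override
	protected void addOperation(final List<Mutation> buffer, final Row row) {
		final Put put = new Put(Bytes.toBytes(row.getKey()));
		for (final Column<String, byte[]> col : row) {
			// family / qualifier / value, using the same Row accessors as HBaseDeleteFeeder
			put.add(Bytes.toBytes(row.getColumnFamily()), Bytes.toBytes(col.getName()), col.getValue());
		}
		buffer.add(put);
	}
}
```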