Revision 49831
Added by Claudio Atzori almost 7 years ago
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/AbstractHBaseMapReduceJob.java

```java
package eu.dnetlib.data.mapreduce.hbase;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Properties;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.springframework.beans.factory.BeanNameAware;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.mapreduce.HadoopJob;

public abstract class AbstractHBaseMapReduceJob extends Configured implements HadoopJob, BeanNameAware {

    protected static final Log log = LogFactory.getLog(AbstractHBaseMapReduceJob.class);

    protected String jobName;

    @Resource
    protected ConfigurationEnumerator configurationEnumerator;

    protected abstract Job setJobDetails(Job job, Properties p) throws Exception;

    @Override
    public Job setJobDetails(ClusterName name, Properties p) {
        try {
            final Job job = createJobCommon(name, p);
            return setJobDetails(job, p);
        } catch (Exception e) {
            throw new RuntimeException("unable to define Job: " + getClass().getSimpleName(), e);
        }
    }

    protected Job createJobCommon(ClusterName name, Properties p) throws IOException {
        Job job = new Job(configurationEnumerator.get(name), getClass().getSimpleName());
        merge(job, p);
        return job;
    }

    protected void merge(final Job job, final Properties p) {
        for (Entry<Object, Object> e : p.entrySet()) {
            job.getConfiguration().set((String) e.getKey(), (String) e.getValue());
        }
    }

    protected void deleteHdfsFile(Job job, Path path) {
        try {
            FileSystem hdfs = FileSystem.get(job.getConfiguration());
            if (hdfs.exists(path)) {
                hdfs.delete(path, true);
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void setBeanName(String name) {
        this.jobName = name;
    }

    @Override
    public String getName() {
        return jobName;
    }
}
```
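The base class is a classic template method: `setJobDetails(ClusterName, Properties)` builds the `Job` from the per-cluster configuration, merges the workflow properties into its configuration, then delegates the job-specific wiring to the subclass hook. A minimal sketch of a concrete subclass (the class name and mapper wiring below are hypothetical, not part of this changeset):

```java
import java.util.Properties;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

// Hypothetical example: the subclass only wires job-specific details;
// the base class handles cluster configuration and property merging.
public class ExampleNoopJob extends AbstractHBaseMapReduceJob {

    @Override
    protected Job setJobDetails(final Job job, final Properties p) throws Exception {
        job.setMapperClass(Mapper.class);      // identity mapper, for illustration only
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);              // map-only, like most jobs in this module
        return job;
    }
}
```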
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/CommitActionsJob.java

```java
package eu.dnetlib.data.mapreduce.hbase.actions;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class CommitActionsJob extends AbstractActionsJob {

    @Override
    protected Class<? extends TableMapper<ImmutableBytesWritable, ?>> getMapperClass() {
        return CommitActionsMapper.class;
    }

    @Override
    protected Class<?> getMapOutputValueClass() {
        return Put.class;
    }

    @Override
    protected void addSpecificFilters(FilterList filters) {
        filters.addFilter(new PrefixFilter(Bytes.toBytes("aac|")));
    }

    @Override
    protected void addSpecificScanClauses(Scan scan) {
        //scan.addFamily(Bytes.toBytes("target"));
    }
}
```
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/DeleteActionsMapper.java

```java
package eu.dnetlib.data.mapreduce.hbase.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class DeleteActionsMapper extends TableMapper<ImmutableBytesWritable, Delete> {

    private String set = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        set = context.getConfiguration().get("set");
        if (set != null) {
            System.out.println("Deleting set: " + set);
        } else {
            System.out.println("Deleting ALL sets");
        }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        if (set == null || value.getColumnLatest(Bytes.toBytes("set"), Bytes.toBytes(set)) != null) {

            final byte[] rowKey = key.copyBytes();
            System.out.println("Deleting action " + Bytes.toString(rowKey));

            context.getCounter("Actions", "N. Deletes").increment(1);
            context.write(key, new Delete(rowKey));
        }
    }
}
```
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/DeleteActionsJob.java

```java
package eu.dnetlib.data.mapreduce.hbase.actions;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class DeleteActionsJob extends AbstractActionsJob {

    @Override
    protected Class<? extends TableMapper<ImmutableBytesWritable, ?>> getMapperClass() {
        return DeleteActionsMapper.class;
    }

    @Override
    protected Class<?> getMapOutputValueClass() {
        return Delete.class;
    }

    @Override
    protected void addSpecificFilters(FilterList filters) {
        // NOT necessary
    }

    @Override
    protected void addSpecificScanClauses(Scan scan) {
        // NOT necessary
    }
}
```
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/CommitActionsMapper.java

```java
package eu.dnetlib.data.mapreduce.hbase.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class CommitActionsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private String set = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        set = context.getConfiguration().get("set");
        if (set != null) {
            System.out.println("Committing set: " + set);
        } else {
            System.out.println("Committing ALL sets");
        }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        if (set == null || value.getColumnLatest(Bytes.toBytes("set"), Bytes.toBytes(set)) != null) {
            System.out.println("Processing action " + Bytes.toString(key.copyBytes()));

            byte[] cf = Bytes.toBytes("target");

            byte[] tkey = value.getValue(cf, Bytes.toBytes("rowKey"));
            byte[] tcf = value.getValue(cf, Bytes.toBytes("columnFamily"));
            byte[] tc = value.getValue(cf, Bytes.toBytes("column"));
            byte[] tv = value.getValue(cf, Bytes.toBytes("content"));

            Put put = new Put(tkey);
            put.add(tcf, tc, tv);

            context.getCounter("Actions", Bytes.toString(tcf)).increment(1);

            context.write(new ImmutableBytesWritable(tkey), put);
        }
    }
}
```
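The mapper replays "actions": each row in the actions table (selected by the `aac|` prefix filter in CommitActionsJob) stores, under its `target` family, the coordinates (`rowKey`, `columnFamily`, `column`) and payload (`content`) of the `Put` to apply to the data table. A hedged sketch of how such an action row might be produced; all literal values are invented, only the family and qualifier names come from the mapper above:

```java
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical sketch of an action row as CommitActionsMapper expects to read it.
public final class ActionRowSketch {

    private ActionRowSketch() {}

    public static Put exampleAction() {
        final byte[] target = Bytes.toBytes("target");
        final Put action = new Put(Bytes.toBytes("aac|someActionId"));                 // "aac|" prefix, row id invented
        action.add(target, Bytes.toBytes("rowKey"), Bytes.toBytes("someTargetRow"));   // row to write in the data table
        action.add(target, Bytes.toBytes("columnFamily"), Bytes.toBytes("result"));    // hypothetical target family
        action.add(target, Bytes.toBytes("column"), Bytes.toBytes("body"));            // hypothetical target qualifier
        action.add(target, Bytes.toBytes("content"), Bytes.toBytes("<payload/>"));     // hypothetical cell content
        return action;
    }
}
```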
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/AbstractActionsJob.java

```java
package eu.dnetlib.data.mapreduce.hbase.actions;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

abstract public class AbstractActionsJob extends AbstractHBaseMapReduceJob {

    @Override
    protected Job setJobDetails(Job job, Properties p) throws Exception {
        initMapper(job, getScan(p), p.getProperty(JobParams.HBASE_SOURCE_TABLE));

        job.setOutputFormatClass(TableOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(getMapOutputValueClass());
        job.setNumReduceTasks(0);

        return job;
    }

    private void initMapper(final Job job, final Scan scan, final String sourceTable) {
        try {
            TableMapReduceUtil.initTableMapperJob(sourceTable, scan, getMapperClass(), Text.class, ImmutableBytesWritable.class, job);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    abstract protected Class<? extends TableMapper<ImmutableBytesWritable, ?>> getMapperClass();

    abstract protected Class<?> getMapOutputValueClass();

    abstract protected void addSpecificFilters(FilterList filters);

    abstract protected void addSpecificScanClauses(Scan scan);

    private Scan getScan(Properties p) {

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        FilterList filters = new FilterList(Operator.MUST_PASS_ALL);

        //if (p.containsKey("set")) {
        //filters.addFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("set"))));
        //filters.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(p.getProperty("set")))));
        //scan.addFamily(Bytes.toBytes("set"));
        // byte[] qualifier = Bytes.toBytes(p.getProperty("set"));
        // SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("set"), qualifier, CompareOp.EQUAL, qualifier);
        // filter.setFilterIfMissing(true);
        // filters.addFilter(filter);
        //scan.addColumn(Bytes.toBytes("set"), Bytes.toBytes(p.getProperty("set")));
        //}

        addSpecificFilters(filters);
        addSpecificScanClauses(scan);

        scan.setFilter(filters);
        return scan;
    }

}
```
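The commented-out block in `getScan` documents an abandoned attempt at per-set filtering; instead, the set membership check is currently done row by row inside the mappers. Should it ever be revived, a sketch along the lines of the dead code might look like this (assumption: the `set` family stores the set name as both qualifier and value, as the `SingleColumnValueFilter` lines above suggest):

```java
import java.util.Properties;

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper re-enabling the per-set filtering sketched in the dead code.
public final class SetFilterSupport {

    private SetFilterSupport() {}

    public static void addSetFilter(final FilterList filters, final Properties p) {
        if (p.containsKey("set")) {
            final byte[] qualifier = Bytes.toBytes(p.getProperty("set"));
            // Keep only rows whose set:<setName> column equals the set name itself.
            final SingleColumnValueFilter filter =
                    new SingleColumnValueFilter(Bytes.toBytes("set"), qualifier, CompareOp.EQUAL, qualifier);
            filter.setFilterIfMissing(true); // drop rows that lack the set column entirely
            filters.addFilter(filter);
        }
    }
}
```

Server-side filtering would avoid shipping non-matching rows to the mappers at all, at the cost of the `setFilterIfMissing` subtlety the commented-out code already hints at.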
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/DocumentDatabaseFeedJob.java

```java
package eu.dnetlib.data.mapreduce.hbase.index;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

public class DocumentDatabaseFeedJob extends AbstractHBaseMapReduceJob {

    @Override
    public Job setJobDetails(final Job job, final Properties p) {

        job.setInputFormatClass(SequenceFileInputFormat.class);
        try {
            FileInputFormat.setInputPaths(job, p.getProperty(JobParams.MAPRED_INPUT_DIR));
            FileOutputFormat.setOutputPath(job, new Path(p.getProperty(JobParams.DOCUMENT_DB_ROTTEN_FILE)));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        job.setMapperClass(DocumentDatabaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setNumReduceTasks(0);

        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
        job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

        job.getConfiguration().setBoolean("mapred.compress.map.output", true);

        return job;
    }
}
```
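Design note: the job sets both the Hadoop 1 key (`mapred.map.tasks.speculative.execution`) and its Hadoop 2 replacement (`mapreduce.map.speculative`) to false, presumably so the same code behaves identically on either cluster generation; disabling speculative execution prevents duplicate map attempts from feeding the same documents to the target database twice.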
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/HBaseCopyTableJob.java

```java
package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.util.Properties;

import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapreduce.Job;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

/**
 * HBaseCopyTableJob is a simple wrapper over org.apache.hadoop.hbase.mapreduce.CopyTable that makes it invokable from a
 * D-Net workflow, passing the required parameters.
 *
 * Copies the given table to the remote hbase instance.
 *
 * @author claudio
 */
public class HBaseCopyTableJob extends AbstractHBaseMapReduceJob {

    @Override
    protected Job setJobDetails(Job job, Properties p) throws Exception {

        String remoteCluster = "--peer.adr=" + p.getProperty("peername");
        String tablename = p.getProperty("tablename");

        String[] args = Iterables.toArray(Lists.newArrayList(remoteCluster, tablename), String.class);

        return CopyTable.createSubmittableJob(job.getConfiguration(), args);
    }

}
```
Commit message: cleanup