Revision 49831

cleanup


modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/AbstractHBaseMapReduceJob.java
package eu.dnetlib.data.mapreduce.hbase;

import java.io.IOException;
import java.util.Map.Entry;
import java.util.Properties;

import javax.annotation.Resource;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.springframework.beans.factory.BeanNameAware;

import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
import eu.dnetlib.data.mapreduce.HadoopJob;

public abstract class AbstractHBaseMapReduceJob extends Configured implements HadoopJob, BeanNameAware {

    protected static final Log log = LogFactory.getLog(AbstractHBaseMapReduceJob.class);

    protected String jobName;

    @Resource
    protected ConfigurationEnumerator configurationEnumerator;

    protected abstract Job setJobDetails(Job job, Properties p) throws Exception;

    @Override
    public Job setJobDetails(ClusterName name, Properties p) {
        try {
            final Job job = createJobCommon(name, p);
            return setJobDetails(job, p);
        } catch (Exception e) {
            throw new RuntimeException("unable to define Job: " + getClass().getSimpleName(), e);
        }
    }

    protected Job createJobCommon(ClusterName name, Properties p) throws IOException {

        Job job = new Job(configurationEnumerator.get(name), getClass().getSimpleName());

        merge(job, p);

        return job;
    }

    protected void merge(final Job job, final Properties p) {
        for (Entry<Object, Object> e : p.entrySet()) {
            job.getConfiguration().set((String) e.getKey(), (String) e.getValue());
        }
    }

    protected void deleteHdfsFile(Job job, Path path) {
        try {
            FileSystem hdfs = FileSystem.get(job.getConfiguration());
            if (hdfs.exists(path)) {
                hdfs.delete(path, true);
            }
        } catch (IOException e) {
            log.error("unable to delete hdfs path: " + path, e);
        }
    }

    @Override
    public void setBeanName(String name) {
        this.jobName = name;
    }

    @Override
    public String getName() {
        return jobName;
    }

}
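
For orientation, a minimal sketch of how a concrete job plugs into this template: createJobCommon() builds the Job from the cluster configuration and merges the workflow Properties, and the subclass only fills in job-specific details. The class below is hypothetical and not part of this revision.

package eu.dnetlib.data.mapreduce.hbase;

import java.util.Properties;

import org.apache.hadoop.mapreduce.Job;

// Hypothetical subclass, shown only to illustrate the template method above.
public class ExampleNoopJob extends AbstractHBaseMapReduceJob {

    @Override
    protected Job setJobDetails(final Job job, final Properties p) throws Exception {
        // a real job would configure input/output formats, mapper and reducer classes here
        job.setJobName(getName() + " (example)");
        job.setNumReduceTasks(0);
        return job;
    }
}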
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/CommitActionsJob.java
package eu.dnetlib.data.mapreduce.hbase.actions;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class CommitActionsJob extends AbstractActionsJob {

    @Override
    protected Class<? extends TableMapper<ImmutableBytesWritable, ?>> getMapperClass() {
        return CommitActionsMapper.class;
    }

    @Override
    protected Class<?> getMapOutputValueClass() {
        return Put.class;
    }

    @Override
    protected void addSpecificFilters(FilterList filters) {
        filters.addFilter(new PrefixFilter(Bytes.toBytes("aac|")));
    }

    @Override
    protected void addSpecificScanClauses(Scan scan) {
        //scan.addFamily(Bytes.toBytes("target"));
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/DeleteActionsMapper.java
package eu.dnetlib.data.mapreduce.hbase.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class DeleteActionsMapper extends TableMapper<ImmutableBytesWritable, Delete> {

    private String set = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        set = context.getConfiguration().get("set");
        if (set != null) {
            System.out.println("Deleting set: " + set);
        } else {
            System.out.println("Deleting ALL sets");
        }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
        if (set == null || value.getColumnLatest(Bytes.toBytes("set"), Bytes.toBytes(set)) != null) {

            final byte[] rowKey = key.copyBytes();
            System.out.println("Deleting action " + Bytes.toString(rowKey));

            context.getCounter("Actions", "N. Deletes").increment(1);
            context.write(key, new Delete(rowKey));
        }
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/DeleteActionsJob.java
package eu.dnetlib.data.mapreduce.hbase.actions;

import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;

public class DeleteActionsJob extends AbstractActionsJob {

    @Override
    protected Class<? extends TableMapper<ImmutableBytesWritable, ?>> getMapperClass() {
        return DeleteActionsMapper.class;
    }

    @Override
    protected Class<?> getMapOutputValueClass() {
        return Delete.class;
    }

    @Override
    protected void addSpecificFilters(FilterList filters) {
        // NOT necessary
    }

    @Override
    protected void addSpecificScanClauses(Scan scan) {
        // NOT necessary
    }
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/CommitActionsMapper.java
package eu.dnetlib.data.mapreduce.hbase.actions;

import java.io.IOException;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;

public class CommitActionsMapper extends TableMapper<ImmutableBytesWritable, Put> {

    private String set = null;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        set = context.getConfiguration().get("set");
        if (set != null) {
            System.out.println("Committing set: " + set);
        } else {
            System.out.println("Committing ALL sets");
        }
    }

    @Override
    protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {

        if (set == null || value.getColumnLatest(Bytes.toBytes("set"), Bytes.toBytes(set)) != null) {
            System.out.println("Processing action " + Bytes.toString(key.copyBytes()));

            byte[] cf = Bytes.toBytes("target");

            byte[] tkey = value.getValue(cf, Bytes.toBytes("rowKey"));
            byte[] tcf = value.getValue(cf, Bytes.toBytes("columnFamily"));
            byte[] tc = value.getValue(cf, Bytes.toBytes("column"));
            byte[] tv = value.getValue(cf, Bytes.toBytes("content"));

            Put put = new Put(tkey);
            put.add(tcf, tc, tv);

            context.getCounter("Actions", Bytes.toString(tcf)).increment(1);

            context.write(new ImmutableBytesWritable(tkey), put);
        }
    }
}
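
As a reading aid, a sketch of the action-row layout this mapper assumes: a "set" family whose qualifier is the set name, and a "target" family carrying the cell to be committed (rowKey, columnFamily, column, content). Everything below except those family/qualifier names and the "aac|" prefix is invented for illustration and is not part of this revision.

package eu.dnetlib.data.mapreduce.hbase.actions;

import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.util.Bytes;

// Hypothetical helper that writes an action row in the layout read by CommitActionsMapper.
public class ExampleActionRow {

    public static Put build() {
        // row key: only the "aac|" prefix is taken from CommitActionsJob, the rest is made up
        final Put action = new Put(Bytes.toBytes("aac|exampleSet|0001"));
        // membership marker checked by the mappers: family "set", qualifier = set name
        action.add(Bytes.toBytes("set"), Bytes.toBytes("exampleSet"), Bytes.toBytes(""));
        // target cell to be written when the action is committed
        action.add(Bytes.toBytes("target"), Bytes.toBytes("rowKey"), Bytes.toBytes("exampleTargetRow"));
        action.add(Bytes.toBytes("target"), Bytes.toBytes("columnFamily"), Bytes.toBytes("exampleFamily"));
        action.add(Bytes.toBytes("target"), Bytes.toBytes("column"), Bytes.toBytes("exampleColumn"));
        action.add(Bytes.toBytes("target"), Bytes.toBytes("content"), Bytes.toBytes("example content"));
        return action;
    }
}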
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/actions/AbstractActionsJob.java
package eu.dnetlib.data.mapreduce.hbase.actions;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

abstract public class AbstractActionsJob extends AbstractHBaseMapReduceJob {

    @Override
    protected Job setJobDetails(Job job, Properties p) throws Exception {
        initMapper(job, getScan(p), p.getProperty(JobParams.HBASE_SOURCE_TABLE));

        job.setOutputFormatClass(TableOutputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(getMapOutputValueClass());
        job.setNumReduceTasks(0);

        return job;
    }

    private void initMapper(final Job job, final Scan scan, final String sourceTable) {
        try {
            TableMapReduceUtil.initTableMapperJob(sourceTable, scan, getMapperClass(), Text.class, ImmutableBytesWritable.class, job);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    abstract protected Class<? extends TableMapper<ImmutableBytesWritable, ?>> getMapperClass();

    abstract protected Class<?> getMapOutputValueClass();

    abstract protected void addSpecificFilters(FilterList filters);

    abstract protected void addSpecificScanClauses(Scan scan);

    private Scan getScan(Properties p) {

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);

        FilterList filters = new FilterList(Operator.MUST_PASS_ALL);

        //if (p.containsKey("set")) {
        //filters.addFilter(new FamilyFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("set"))));
        //filters.addFilter(new QualifierFilter(CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(p.getProperty("set")))));
        //scan.addFamily(Bytes.toBytes("set"));
        //			byte[] qualifier = Bytes.toBytes(p.getProperty("set"));
        //			SingleColumnValueFilter filter = new SingleColumnValueFilter(Bytes.toBytes("set"), qualifier, CompareOp.EQUAL, qualifier);
        //			filter.setFilterIfMissing(true);
        //			filters.addFilter(filter);
        //scan.addColumn(Bytes.toBytes("set"), Bytes.toBytes(p.getProperty("set")));
        //}

        addSpecificFilters(filters);
        addSpecificScanClauses(scan);

        scan.setFilter(filters);
        return scan;
    }

}
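
The commented-out block in getScan() suggests restricting the scan to a single set. Below is a sketch of how that could be done with the SingleColumnValueFilter variant hinted at there; whether this matches the intended semantics is an assumption, and the helper class is hypothetical.

package eu.dnetlib.data.mapreduce.hbase.actions;

import org.apache.hadoop.hbase.filter.CompareFilter.CompareOp;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
import org.apache.hadoop.hbase.util.Bytes;

// Sketch of the set-based restriction outlined in the commented-out code above.
public class ExampleSetFilter {

    public static void addSetFilter(final FilterList filters, final String set) {
        final byte[] qualifier = Bytes.toBytes(set);
        final SingleColumnValueFilter filter =
                new SingleColumnValueFilter(Bytes.toBytes("set"), qualifier, CompareOp.EQUAL, qualifier);
        // skip rows that do not carry the "set" column at all
        filter.setFilterIfMissing(true);
        filters.addFilter(filter);
    }
}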
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/index/DocumentDatabaseFeedJob.java
package eu.dnetlib.data.mapreduce.hbase.index;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

public class DocumentDatabaseFeedJob extends AbstractHBaseMapReduceJob {

    @Override
    public Job setJobDetails(final Job job, final Properties p) {

        job.setInputFormatClass(SequenceFileInputFormat.class);
        try {
            FileInputFormat.setInputPaths(job, p.getProperty(JobParams.MAPRED_INPUT_DIR));
            FileOutputFormat.setOutputPath(job, new Path(p.getProperty(JobParams.DOCUMENT_DB_ROTTEN_FILE)));
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        job.setMapperClass(DocumentDatabaseMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);

        job.setNumReduceTasks(0);

        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
        job.getConfiguration().setBoolean("mapreduce.map.speculative", false);

        job.getConfiguration().setBoolean("mapred.compress.map.output", true);

        return job;
    }
}
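
For completeness, a caller-side sketch of the two properties this job reads; the paths are placeholders and the real values would come from the D-Net workflow configuration. The helper class is hypothetical.

package eu.dnetlib.data.mapreduce.hbase.index;

import java.util.Properties;

import eu.dnetlib.data.mapreduce.JobParams;

// Hypothetical parameter set for DocumentDatabaseFeedJob (placeholder paths).
public class ExampleDocumentDatabaseFeedParams {

    public static Properties build() {
        final Properties p = new Properties();
        p.setProperty(JobParams.MAPRED_INPUT_DIR, "/tmp/example/input");         // SequenceFile input directory
        p.setProperty(JobParams.DOCUMENT_DB_ROTTEN_FILE, "/tmp/example/rotten"); // output path used by the job
        return p;
    }
}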
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/HBaseCopyTableJob.java
package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.util.Properties;

import org.apache.hadoop.hbase.mapreduce.CopyTable;
import org.apache.hadoop.mapreduce.Job;

import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;

import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

/**
 * HBaseCopyTableJob is a thin wrapper around org.apache.hadoop.hbase.mapreduce.CopyTable that makes it invocable from
 * a D-Net workflow by passing the required parameters.
 * 
 * Copies the given table to the remote HBase instance.
 * 
 * @author claudio
 * 
 */
public class HBaseCopyTableJob extends AbstractHBaseMapReduceJob {

    @Override
    protected Job setJobDetails(Job job, Properties p) throws Exception {

        String remoteCluster = "--peer.adr=" + p.getProperty("peername");
        String tablename = p.getProperty("tablename");

        String[] args = Iterables.toArray(Lists.newArrayList(remoteCluster, tablename), String.class);

        return CopyTable.createSubmittableJob(job.getConfiguration(), args);
    }

}
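
A caller-side sketch of the two workflow properties consumed above. CopyTable's --peer.adr argument is expected to be a ZooKeeper ensemble address of the remote cluster (quorum:port:znode parent); the values below are examples, not taken from this revision, and the helper class is hypothetical.

package eu.dnetlib.data.mapreduce.hbase.dataexport;

import java.util.Properties;

// Hypothetical parameter set for HBaseCopyTableJob (example values only).
public class ExampleCopyTableParams {

    public static Properties build() {
        final Properties p = new Properties();
        // peer cluster address in the quorum:port:znodeParent form expected by CopyTable's --peer.adr
        p.setProperty("peername", "zk1.example.org,zk2.example.org,zk3.example.org:2181:/hbase");
        p.setProperty("tablename", "exampleTable");
        return p;
    }
}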