Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Properties;
5

    
6
import org.apache.hadoop.hbase.client.Put;
7
import org.apache.hadoop.hbase.client.Scan;
8
import org.apache.hadoop.hbase.filter.PrefixFilter;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
11
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat;
12
import org.apache.hadoop.hbase.util.Bytes;
13
import org.apache.hadoop.io.Text;
14
import org.apache.hadoop.mapreduce.Job;
15

    
16
import eu.dnetlib.data.mapreduce.JobParams;
17
import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;
18
import eu.dnetlib.data.mapreduce.util.DedupUtils;
19
import eu.dnetlib.data.proto.TypeProtos.Type;
20
import eu.dnetlib.pace.util.DedupConfig;
21
import eu.dnetlib.pace.util.DedupConfigLoader;
22

    
23
public class DedupFindRootsJob extends AbstractHBaseMapReduceJob {
24

    
25
	@Override
26
	public Job setJobDetails(final Job job, final Properties p) {
27

    
28
		initMapper(job, getScan(p), p.getProperty(JobParams.HBASE_SOURCE_TABLE));
29

    
30
		job.setOutputFormatClass(TableOutputFormat.class);
31
		job.setMapOutputKeyClass(Text.class);
32
		job.setMapOutputValueClass(Put.class);
33
		job.setNumReduceTasks(0);
34

    
35
		job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
36
		job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
37

    
38
		return job;
39
	}
40

    
41
	private Scan getScan(final Properties p) {
42
		Scan scan = new Scan();
43
		scan.setCaching(100); // 1 is the default in Scan, which will be bad for MapReduce jobs
44
		scan.setCacheBlocks(false); // don't set to true for MR jobs
45

    
46
		DedupConfig dedupConfig = DedupConfigLoader.load(p.getProperty("dedup.wf.conf"));
47

    
48
		Type type = Type.valueOf(dedupConfig.getEntityType());
49
		scan.setFilter(new PrefixFilter(Bytes.toBytes(type.getNumber())));
50
		scan.addColumn(dedupConfig.getEntityNameBytes(), DedupUtils.BODY_B);
51
		scan.addFamily(DedupUtils.getSimilarityCFBytes(type));
52
		return scan;
53
	}
54

    
55
	private void initMapper(final Job job, final Scan scan, final String sourceTable) {
56
		try {
57
			TableMapReduceUtil.initTableMapperJob(sourceTable, scan, DedupFindRootsMapper.class, Text.class, ImmutableBytesWritable.class, job);
58
		} catch (IOException e) {
59
			throw new RuntimeException(e);
60
		}
61
	}
62

    
63
}
(4-4/23)