Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;
2

    
3
import java.io.InputStream;
4
import java.util.Properties;
5

    
6
import org.apache.hadoop.conf.Configuration;
7
import org.apache.hadoop.mapreduce.Job;
8
import org.apache.log4j.Logger;
9

    
10
import eu.dnetlib.data.mapreduce.hbase.statsExport.StatsJob;
11

    
12
/**
13
 * @author eri
14
 * 
15
 */
16
public class StatsJobDriver {
17

    
18
	private Logger log = Logger.getLogger(this.getClass());
19

    
20
	private Configuration configuration = new Configuration();
21

    
22
	private StatsJob statsJob;
23

    
24
	private Job job;
25

    
26
	private Properties props;
27

    
28
	public void init() throws Exception {
29
		InputStream file = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/StatsProperties");
30
		props = new Properties();
31
		props.load(file);
32
		file.close();
33
		configuration.set("stats.delim", props.getProperty("Stats.delimCharacter"));
34

    
35
		configuration.set("hbase.mapreduce.inputtable", props.getProperty("Stats.HbaseSourceTable"));
36

    
37
		if (!props.getProperty("Stats.outputPath").endsWith("/")) {
38
			props.setProperty("Stats.outputPath", props.getProperty("Stats.outputPath") + "/");
39
		}
40

    
41
		configuration.set("mapred.output.dir", props.getProperty("Stats.outputPath"));
42

    
43
		configuration.set("stats.nullNum", props.getProperty("Stats.nullNumericField"));
44
		configuration.set("stats.nullString", props.getProperty("Stats.nullStringField"));
45

    
46
		job = new Job(configuration);
47

    
48
		statsJob = new StatsJob();
49
		statsJob.setJobDetails(job, props);
50

    
51
	}
52

    
53
	public void run() throws Exception {
54

    
55
		long startTime = System.currentTimeMillis();
56
		long endtime;
57
		double totalTime1;
58

    
59
		// TODO here, start the Stats Export Job: Read from HBASE, and dump to
60
		// HDFS
61
		boolean completionStatus = job.waitForCompletion(true);
62

    
63
		if (completionStatus) {
64
			endtime = System.currentTimeMillis();
65
			totalTime1 = (endtime - startTime) / 60000;
66

    
67
			log.info("Time taken for MR Export : " + totalTime1 + " minutes");
68

    
69
		} else {
70
			log.error("JOB FAILED :");
71

    
72
		}
73

    
74
	}
75

    
76
}
(3-3/3)