package eu.dnetlib.data.mapreduce.hbase.statsExport;

import java.io.InputStream;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.Logger;

import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;
import eu.dnetlib.data.mapreduce.hbase.HBaseTableUtils;
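
/**
 * Map/Reduce job that exports records from an HBase table to HDFS for the stats
 * export. The table-type mapping is read from the {@code exportTables} resource;
 * a named output is registered for each type, the configured input table is
 * scanned by {@link StatsMapper} and the map output is reduced by
 * {@link StatsReducer}.
 *
 * <p>
 * Rough usage sketch (the actual invocation depends on how
 * {@code AbstractHBaseMapReduceJob} drives its subclasses; the table name and
 * output path below are placeholders):
 *
 * <pre>
 * Properties p = new Properties();
 * p.setProperty("hbase.mapreduce.inputtable", "someHbaseTable"); // placeholder
 * p.setProperty("mapred.output.dir", "/tmp/statsExport"); // placeholder
 * Job job = new Job(new Configuration(), "statsExport");
 * new StatsJob().setJobDetails(job, p);
 * job.waitForCompletion(true);
 * </pre>
 */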
public class StatsJob extends AbstractHBaseMapReduceJob {

	private Logger log = Logger.getLogger(this.getClass());

	private FileSystem hdfs;

	private Properties tableMappings = new Properties();

	private static final String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";
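
	/**
	 * Configures the export job: loads the table mappings, copies the supplied
	 * properties into the job configuration, wires mapper, reducer and output
	 * format, registers one named output per table type and initialises the
	 * HBase table mapper on the configured input table.
	 */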
	@Override
	public Job setJobDetails(Job job, Properties p) throws Exception {

		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);

		tableMappings.load(file);
		file.close();

		// TODO dnet config - only if working directly on cluster
		dnetConfig(job);

		if (p != null) {
			for (Entry<Object, Object> prop : p.entrySet()) {
				job.getConfiguration().set(prop.getKey().toString(), prop.getValue().toString());
			}
		}

		job.setJarByClass(StatsJob.class);
		job.setMapperClass(StatsMapper.class);
		job.setReducerClass(StatsReducer.class);
		job.setInputFormatClass(TableInputFormat.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(ImmutableBytesWritable.class);

		// TODO here how many m/r tasks????
		job.setNumReduceTasks(10);
		this.hdfs = FileSystem.get(new Configuration());

		job.getConfiguration().set(TableInputFormat.INPUT_TABLE, job.getConfiguration().get("hbase.mapreduce.inputtable"));

		job.setOutputFormatClass(SequenceFileOutputFormat.class);

		// TODO
		// remove any previous output so the job can recreate mapred.output.dir
		hdfs.delete(new Path(job.getConfiguration().get("mapred.output.dir")), true);

		job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
		job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);

		job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
		job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);

		job.getConfiguration().setBoolean("mapred.compress.map.output", false);

		Scan scan = new Scan();
		// TODO add all column families here to get correct results?
		// scan.addFamily(Bytes.toBytes("result"));
		// scan.addFamily(Bytes.toBytes("personResult_authorship_hasAuthor"));
		// scan.addFamily(Bytes.toBytes("resultResult_publicationDataset_isRelatedTo"));
		scan.addFamily(Bytes.toBytes("datasource"));
		// scan.addFamily(Bytes.toBytes("datasourceOrganization_provision_provides"));

		scan.setCaching(200);
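
		// Register one named output per table type listed in exportTables; these are
		// the outputs the reducer is expected to write through MultipleOutputs.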
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
			String type = (String) e.getKey();

			log.info("Creating MultipleOutput : " + type);
			MultipleOutputs.addNamedOutput(job, type, TextOutputFormat.class, Text.class, Text.class);
		}
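
		// Hook the HBase scan into the job: initTableMapperJob sets the input table,
		// the serialised scan, the mapper class and the map output key/value classes.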
		TableMapReduceUtil.initTableMapperJob(job.getConfiguration().get("hbase.mapreduce.inputtable"), scan, StatsMapper.class, Text.class,
				ImmutableBytesWritable.class, job);

		// log.info("Table Utils" + HBaseTableUtils.listColumns());
		// [datasourceOrganization, result, organizationOrganization, person,
		// projectPerson, resultOrganization, dedup, resultResult,
		// resultProject, project, organization, personResult,
		// projectOrganization, personPerson, dedupPerson, datasource]

		SequenceFileOutputFormat.setCompressOutput(job, false);

		Path path = new Path(job.getConfiguration().get("mapred.output.dir"));

		SequenceFileOutputFormat.setOutputPath(job, path);

		return job;
	}
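
	/**
	 * Hard-coded cluster connection settings (HBase root dir and ZooKeeper quorum),
	 * meant to be used only when the job runs directly against the cluster; the
	 * commented-out blocks keep the corresponding core-site, hdfs-site, mapred-site
	 * and Oozie values for reference.
	 */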
	private void dnetConfig(Job job) {
		// #HBASE-SITE
		job.getConfiguration().set("hbase.rootdir", "hdfs://nmis-hadoop-cluster/hbase");
		job.getConfiguration().set("hbase.security.authentication", "simple");

		// #ZOOKEEPER
		job.getConfiguration().set("zookeeper.znode.rootserver", "root-region-server");
		job.getConfiguration().set("hbase.zookeeper.quorum",
				"quorum1.t.hadoop.research-infrastructures.eu,quorum2.t.hadoop.research-infrastructures.eu,quorum3.t.hadoop.research-infrastructures.eu,quorum4.t.hadoop.research-infrastructures.eu,jobtracker.t.hadoop.research-infrastructures.eu");
		job.getConfiguration().set("hbase.zookeeper.property.clientPort", "2182");

		// job.getConfiguration().set("dnet.clustername", "DM");

		// #CORE-SITE
		// job.getConfiguration().set("fs.defaultFS",
		// "hdfs://quorum1.t.hadoop.research-infrastructures.eu");
		//
		// job.getConfiguration().set("hadoop.security.authentication", "simple");
		// job.getConfiguration().set("hadoop.security.auth_to_local", "DEFAULT");
		// job.getConfiguration().set("hadoop.rpc.socket.factory.class.default",
		// "org.apache.hadoop.net.StandardSocketFactory");

		// #HDFS-SITE
		// job.getConfiguration().set("hadoop.rpc.socket.factory.class.default",
		// "org.apache.hadoop.net.StandardSocketFactory");
		// job.getConfiguration().set("dfs.ha.namenodes.nmis-hadoop-cluster",
		// "nn1,nn2 \n dfs.namenode.rpc-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:8020 \n"
		// + " dfs.namenode.http-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:50070 \n "
		// + " dfs.namenode.rpc-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:8020 \n "
		// + "dfs.namenode.http-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:50070 \n"
		// + "dfs.client.failover.proxy.provider.nmis-hadoop-cluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

		// #MAPRED-SITE
		// job.getConfiguration().set("mapred.job.tracker", "nmis-hadoop-jt");
		// job.getConfiguration().set("mapred.jobtrackers.nmis-hadoop-jt", "jt1,jt2");
		// job.getConfiguration().set("mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt1",
		// "jobtracker.t.hadoop.research-infrastructures.eu:8021");
		// job.getConfiguration().set("mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt2",
		// "quorum4.t.hadoop.research-infrastructures.eu:8022");
		//
		// job.getConfiguration().set("mapred.mapper.new-api", "true");

		// #OOZIE SERVER
		// job.getConfiguration().set("oozie.service.loc",
		// "http://oozie.t.hadoop.research-infrastructures.eu:11000/oozie");
	}
}