package eu.dnetlib.data.mapreduce.hbase.statsExport;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.Logger;
import org.junit.Test;

import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

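/**
 * Map/Reduce job that scans an HBase table (via {@link TableInputFormat}) and exports its
 * records for the stats database. One named output is registered per entity type listed in
 * the exportTables mapping file; reducer output is written to HDFS as sequence files.
 */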
public class StatsJob extends AbstractHBaseMapReduceJob {

	private Logger log = Logger.getLogger(this.getClass());

	private FileSystem hdfs;

	private Properties tableMappings = new Properties();
	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";

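	/**
	 * Configures the given job: loads the table mappings, applies the supplied properties,
	 * sets mapper/reducer and input/output formats, registers the named outputs, and points
	 * the output at the configured HDFS directory (which is deleted first).
	 */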
	@Override
	public Job setJobDetails(Job job, Properties p) throws Exception {

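		// the entity-type mapping file is bundled with the jar as a classpath resource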
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);

		tableMappings.load(file);
		file.close();

		// TODO dnet config - only if working directly on cluster
		dnetConfig(job);

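		// copy any externally supplied properties into the job configuration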
		if (p != null) {
			for (Entry<Object, Object> prop : p.entrySet()) {
				job.getConfiguration().set(prop.getKey().toString(), prop.getValue().toString());
			}
		}

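		// job classes and intermediate (map output) key/value types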
		job.setJarByClass(StatsJob.class);
		job.setMapperClass(StatsMapper.class);
		job.setReducerClass(StatsReducer.class);
		job.setInputFormatClass(TableInputFormat.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(ImmutableBytesWritable.class);

		// TODO: how many map/reduce tasks?
		job.setNumReduceTasks(10);
		this.hdfs = FileSystem.get(new Configuration());

		job.getConfiguration().set(TableInputFormat.INPUT_TABLE, job.getConfiguration().get("hbase.mapreduce.inputtable"));

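		// reducer output is written as sequence files under mapred.output.dir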
		job.setOutputFormatClass(SequenceFileOutputFormat.class);
		// TODO: clear any previous output before the job runs
		hdfs.delete(new Path(job.getConfiguration().get("mapred.output.dir")), true);

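		// disable speculative execution (set under both the old and the new property names)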
		job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
69
		job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
70

    
71
		job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
72
		job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);
73

    
74
		job.getConfiguration().setBoolean("mapred.compress.map.output", false);
75

    
76
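		// scan setup: only the datasource column family is currently read (see TODO below)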
		Scan scan = new Scan();
		// TODO add all column families here to get correct results?
		// scan.addFamily(Bytes.toBytes("result"));
		// scan.addFamily(Bytes.toBytes("personResult_authorship_hasAuthor"));
		// scan.addFamily(Bytes.toBytes("resultResult_publicationDataset_isRelatedTo"));
		scan.addFamily(Bytes.toBytes("datasource"));
		// scan.addFamily(Bytes.toBytes("datasourceOrganization_provision_provides"));

		scan.setCaching(200);

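		// register one named output per entity type in the mapping file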
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
			String type = (String) e.getKey();

			log.info("Creating MultipleOutput : " + type);
			MultipleOutputs.addNamedOutput(job, type, TextOutputFormat.class, Text.class, Text.class);
		}

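		// bind the scan and StatsMapper to the job through the HBase table-mapper utility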
		TableMapReduceUtil.initTableMapperJob(job.getConfiguration().get("hbase.mapreduce.inputtable"), scan, StatsMapper.class, Text.class, ImmutableBytesWritable.class, job);

		// log.info("Table Utils" + HBaseTableUtils.listColumns());
		// [datasourceOrganization, result, organizationOrganization, person,
		// projectPerson, resultOrganization, dedup, resultResult,
		// resultProject, project, organization, personResult,
		// projectOrganization, personPerson, dedupPerson, datasource]
		SequenceFileOutputFormat.setCompressOutput(job, false);

		Path path = new Path(job.getConfiguration().get("mapred.output.dir"));

		SequenceFileOutputFormat.setOutputPath(job, path);

		return job;
	}

	// @Test
	// public void test() {
	//	String d = "Ερη";
	//
	//	Text t = new Text(d);
	//
	//	System.out.print("str is " + t.toString());
	//	t = new Text(d.getBytes(Charset.forName("UTF-8")));
	//
	//	System.out.print("byte str is " + new String(t.getBytes(), Charset.forName("UTF-8")));
	// }

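	/**
	 * Hard-coded HBase and ZooKeeper settings for running directly against the cluster,
	 * bypassing the dnet configuration (see the TODO in setJobDetails).
	 */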
	private void dnetConfig(Job job) { // #HBASE-SITE

		job.getConfiguration().set("hbase.rootdir", "hdfs://nmis-hadoop-cluster/hbase");

		job.getConfiguration().set("hbase.security.authentication", "simple");

		// ZOOKEEPER
		job.getConfiguration().set("zookeeper.znode.rootserver", "root-region-server");

		job.getConfiguration().set("hbase.zookeeper.quorum",
				"quorum1.t.hadoop.research-infrastructures.eu,quorum2.t.hadoop.research-infrastructures.eu,quorum3.t.hadoop.research-infrastructures.eu,quorum4.t.hadoop.research-infrastructures.eu,jobtracker.t.hadoop.research-infrastructures.eu");

		job.getConfiguration().set("hbase.zookeeper.property.clientPort", "2182");

		// job.getConfiguration().set("dnet.clustername", "DM");

		// #CORE-SITE
		// job.getConfiguration().set("fs.defaultFS",
		// "hdfs://quorum1.t.hadoop.research-infrastructures.eu");
		//
		// job.getConfiguration().set("hadoop.security.authentication",
		// "simple");
		// job.getConfiguration().set("hadoop.security.auth_to_local",
		// "DEFAULT");
		// job.getConfiguration().set("hadoop.rpc.socket.factory.class.default",
		// "org.apache.hadoop.net.StandardSocketFactory");

		// #HDFS-SITE
		// job.getConfiguration().set("hadoop.rpc.socket.factory.class.default",
		// "org.apache.hadoop.net.StandardSocketFactory");
		// job.getConfiguration().set("dfs.ha.namenodes.nmis-hadoop-cluster",
		// "nn1,nn2 \n dfs.namenode.rpc-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:8020 \n"
		// +
		// " dfs.namenode.http-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:50070 \n "
		// +
		// " dfs.namenode.rpc-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:8020 \n "
		// +
		// "dfs.namenode.http-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:50070 \n"
		// +
		// "dfs.client.failover.proxy.provider.nmis-hadoop-cluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

		// #MAPRED-SITE
		// job.getConfiguration().set("mapred.job.tracker", "nmis-hadoop-jt");
		// job.getConfiguration().set("mapred.jobtrackers.nmis-hadoop-jt",
		// "jt1,jt2");
		// job.getConfiguration().set("mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt1",
		// "jobtracker.t.hadoop.research-infrastructures.eu:8021");
		// job.getConfiguration().set("mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt2",
		// "quorum4.t.hadoop.research-infrastructures.eu:8022");
		//
		// job.getConfiguration().set("mapred.mapper.new-api", "true");

		// #OOZIE SERVER
		// job.getConfiguration().set("oozie.service.loc",
		// "http://oozie.t.hadoop.research-infrastructures.eu:11000/oozie");
	}
}
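
A minimal, hypothetical driver sketch (not part of this class) showing one way setJobDetails might be invoked. It assumes StatsJob can be instantiated with a no-arg constructor and that the input table and output directory are supplied as command-line arguments; the class name StatsJobDriver and the job name "statsExport" are placeholders, not part of the original code.

package eu.dnetlib.data.mapreduce.hbase.statsExport;

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class StatsJobDriver {

	public static void main(String[] args) throws Exception {
		// configuration keys read by StatsJob.setJobDetails(); values are placeholders
		Properties p = new Properties();
		p.setProperty("hbase.mapreduce.inputtable", args[0]); // HBase table to scan
		p.setProperty("mapred.output.dir", args[1]);          // HDFS output directory

		Job job = new Job(new Configuration(), "statsExport");
		job = new StatsJob().setJobDetails(job, p);

		// submit and wait; setJobDetails has already deleted any previous output
		job.waitForCompletion(true);
	}
}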