package eu.dnetlib.data.mapreduce.hbase.statsExport;

import java.io.InputStream;
import java.nio.charset.Charset;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.log4j.Logger;
import org.junit.Test;

import eu.dnetlib.data.mapreduce.hbase.AbstractHBaseMapReduceJob;

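/**
 * MapReduce job that exports entity records from an HBase table to HDFS:
 * rows are read via a table scan, mapped with StatsMapper, reduced with
 * StatsReducer, and written through one named text output per entity type.
 */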
public class StatsJob extends AbstractHBaseMapReduceJob {

    private Logger log = Logger.getLogger(this.getClass());

    private FileSystem hdfs;

    private Properties tableMappings = new Properties();

    // Classpath location of the properties file listing the entity tables to export.
    private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";

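    /**
     * Configures the export job: loads the table mappings from the classpath,
     * copies the supplied properties into the job configuration, wires the
     * HBase scan as the job input and registers the named outputs.
     */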
    @Override
    public Job setJobDetails(Job job, Properties p) throws Exception {

        // Load the table mappings from the classpath.
        InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
        tableMappings.load(file);
        file.close();
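
        // The exportTables resource is a plain java.util.Properties file; only
        // its keys (the entity types) are used below. Hypothetical sketch, not
        // the actual resource contents:
        //   datasource=...
        //   result=...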

        // TODO dnet config - only needed when working directly on the cluster
        dnetConfig(job);

        // Copy any externally supplied properties into the job configuration.
        if (p != null) {
            for (Entry<Object, Object> prop : p.entrySet()) {
                job.getConfiguration().set(prop.getKey().toString(), prop.getValue().toString());
            }
        }

        job.setJarByClass(StatsJob.class);
        job.setMapperClass(StatsMapper.class);
        job.setReducerClass(StatsReducer.class);
        job.setInputFormatClass(TableInputFormat.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(ImmutableBytesWritable.class);

        // TODO: choose an appropriate number of map/reduce tasks for the cluster.
        job.setNumReduceTasks(10);
        this.hdfs = FileSystem.get(new Configuration());

        // Point TableInputFormat at the configured input table.
        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, job.getConfiguration().get("hbase.mapreduce.inputtable"));

        job.setOutputFormatClass(SequenceFileOutputFormat.class);

        // TODO
        // Delete any previous output; the job fails if the output directory exists.
        hdfs.delete(new Path(job.getConfiguration().get("mapred.output.dir")), true);

        // Disable speculative execution, setting both the old and the new
        // property names to cover different Hadoop versions.
        job.getConfiguration().setBoolean("mapred.reduce.tasks.speculative.execution", false);
        job.getConfiguration().setBoolean("mapred.map.tasks.speculative.execution", false);
        job.getConfiguration().setBoolean("mapreduce.map.speculative", false);
        job.getConfiguration().setBoolean("mapreduce.reduce.speculative", false);

        job.getConfiguration().setBoolean("mapred.compress.map.output", false);

        Scan scan = new Scan();
        // TODO: add all column families required for a complete export.
        // scan.addFamily(Bytes.toBytes("result"));
        // scan.addFamily(Bytes.toBytes("personResult_authorship_hasAuthor"));
        // scan.addFamily(Bytes.toBytes("resultResult_publicationDataset_isRelatedTo"));
        scan.addFamily(Bytes.toBytes("datasource"));
        // scan.addFamily(Bytes.toBytes("datasourceOrganization_provision_provides"));

        scan.setCaching(200);

        // Register one named text output per entity type listed in the mappings.
        for (Entry<Object, Object> e : tableMappings.entrySet()) {
            String type = (String) e.getKey();

            log.info("Creating MultipleOutput : " + type);
            MultipleOutputs.addNamedOutput(job, type, TextOutputFormat.class, Text.class, Text.class);
        }

        TableMapReduceUtil.initTableMapperJob(job.getConfiguration().get("hbase.mapreduce.inputtable"), scan,
                StatsMapper.class, Text.class, ImmutableBytesWritable.class, job);

        // log.info("Table Utils" + HBaseTableUtils.listColumns());
        // Known column families: [datasourceOrganization, result, organizationOrganization,
        // person, projectPerson, resultOrganization, dedup, resultResult, resultProject,
        // project, organization, personResult, projectOrganization, personPerson,
        // dedupPerson, datasource]

        SequenceFileOutputFormat.setCompressOutput(job, false);

        Path path = new Path(job.getConfiguration().get("mapred.output.dir"));

        SequenceFileOutputFormat.setOutputPath(job, path);

        return job;
    }

    // @Test
    // public void test() {
    //     String d = "Ερη";
    //     Text t = new Text(d);
    //     System.out.print("str is " + t.toString());
    //
    //     t = new Text(d.getBytes(Charset.forName("UTF-8")));
    //     System.out.print("byte str is " + new String(t.getBytes(), Charset.forName("UTF-8")));
    // }

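    /**
     * Hard-coded dnet cluster configuration (HBase root and ZooKeeper quorum),
     * used only when running directly against the cluster; see the TODO in
     * setJobDetails.
     */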
    private void dnetConfig(Job job) {
        // #HBASE-SITE
        job.getConfiguration().set("hbase.rootdir", "hdfs://nmis-hadoop-cluster/hbase");
        job.getConfiguration().set("hbase.security.authentication", "simple");

        // ZOOKEEPER
        job.getConfiguration().set("zookeeper.znode.rootserver", "root-region-server");
        job.getConfiguration().set("hbase.zookeeper.quorum",
                "quorum1.t.hadoop.research-infrastructures.eu,quorum2.t.hadoop.research-infrastructures.eu,quorum3.t.hadoop.research-infrastructures.eu,quorum4.t.hadoop.research-infrastructures.eu,jobtracker.t.hadoop.research-infrastructures.eu");
        job.getConfiguration().set("hbase.zookeeper.property.clientPort", "2182");

        // job.getConfiguration().set("dnet.clustername", "DM");

        // #CORE-SITE
        // job.getConfiguration().set("fs.defaultFS", "hdfs://quorum1.t.hadoop.research-infrastructures.eu");
        // job.getConfiguration().set("hadoop.security.authentication", "simple");
        // job.getConfiguration().set("hadoop.security.auth_to_local", "DEFAULT");
        // job.getConfiguration().set("hadoop.rpc.socket.factory.class.default", "org.apache.hadoop.net.StandardSocketFactory");

        // #HDFS-SITE
        // job.getConfiguration().set("hadoop.rpc.socket.factory.class.default", "org.apache.hadoop.net.StandardSocketFactory");
        // job.getConfiguration().set("dfs.ha.namenodes.nmis-hadoop-cluster",
        //         "nn1,nn2 \n dfs.namenode.rpc-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:8020 \n"
        //         + " dfs.namenode.http-address.nmis-hadoop-cluster.nn1=quorum1.t.hadoop.research-infrastructures.eu:50070 \n "
        //         + " dfs.namenode.rpc-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:8020 \n "
        //         + "dfs.namenode.http-address.nmis-hadoop-cluster.nn2=quorum2.t.hadoop.research-infrastructures.eu:50070 \n"
        //         + "dfs.client.failover.proxy.provider.nmis-hadoop-cluster=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider");

        // #MAPRED-SITE
        // job.getConfiguration().set("mapred.job.tracker", "nmis-hadoop-jt");
        // job.getConfiguration().set("mapred.jobtrackers.nmis-hadoop-jt", "jt1,jt2");
        // job.getConfiguration().set("mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt1", "jobtracker.t.hadoop.research-infrastructures.eu:8021");
        // job.getConfiguration().set("mapred.jobtracker.rpc-address.nmis-hadoop-jt.jt2", "quorum4.t.hadoop.research-infrastructures.eu:8022");
        // job.getConfiguration().set("mapred.mapper.new-api", "true");

        // #OOZIE SERVER
        // job.getConfiguration().set("oozie.service.loc", "http://oozie.t.hadoop.research-infrastructures.eu:11000/oozie");
    }
}