Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;
2

    
3
import java.io.File;
4
import java.io.InputStream;
5
import java.util.ArrayList;
6
import java.util.HashMap;
7
import java.util.Map.Entry;
8
import java.util.Properties;
9

    
10
import org.apache.commons.cli.CommandLine;
11
import org.apache.hadoop.conf.Configuration;
12
import org.apache.hadoop.fs.FileSystem;
13
import org.apache.hadoop.fs.LocatedFileStatus;
14
import org.apache.hadoop.fs.Path;
15
import org.apache.hadoop.fs.RemoteIterator;
16
import org.apache.log4j.BasicConfigurator;
17
import org.apache.log4j.Logger;
18
import org.apache.sqoop.tool.SqoopTool;
19

    
20
import com.cloudera.sqoop.SqoopOptions;
21
import com.cloudera.sqoop.mapreduce.ExportOutputFormat;
22

    
23
public class SqoopDriver {
24
	private Logger log = Logger.getLogger(this.getClass());
25

    
26
	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";
27

    
28
	private Properties tableMappings = new Properties();
29

    
30
	private String outputPath;
31
	private String dbUser;
32
	private String dbPass;
33
	private String connectionUrl;
34
	// Number of statements in each batch insert op
35
	private String RecsPerStatement;
36
	// Number of statements for each commit ( commit every 1000 inserts, for
37
	// instance). Set to high value to reduce I/O costs.
38
	private String StatementPerTrans;
39
	// Field Seperator ( Delimeter ) used in import HDFS fields.
40
	private String delim;
41
	// Number of reducer classes that Sqoop can use.
42
	private String sqoopReducersCount;
43
	private FileSystem hdfs;
44
	private Configuration conf;
45
	private boolean useHdfsStore = true;
46
	private boolean batch = true;
47
	private boolean verbose = true;
48

    
49
	/**
50
	 * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
51
	 * destination table> pair given in the @tables argument.
52
	 * 
53
	 * Needed parameters ( connection, database name, etc are set in class
54
	 * parameters when an new Sqoop Driver instance is created.
55
	 * 
56
	 * @param tables
57
	 */
58
	public void run(HashMap<String, String> tables) throws Exception {
59

    
60
		for (Entry<String, String> table : tables.entrySet()) {
61

    
62
			log.info("Importing " + table);
63

    
64
			SqoopTool tool = SqoopTool.getTool("export");
65
			if (null == tool) {
66
				log.error("No such sqoop tool: " + "dnetSqoop" + ". See 'sqoop help'.");
67
				throw new RuntimeException("Could not get Sqoop  Export Tool ");
68
			}
69

    
70
			SqoopOptions opts = new SqoopOptions();
71
			opts.setConnectString(connectionUrl);
72
			opts.setExportDir(table.getValue());
73
			opts.setTableName(table.getKey());
74
			opts.setUsername(dbUser);
75
			opts.setPassword(dbPass);
76
			opts.setInputFieldsTerminatedBy(delim.charAt(0));
77
			opts.setBatchMode(batch);
78

    
79
			opts.setVerbose(verbose);
80

    
81
			opts.setNumMappers(Integer.parseInt(sqoopReducersCount));
82
			// TODO : important!! this is necessary since MR multiple
83
			// outputs writer appends a \t char after writing the KEY
84
			// field in HDFS.
85
			// use this so sqoop can ignore it
86

    
87
			opts.setInputEnclosedBy('	');
88
			opts.setInputEncloseRequired(false);
89

    
90
			opts.setExportDir(table.getValue());
91
			opts.setTableName(table.getKey());
92
		 
93
			  ArrayList<String> args = new ArrayList<String>();
94
			  args.add("-Dsqoop.statements.per.transaction");
95
			  args.add("10000"); 
96
			  args.add("-Dsqoop.statements.per.transaction");
97
			  args.add("10000"); 
98
//			  CommonArgs.addHadoopFlags(args);
99

    
100
				  
101
			  args.add(ExportOutputFormat.RECORDS_PER_STATEMENT_KEY + "="
102
			           + "1000");
103
			       args.add("-D");
104
			      args.add(ExportOutputFormat.STATEMENTS_PER_TRANSACTION_KEY + "="
105
		           + "1000"); 
106
			opts.setExtraArgs( args.toArray(new String[0]));
107

    
108
			conf.set("-Dsqoop.statements.per.transaction", "1000");
109
			conf.set("-Dsqoop.records.per.statement", "1000");
110
			opts.setConf(conf);
111

    
112
			tool.appendArgs( args.toArray(new String[0]));
113
//			
114
			tool.run(opts);
115

    
116
			cleanUp(table.getKey());
117
		}
118
	}
119

    
120
	public static void main(String[] args) {
121
		SqoopDriver sqoopDriver = new SqoopDriver();
122

    
123
		BasicConfigurator.configure();
124

    
125
		sqoopDriver.setDelim("!");
126
		sqoopDriver.setOutputPath("/tmp/test_stats/");
127

    
128
		sqoopDriver.setConnectionUrl("jdbc:postgresql://duffy.di.uoa.gr:5432/test_stats");
129
		sqoopDriver.setDbUser("sqoop");
130
		sqoopDriver.setDbPass("sqoop");
131
		sqoopDriver.setReducersCount("1");
132
		sqoopDriver.setRecsPerStatement("1000");
133
		sqoopDriver.setStatementPerTrans("1000");
134

    
135
		sqoopDriver.setUseHdfsStore(true);
136

    
137
		try {
138
			sqoopDriver.initSqoopJob();
139

    
140
		} catch (Exception e) {
141
			System.out.print("ERROR " + e.toString());
142
			e.printStackTrace();
143
		}
144

    
145
	}
146

    
147
	public void initSqoopJob() throws Exception {
148
		loadTableMappings();
149
		ArrayList<String> fileNames;
150
		if (useHdfsStore) {
151
			fileNames = listHdfsDir();
152
		} else {
153
			fileNames = listFilesystemDir();
154
		}
155

    
156
		HashMap<String, String> tables = new HashMap<String, String>();
157

    
158
		// Table mappings containt the mapping between HDFS files and the Stats
159
		// DB table that each should be imported to
160

    
161
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
162
			String name = (String) e.getKey();
163
			log.info("Found tableMappings   " + name);
164

    
165
			for (String filename : fileNames) {
166
				log.info("  filename    " + filename);
167

    
168
				String str = filename.substring(filename.lastIndexOf('/') + 1);
169

    
170
				String split[] = str.split("-");
171

    
172
				split[0] = split[0].replaceAll(".*/", "");
173
				log.info("  filename processed  " + split[0]);
174
				log.info(split[0]);
175
				if (split[0].equals(name)) {
176
					log.info("    match   " + e.getValue() + " " + filename);
177
					tables.put((String) e.getValue(), filename);
178

    
179
				}
180
			}
181
		}
182

    
183
		long startTime = System.currentTimeMillis();
184

    
185
		try {
186
			log.info("Running for" + tables);
187
			this.run(tables);
188

    
189
		} catch (Exception e) {
190
			log.error("Error while importing tables in Sqoop: ", e);
191
			throw new Exception("Error while importing tables in Sqoop", e);
192
		}
193
		long endtime = System.currentTimeMillis();
194

    
195
		log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");
196

    
197
	}
198

    
199
	public ArrayList<String> listHdfsDir() throws Exception {
200
		if (conf == null) {
201
			conf = new Configuration();
202
		}
203
		hdfs = FileSystem.get(conf);
204

    
205
		RemoteIterator<LocatedFileStatus> Files;
206

    
207
		try {
208
			Path exportPath = new Path(hdfs.getUri() + outputPath);
209
			Files = hdfs.listFiles(exportPath, false);
210
			if (Files == null) {
211
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
212
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
213

    
214
			}
215
		} catch (Exception e) {
216
			log.error("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
217
			throw new Exception("HDFS file path with exported data does not exist :   " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
218

    
219
		}
220
		ArrayList<String> fileNames = new ArrayList<String>();
221

    
222
		while (Files.hasNext()) {
223

    
224
			String fileName = Files.next().getPath().toString();
225

    
226
			log.info("Found hdfs file " + fileName);
227
			fileNames.add(fileName);
228

    
229
		}
230
		return fileNames;
231
	}
232

    
233
	public ArrayList<String> listFilesystemDir() throws Exception {
234
		ArrayList<String> fileNames = new ArrayList<String>();
235

    
236
		String files;
237
		File folder = new File(outputPath);
238
		File[] listOfFiles = folder.listFiles();
239

    
240
		if (listOfFiles == null) {
241
			log.error("No files generated in " + outputPath);
242
			throw new Exception("No files generated in " + outputPath);
243

    
244
		}
245

    
246
		for (int i = 0; i < listOfFiles.length; i++) {
247

    
248
			if (listOfFiles[i].isFile()) {
249
				files = listOfFiles[i].getAbsolutePath();
250
				log.info("Found " + files);
251
				fileNames.add(files);
252

    
253
			}
254
		}
255

    
256
		return fileNames;
257
	}
258

    
259
	public void loadTableMappings() throws Exception
260

    
261
	{
262
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
263

    
264
		tableMappings.load(file);
265
		file.close();
266
		if (tableMappings == null || tableMappings.isEmpty()) {
267
			throw new Exception("Could not load Table Mconfappings in sqoop init job");
268
		}
269
	}
270

    
271
	/**
272
	 * Cleans up auto-generated Sqoop class files
273
	 * 
274
	 * @param table
275
	 */
276

    
277
	private void cleanUp(String table) {
278
		try {
279

    
280
			File file = new File(table + ".java");
281

    
282
			if (file.delete()) {
283
				log.info(file.getName() + " is deleted!");
284
			} else {
285
				log.info("Delete operation   failed.");
286
			}
287

    
288
		} catch (Exception e) {
289

    
290
			log.error("Delete operation   failed" + table);
291

    
292
		}
293
	}
294

    
295
	public String getConnectionUrl() {
296
		return connectionUrl;
297
	}
298

    
299
	public void setConnectionUrl(String connectionUrl) {
300
		this.connectionUrl = connectionUrl;
301
	}
302

    
303
	public String getDbUser() {
304
		return dbUser;
305
	}
306

    
307
	public void setDbUser(String dbUser) {
308
		this.dbUser = dbUser;
309
	}
310

    
311
	public String getDbPass() {
312
		return dbPass;
313
	}
314

    
315
	public void setDbPass(String dbPass) {
316
		this.dbPass = dbPass;
317
	}
318

    
319
	public String getDelim() {
320
		return delim;
321
	}
322

    
323
	public void setDelim(String delim) {
324
		this.delim = delim;
325
	}
326

    
327
	public String getReducersCount() {
328
		return sqoopReducersCount;
329
	}
330

    
331
	public void setReducersCount(String reducersCount) {
332
		this.sqoopReducersCount = reducersCount;
333
	}
334

    
335
	public String getRecsPerStatement() {
336
		return RecsPerStatement;
337
	}
338

    
339
	public void setRecsPerStatement(String recsPerStatement) {
340
		RecsPerStatement = recsPerStatement;
341
	}
342

    
343
	public String getStatementPerTrans() {
344
		return StatementPerTrans;
345
	}
346

    
347
	public void setStatementPerTrans(String statementPerTrans) {
348
		StatementPerTrans = statementPerTrans;
349
	}
350

    
351
	public Logger getLog() {
352
		return log;
353
	}
354

    
355
	public void setLog(Logger log) {
356
		this.log = log;
357
	}
358

    
359
	public String getTABLE_MAP_PATH() {
360
		return TABLE_MAP_PATH;
361
	}
362

    
363
	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
364
		TABLE_MAP_PATH = tABLE_MAP_PATH;
365
	}
366

    
367
	public Properties getTableMappings() {
368
		return tableMappings;
369
	}
370

    
371
	public void setTableMappings(Properties tableMappings) {
372
		this.tableMappings = tableMappings;
373
	}
374

    
375
	public String getOutputPath() {
376
		return outputPath;
377
	}
378

    
379
	public void setOutputPath(String outputPath) {
380
		this.outputPath = outputPath;
381
	}
382

    
383
	public String getSqoopReducersCount() {
384
		return sqoopReducersCount;
385
	}
386

    
387
	public void setSqoopReducersCount(String sqoopReducersCount) {
388
		this.sqoopReducersCount = sqoopReducersCount;
389
	}
390

    
391
	public FileSystem getHdfs() {
392
		return hdfs;
393
	}
394

    
395
	public void setHdfs(FileSystem hdfs) {
396
		this.hdfs = hdfs;
397
	}
398

    
399
	public boolean isUseHdfsStore() {
400
		return useHdfsStore;
401
	}
402

    
403
	public void setUseHdfsStore(boolean useHdfsStore) {
404
		this.useHdfsStore = useHdfsStore;
405
	}
406

    
407
	public Configuration getConf() {
408
		return conf;
409
	}
410

    
411
	public void setConf(Configuration conf) {
412
		this.conf = conf;
413
	}
414

    
415
	public boolean isBatch() {
416
		return batch;
417
	}
418

    
419
	public void setBatch(boolean batch) {
420
		this.batch = batch;
421
	}
422

    
423
	public boolean isVerbose() {
424
		return verbose;
425
	}
426

    
427
	public void setVerbose(boolean verbose) {
428
		this.verbose = verbose;
429
	}
430

    
431
}
(2-2/3)