package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;

import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;

import com.cloudera.sqoop.SqoopOptions;

public class SqoopDriver {
	private Logger log = Logger.getLogger(this.getClass());

	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";

	private Properties tableMappings = new Properties();

	private String outputPath;
	private String dbUser;
	private String dbPass;
	private String connectionUrl;
	// Number of statements in each batch insert op
	private String RecsPerStatement;
	// Number of statements for each commit ( commit every 1000 inserts, for
	// instance ). Set to a high value to reduce I/O costs.
	private String StatementPerTrans;
	// Field separator ( delimiter ) used in the imported HDFS fields.
	private String delim;
	// Number of reducer classes that Sqoop can use.
	private String sqoopReducersCount;
	private FileSystem hdfs;
	private Configuration conf;
	private boolean useHdfsStore = true;

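	// Example wiring (taken from the commented-out lines in main() below):
	//
	//   HashMap<String, String> tables = new HashMap<String, String>();
	//   tables.put("datasource", "/tmp/test_stats/datasource-r-00000");
	//   sqoopDriver.run(tables);
	//
	// where each key is the target Stats DB table and each value is the HDFS
	// file produced by the stats export MR job.
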
	/**
	 * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
	 * destination table> pair given in the @tables argument.
	 * 
	 * Needed parameters ( connection, database name, etc. ) are set in class
	 * parameters when a new Sqoop Driver instance is created.
	 * 
	 * @param tables
	 */
	public void run(HashMap<String, String> tables) throws Exception {

		for (Entry<String, String> table : tables.entrySet()) {

			log.info("Importing " + table);

			String[] str = { "export", "-Dsqoop.export.records.per.statement=" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction=" + StatementPerTrans,

			"--connect", connectionUrl, "--table", table.getKey(),

			"--export-dir", table.getValue(),
					// TODO : important!! this is necessary since the MR multiple
					// outputs writer appends a \t char after writing the KEY
					// field in HDFS.
					// Use this so Sqoop can ignore it.
					"--optionally-enclosed-by", "	", "--input-fields-terminated-by", delim,

					"--verbose", "--username", dbUser, "--password", dbPass, "--driver", "org.postgresql.Driver", "--batch", "--mapreduce-job-name", "Sqoop Stats Import", "-m", sqoopReducersCount };
			log.info("Sqoop Tool spec " + Sqoop.SQOOP_OPTIONS_FILE_SPECIFIER);
			// int ret = Sqoop.runTool(str);
			// if (ret != 0) {
			// log.error("Could not run Sqoop Tool " + Integer.toString(ret));
			// throw new RuntimeException("Could not run Sqoop Tool " +
			// Integer.toString(ret));
			// }
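			// Instead of launching Sqoop through the CLI-style argument array built
			// above (see the commented-out runTool call), the export tool is obtained
			// programmatically and configured through SqoopOptions below.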
81
			SqoopTool tool = SqoopTool.getTool("export");
82
			if (null == tool) {
83
				log.error("No such sqoop tool: " + "dnetSqoop" + ". See 'sqoop help'.");
84

    
85
			}
86
			SqoopOptions opts = new SqoopOptions();
87
			opts.setConnectString(connectionUrl);
88
			opts.setExportDir(table.getValue());
89
			opts.setUsername(dbUser);
90
			opts.setPassword(dbPass);
91
			opts.setInputFieldsTerminatedBy(delim.charAt(1));
92

    
93
			opts.setBatchMode(true);
94
			opts.setTableName(table.getKey());
95
			String[] args = new String[6];
96
			args[0] = "--optionally-enclosed-by";
97
			args[1] = "	";
98
			args[2] = "-Dsqoop.export.records.per.statement ";
99
			args[3] = RecsPerStatement;
100
			args[4] = "-Dsqoop.export.statements.per.transaction  ";
101
			args[5] = StatementPerTrans;
102
			opts.setExtraArgs(args);
103
			opts.setVerbose(true);
104
			opts.setNumMappers(Integer.parseInt(sqoopReducersCount));
105
			tool.run(opts);
106

    
107
			// sqoop.run(str);
108

    
109
			cleanUp(table.getKey());
110
		}
111
	}

	public void initSqoopJob() throws Exception {
		loadTableMappings();
		ArrayList<String> fileNames;
		if (useHdfsStore) {
			fileNames = listHdfsDir();
		} else {
			fileNames = listFilesystemDir();
		}

		HashMap<String, String> tables = new HashMap<String, String>();

		// Table mappings contain the mapping between HDFS files and the Stats
		// DB table that each should be imported to

		for (Entry<Object, Object> e : tableMappings.entrySet()) {
			String name = (String) e.getKey();
			log.info("Found tableMappings   " + name);

			for (String filename : fileNames) {
				log.info("  filename    " + filename);

				String str = filename.substring(filename.lastIndexOf('/') + 1);

				String[] split = str.split("-");

				split[0] = split[0].replaceAll(".*/", "");
				log.info("  filename processed  " + split[0]);
				log.info(split[0]);
				if (split[0].equals(name)) {
					log.info("    match   " + e.getValue() + " " + filename);
					tables.put((String) e.getValue(), filename);

				}
			}
		}

		long startTime = System.currentTimeMillis();

		try {
			log.info("Running for " + tables);
			this.run(tables);

		} catch (Exception e) {
			log.error("Error while importing tables in Sqoop: ", e);
			throw new Exception("Error while importing tables in Sqoop", e);
		}
		long endtime = System.currentTimeMillis();

		log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");

	}

	public ArrayList<String> listHdfsDir() throws Exception {
		if (conf == null) {
			conf = new Configuration();
		}
		hdfs = FileSystem.get(conf);

		RemoteIterator<LocatedFileStatus> files;

		try {
			Path exportPath = new Path(hdfs.getUri() + outputPath);
			files = hdfs.listFiles(exportPath, false);
			if (files == null) {
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));

			}
		} catch (Exception e) {
			log.error("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
			throw new Exception("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);

		}
		ArrayList<String> fileNames = new ArrayList<String>();

		while (files.hasNext()) {

			String fileName = files.next().getPath().toString();

			log.info("Found hdfs file " + fileName);
			fileNames.add(fileName);

		}
		return fileNames;
	}

	public ArrayList<String> listFilesystemDir() throws Exception {
		ArrayList<String> fileNames = new ArrayList<String>();

		String files;
		File folder = new File(outputPath);
		File[] listOfFiles = folder.listFiles();

		if (listOfFiles == null) {
			log.error("No files generated in " + outputPath);
			throw new Exception("No files generated in " + outputPath);

		}

		for (int i = 0; i < listOfFiles.length; i++) {

			if (listOfFiles[i].isFile()) {
				files = listOfFiles[i].getAbsolutePath();
				log.info("Found " + files);
				fileNames.add(files);

			}
		}

		return fileNames;
	}
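	// The exportTables resource is a standard java.util.Properties file. Based on
	// how it is consumed in initSqoopJob(), each entry is assumed to map an HDFS
	// file-name prefix to a Stats DB table name, e.g. (hypothetical values):
	//
	//   datasource=datasource
	//   result=result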

	public void loadTableMappings() throws Exception {
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
		if (file == null) {
			throw new Exception("Could not find table mappings resource " + TABLE_MAP_PATH);
		}

		tableMappings.load(file);
		file.close();
		if (tableMappings == null || tableMappings.isEmpty()) {
			throw new Exception("Could not load Table Mappings in sqoop init job");
		}
	}

	/**
	 * Cleans up auto-generated Sqoop class files
	 * 
	 * @param table
	 */
	private void cleanUp(String table) {
		try {

			File file = new File(table + ".java");

			if (file.delete()) {
				log.info(file.getName() + " is deleted!");
			} else {
				log.info("Delete operation failed.");
			}

		} catch (Exception e) {

			log.error("Delete operation failed for " + table, e);

		}
	}
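	// Command-line entry point for a local test run: wires up the driver with
	// hard-coded example settings (test database, '!' delimiter) and launches a
	// full Sqoop import job.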

	public static void main(String[] args) {
		SqoopDriver sqoopDriver = new SqoopDriver();

		BasicConfigurator.configure();

		sqoopDriver.setDelim("!");
		sqoopDriver.setOutputPath("/tmp/test_stats/");

		sqoopDriver.setConnectionUrl("jdbc:postgresql://duffy.di.uoa.gr:5432/test_stats");
		sqoopDriver.setDbUser("sqoop");
		sqoopDriver.setDbPass("sqoop");
		sqoopDriver.setReducersCount("1");
		sqoopDriver.setRecsPerStatement("1000");
		sqoopDriver.setStatementPerTrans("1000");

		sqoopDriver.setUseHdfsStore(true);

		// HashMap<String, String> tables = new HashMap<String, String>();

		// tables.put("datasource", "/tmp/test_stats/datasource-r-00000");

		// throw new Exception(datasource + "  " + e +
		// sqoopDriver.getConnectionUrl() + sqoopDriver.getDbPass() +
		// sqoopDriver.getDbUser() );

		try {
			sqoopDriver.initSqoopJob();
			// sqoopDriver.run(tables);
		} catch (Exception e) {
			System.out.print("ERROR " + e.toString());
			e.printStackTrace();
		}

	}

	public String getConnectionUrl() {
		return connectionUrl;
	}

	public void setConnectionUrl(String connectionUrl) {
		this.connectionUrl = connectionUrl;
	}

	public String getDbUser() {
		return dbUser;
	}

	public void setDbUser(String dbUser) {
		this.dbUser = dbUser;
	}

	public String getDbPass() {
		return dbPass;
	}

	public void setDbPass(String dbPass) {
		this.dbPass = dbPass;
	}

	public String getDelim() {
		return delim;
	}

	public void setDelim(String delim) {
		this.delim = delim;
	}

	public String getReducersCount() {
		return sqoopReducersCount;
	}

	public void setReducersCount(String reducersCount) {
		this.sqoopReducersCount = reducersCount;
	}

	public String getRecsPerStatement() {
		return RecsPerStatement;
	}

	public void setRecsPerStatement(String recsPerStatement) {
		RecsPerStatement = recsPerStatement;
	}

	public String getStatementPerTrans() {
		return StatementPerTrans;
	}

	public void setStatementPerTrans(String statementPerTrans) {
		StatementPerTrans = statementPerTrans;
	}

	public Logger getLog() {
		return log;
	}

	public void setLog(Logger log) {
		this.log = log;
	}

	public String getTABLE_MAP_PATH() {
		return TABLE_MAP_PATH;
	}

	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
		TABLE_MAP_PATH = tABLE_MAP_PATH;
	}

	public Properties getTableMappings() {
		return tableMappings;
	}

	public void setTableMappings(Properties tableMappings) {
		this.tableMappings = tableMappings;
	}

	public String getOutputPath() {
		return outputPath;
	}

	public void setOutputPath(String outputPath) {
		this.outputPath = outputPath;
	}

	public String getSqoopReducersCount() {
		return sqoopReducersCount;
	}

	public void setSqoopReducersCount(String sqoopReducersCount) {
		this.sqoopReducersCount = sqoopReducersCount;
	}

	public FileSystem getHdfs() {
		return hdfs;
	}

	public void setHdfs(FileSystem hdfs) {
		this.hdfs = hdfs;
	}

	public boolean isUseHdfsStore() {
		return useHdfsStore;
	}

	public void setUseHdfsStore(boolean useHdfsStore) {
		this.useHdfsStore = useHdfsStore;
	}

	public Configuration getConf() {
		return conf;
	}

	public void setConf(Configuration conf) {
		this.conf = conf;
	}

}