package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;

import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.BasicConfigurator;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
import org.apache.sqoop.tool.SqoopTool;

public class SqoopDriver {
	private Logger log = Logger.getLogger(this.getClass());

	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";

	private Properties tableMappings = new Properties();

	private String outputPath;
	private String dbUser;
	private String dbPass;
	private String connectionUrl;
	// Number of records per batched INSERT statement (sqoop.export.records.per.statement).
	private String RecsPerStatement;
	// Number of INSERT statements per transaction (e.g. commit every 1000
	// statements). Set to a high value to reduce I/O costs.
	private String StatementPerTrans;
	// Field separator (delimiter) used in the imported HDFS fields.
	private String delim;
	// Number of parallel tasks passed to Sqoop via the -m option.
	private String sqoopReducersCount;
	private FileSystem hdfs;
	private Configuration conf;
	private boolean useHdfsStore = true;

	/**
	 * Driver for the Sqoop tool. Calls the Sqoop client for each <input file,
	 * destination table> pair given in the @tables argument.
	 * 
	 * Needed parameters (connection URL, database credentials, etc.) are set as
	 * class fields when a new SqoopDriver instance is created.
	 * 
	 * @param tables
	 */
	public void run(HashMap<String, String> tables) throws Exception {

		for (Entry<String, String> table : tables.entrySet()) {

			log.info("Importing " + table);

			String[] str = { "export", "-Dsqoop.export.records.per.statement=" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction=" + StatementPerTrans,

					"--connect", connectionUrl, "--table", table.getKey(),

					"--export-dir", table.getValue(),
					// TODO : important!! this is necessary since the MR multiple
					// outputs writer appends a \t char after writing the KEY
					// field in HDFS.
					// Use this so Sqoop can ignore it.
					"--optionally-enclosed-by", "\t", "--input-fields-terminated-by", delim,

					"--verbose", "--username", dbUser, "--password", dbPass, "--driver", "org.postgresql.Driver", "--batch", "--mapreduce-job-name", "Sqoop Stats Import", "-m", sqoopReducersCount };

			log.info("Sqoop Tool spec " + Sqoop.SQOOP_OPTIONS_FILE_SPECIFIER);

			// Load registered Sqoop tool plugins first, so that the custom "dnetSqoop" tool can be resolved.
			Configuration pluginConf = SqoopTool.loadPlugins(new Configuration());

			SqoopTool tool = SqoopTool.getTool("dnetSqoop");
			if (null == tool) {
				log.error("No such sqoop tool: dnetSqoop. See 'sqoop help'.");
				throw new RuntimeException("No such sqoop tool: dnetSqoop");
			}

			Sqoop sqoop = new Sqoop((com.cloudera.sqoop.tool.SqoopTool) tool, pluginConf);
			int ret = Sqoop.runSqoop(sqoop, str);
			if (ret != 0) {
				log.error("Could not run Sqoop Tool " + Integer.toString(ret));
				throw new RuntimeException("Could not run Sqoop Tool " + Integer.toString(ret));
			}

			cleanUp(table.getKey());
		}
	}
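
	// For readability: the argument array assembled above corresponds roughly to a command line
	// of the following shape. The concrete values are placeholders taken from the defaults in
	// main(), not authoritative settings:
	//
	//   sqoop export -Dsqoop.export.records.per.statement=1000 \
	//     -Dsqoop.export.statements.per.transaction=1000 \
	//     --connect jdbc:postgresql://duffy.di.uoa.gr:5432/test_stats --table datasource \
	//     --export-dir /tmp/test_stats/datasource-r-00000 \
	//     --optionally-enclosed-by '\t' --input-fields-terminated-by '!' \
	//     --verbose --username sqoop --password sqoop --driver org.postgresql.Driver \
	//     --batch --mapreduce-job-name "Sqoop Stats Import" -m 1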

	public void initSqoopJob() throws Exception {
		loadTableMappings();
		ArrayList<String> fileNames;
		if (useHdfsStore) {
			fileNames = listHdfsDir();
		} else {
			fileNames = listFilesystemDir();
		}

		HashMap<String, String> tables = new HashMap<String, String>();

		// Table mappings contain the mapping between HDFS files and the Stats
		// DB table that each one should be imported to.

		for (Entry<Object, Object> e : tableMappings.entrySet()) {
			String name = (String) e.getKey();
			log.info("Found tableMappings " + name);

			for (String filename : fileNames) {
				log.info("  filename " + filename);

				String str = filename.substring(filename.lastIndexOf('/') + 1);

				String split[] = str.split("-");

				split[0] = split[0].replaceAll(".*/", "");
				log.info("  filename processed " + split[0]);
				if (split[0].equals(name)) {
					log.info("    match " + e.getValue() + " " + filename);
					tables.put((String) e.getValue(), filename);
				}
			}
		}

		long startTime = System.currentTimeMillis();

		try {
			log.info("Running for " + tables);
			this.run(tables);
		} catch (Exception e) {
			log.error("Error while importing tables in Sqoop: ", e);
			throw new Exception("Error while importing tables in Sqoop", e);
		}
		long endTime = System.currentTimeMillis();

		log.info("Time taken for Sqoop Import : " + (endTime - startTime) / 60000 + " minutes");
	}

	public ArrayList<String> listHdfsDir() throws Exception {
		if (conf == null) {
			conf = new Configuration();
		}
		hdfs = FileSystem.get(conf);

		RemoteIterator<LocatedFileStatus> files;

		try {
			Path exportPath = new Path(hdfs.getUri() + outputPath);
			files = hdfs.listFiles(exportPath, false);
			if (files == null) {
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
			}
		} catch (Exception e) {
			log.error("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
			throw new Exception("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
		}
		ArrayList<String> fileNames = new ArrayList<String>();

		while (files.hasNext()) {
			String fileName = files.next().getPath().toString();

			log.info("Found hdfs file " + fileName);
			fileNames.add(fileName);
		}
		return fileNames;
	}

	public ArrayList<String> listFilesystemDir() throws Exception {
		ArrayList<String> fileNames = new ArrayList<String>();

		String files;
		File folder = new File(outputPath);
		File[] listOfFiles = folder.listFiles();

		if (listOfFiles == null) {
			log.error("No files generated in " + outputPath);
			throw new Exception("No files generated in " + outputPath);
		}

		for (int i = 0; i < listOfFiles.length; i++) {
			if (listOfFiles[i].isFile()) {
				files = listOfFiles[i].getAbsolutePath();
				log.info("Found " + files);
				fileNames.add(files);
			}
		}

		return fileNames;
	}

	public void loadTableMappings() throws Exception {
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);

		if (file == null) {
			throw new Exception("Could not find table mappings resource " + TABLE_MAP_PATH);
		}
		tableMappings.load(file);
		file.close();
		if (tableMappings.isEmpty()) {
			throw new Exception("Could not load table mappings in Sqoop init job");
		}
	}
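
	// The resource referenced by TABLE_MAP_PATH is a plain java.util.Properties file. Based on
	// how initSqoopJob() matches each key against the HDFS file-name prefix and hands the value
	// to Sqoop as the target "--table", its lines are expected to look like the sketch below.
	// These entries are illustrative assumptions, not the actual contents of the exportTables file:
	//
	//   datasource=datasource
	//   result=result
	//   project=project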

	/**
	 * Cleans up the auto-generated Sqoop class files.
	 * 
	 * @param table
	 */
	private void cleanUp(String table) {
		try {
			File file = new File(table + ".java");

			if (file.delete()) {
				log.info(file.getName() + " is deleted!");
			} else {
				log.info("Delete operation failed.");
			}
		} catch (Exception e) {
			log.error("Delete operation failed for " + table, e);
		}
	}

	public static void main(String[] args) {
		SqoopDriver sqoopDriver = new SqoopDriver();

		BasicConfigurator.configure();

		sqoopDriver.setDelim("!");
		sqoopDriver.setOutputPath("/tmp/test_stats/");

		sqoopDriver.setConnectionUrl("jdbc:postgresql://duffy.di.uoa.gr:5432/test_stats");
		sqoopDriver.setDbUser("sqoop");
		sqoopDriver.setDbPass("sqoop");
		sqoopDriver.setReducersCount("1");
		sqoopDriver.setRecsPerStatement("1000");
		sqoopDriver.setStatementPerTrans("1000");

		sqoopDriver.setUseHdfsStore(true);

		// HashMap<String, String> tables = new HashMap<String, String>();
		// tables.put("datasource", "/tmp/test_stats/datasource-r-00000");

		try {
			sqoopDriver.initSqoopJob();
			// sqoopDriver.run(tables);
		} catch (Exception e) {
			System.out.print("ERROR " + e.toString());
			e.printStackTrace();
		}
	}

	public String getConnectionUrl() {
		return connectionUrl;
	}

	public void setConnectionUrl(String connectionUrl) {
		this.connectionUrl = connectionUrl;
	}

	public String getDbUser() {
		return dbUser;
	}

	public void setDbUser(String dbUser) {
		this.dbUser = dbUser;
	}

	public String getDbPass() {
		return dbPass;
	}

	public void setDbPass(String dbPass) {
		this.dbPass = dbPass;
	}

	public String getDelim() {
		return delim;
	}

	public void setDelim(String delim) {
		this.delim = delim;
	}

	public String getReducersCount() {
		return sqoopReducersCount;
	}

	public void setReducersCount(String reducersCount) {
		this.sqoopReducersCount = reducersCount;
	}

	public String getRecsPerStatement() {
		return RecsPerStatement;
	}

	public void setRecsPerStatement(String recsPerStatement) {
		RecsPerStatement = recsPerStatement;
	}

	public String getStatementPerTrans() {
		return StatementPerTrans;
	}

	public void setStatementPerTrans(String statementPerTrans) {
		StatementPerTrans = statementPerTrans;
	}

	public Logger getLog() {
		return log;
	}

	public void setLog(Logger log) {
		this.log = log;
	}

	public String getTABLE_MAP_PATH() {
		return TABLE_MAP_PATH;
	}

	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
		TABLE_MAP_PATH = tABLE_MAP_PATH;
	}

	public Properties getTableMappings() {
		return tableMappings;
	}

	public void setTableMappings(Properties tableMappings) {
		this.tableMappings = tableMappings;
	}

	public String getOutputPath() {
		return outputPath;
	}

	public void setOutputPath(String outputPath) {
		this.outputPath = outputPath;
	}

	public String getSqoopReducersCount() {
		return sqoopReducersCount;
	}

	public void setSqoopReducersCount(String sqoopReducersCount) {
		this.sqoopReducersCount = sqoopReducersCount;
	}

	public FileSystem getHdfs() {
		return hdfs;
	}

	public void setHdfs(FileSystem hdfs) {
		this.hdfs = hdfs;
	}

	public boolean isUseHdfsStore() {
		return useHdfsStore;
	}

	public void setUseHdfsStore(boolean useHdfsStore) {
		this.useHdfsStore = useHdfsStore;
	}

	public Configuration getConf() {
		return conf;
	}

	public void setConf(Configuration conf) {
		this.conf = conf;
	}
}