package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
public class SqoopDriver {
	private Logger log = Logger.getLogger(this.getClass());

	private Properties tableMappings = new Properties();

	private String outputPath;
	private String dbUser;
	private String dbPass;
	private String connectionUrl;
	// Number of records bundled into each batch INSERT statement.
	private String RecsPerStatement;
	// Number of statements per commit (e.g. commit every 1000 inserts). Set to
	// a high value to reduce I/O costs.
	private String StatementPerTrans;
	// Field separator (delimiter) used in the imported HDFS files.
	private String delim;
	// Number of reduce tasks for the export job (also passed as the Sqoop
	// mapper count).
	private String sqoopReducersCount;
	private FileSystem hdfs;
	private Configuration conf;
	private boolean useHdfsStore = true;
	private boolean batch = true;
	private boolean verbose = true;
	private String tableMapConf;
	private String enclosed;

	/**
	 * Driver for the Sqoop tool. Calls the Sqoop client once for each
	 * <destination table, input file> pair given in the tables argument.
	 *
	 * The remaining parameters (connection URL, credentials, delimiters, etc.)
	 * are set on the driver instance before this method is called.
	 *
	 * @param tables multimap of destination tables to HDFS input files
	 */
	public void run(Multimap<String, String> tables) throws Exception {

		for (Entry<String, String> table : tables.entries()) {

			log.info("Importing " + table);

			// Note: the -D property definitions must not contain spaces around
			// '=', otherwise the value is not attached to the property name.
			String[] str = { "export",
					"-Dsqoop.export.records.per.statement=" + RecsPerStatement,
					"-Dsqoop.export.statements.per.transaction=" + StatementPerTrans,
					"-Dmapreduce.job.reduces=" + sqoopReducersCount,

					"--connect", connectionUrl, "--table", table.getKey(),

					"--export-dir", table.getValue(),

					"--input-fields-terminated-by", delim,
					// "--input-enclosed-by", enclosed,
					"--optionally-enclosed-by", enclosed,

					"--verbose", "--username", dbUser, "--password", dbPass,
					"--driver", "org.postgresql.Driver", "--batch",
					"--mapreduce-job-name", "Sqoop Stats Import Job for " + table.getKey(),
					"--num-mappers", sqoopReducersCount };

			int ret = Sqoop.runTool(str);

			if (ret != 0) {
				log.error("Could not run Sqoop Tool " + Integer.toString(ret));
				throw new RuntimeException("Could not run Sqoop Tool " + Integer.toString(ret));
			}

			cleanUp(table.getKey());
		}
	}
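
	// For reference, the argument array above corresponds roughly to the
	// command line below (all values are illustrative placeholders, not taken
	// from a real run):
	//
	//   sqoop export \
	//     -Dsqoop.export.records.per.statement=1000 \
	//     -Dsqoop.export.statements.per.transaction=1000 \
	//     -Dmapreduce.job.reduces=4 \
	//     --connect jdbc:postgresql://host:5432/stats \
	//     --table datasource --export-dir /stats/datasource-r-00000 \
	//     --input-fields-terminated-by '!' --optionally-enclosed-by "'" \
	//     --verbose --username stats --password *** \
	//     --driver org.postgresql.Driver --batch \
	//     --mapreduce-job-name "Sqoop Stats Import Job for datasource" \
	//     --num-mappers 4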

	public void initSqoopJob() throws Exception {
		loadTableMappings();
		ArrayList<String> fileNames;
		if (useHdfsStore) {
			fileNames = listHdfsDir();
		} else {
			fileNames = listFilesystemDir();
		}
		// A Multimap is needed because a table can have multiple input files:
		// when multiple reducers run, each reducer creates its own output file.
		Multimap<String, String> tables = ArrayListMultimap.create();

		// The table mappings hold, for each HDFS file-name prefix, the stats
		// DB table its contents should be imported into.
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
			String name = (String) e.getKey();
			log.info("Found table mapping " + name);

			for (String filename : fileNames) {

				String str = filename.substring(filename.lastIndexOf('/') + 1);

				// Keep only the part of the file name before the first '-'.
				String[] split = str.split("-");

				split[0] = split[0].replaceAll(".*/", "");

				if (split[0].equals(name)) {

					tables.put((String) e.getValue(), filename);
					log.info("    match   " + e.getValue() + "  " + filename);
				}
			}
		}

		long startTime = System.currentTimeMillis();
		log.info("Matched tables: " + tables.entries());

		try {
			this.run(tables);

		} catch (Exception e) {
			log.error("Error while importing tables in Sqoop: ", e);
			throw new Exception("Error while importing tables in Sqoop", e);
		}
		long endtime = System.currentTimeMillis();

		log.info("Time taken for Sqoop import: " + (endtime - startTime) / 60000 + " minutes");
	}
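
	// Example of the matching above, with hypothetical names: given a mapping
	// entry "datasource=datasource" and an HDFS listing containing
	// "/stats/datasource-r-00000" and "/stats/datasource-r-00001", both file
	// paths are stored under the key "datasource" in the multimap handed to
	// run(), so each file is exported into the same target table.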

	public ArrayList<String> listHdfsDir() throws Exception {
		if (conf == null) {
			conf = new Configuration();
		}
		hdfs = FileSystem.get(conf);

		RemoteIterator<LocatedFileStatus> files;

		try {
			Path exportPath = new Path(hdfs.getUri() + outputPath);
			// Non-recursive listing: only files directly under exportPath.
			files = hdfs.listFiles(exportPath, false);
			if (files == null) {
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
			}
		} catch (Exception e) {
			log.error("HDFS file path with exported data does not exist: " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
			throw new Exception("HDFS file path with exported data does not exist: " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
		}
		ArrayList<String> fileNames = new ArrayList<String>();

		while (files.hasNext()) {

			String fileName = files.next().getPath().toString();

			log.info("Found HDFS file " + fileName);
			fileNames.add(fileName);
		}
		return fileNames;
	}

	public ArrayList<String> listFilesystemDir() throws Exception {
		ArrayList<String> fileNames = new ArrayList<String>();

		File folder = new File(outputPath);
		File[] listOfFiles = folder.listFiles();

		if (listOfFiles == null) {
			log.error("No files generated in " + outputPath);
			throw new Exception("No files generated in " + outputPath);
		}

		for (File file : listOfFiles) {

			if (file.isFile()) {
				String path = file.getAbsolutePath();
				log.info("Found " + path);
				fileNames.add(path);
			}
		}

		return fileNames;
	}

	public void loadTableMappings() throws Exception {
		tableMappings = new Properties();
		if (tableMapConf == null) {
			log.error("Null table map config in mapper");
			throw new Exception("Null table map config in mapper");
		}

		// The configuration is a comma-separated list of key=value pairs;
		// rewrite the commas as newlines so it can be parsed as Properties.
		tableMapConf = tableMapConf.replaceAll(",", "\n");

		InputStream stream = new ByteArrayInputStream(tableMapConf.getBytes());

		tableMappings.load(stream);
		stream.close();
		log.info("Table mappings: " + tableMappings.entrySet());
		if (tableMappings.isEmpty()) {
			throw new Exception("Could not load table mappings in Sqoop init job");
		}
	}
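
	// Example (hypothetical values): a tableMapConf of
	//   "datasource=datasource,project=project"
	// becomes, after the comma-to-newline rewrite, the Properties text
	//   datasource=datasource
	//   project=project
	// i.e. one <HDFS file prefix>=<target stats DB table> pair per line.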

	/**
	 * Cleans up the auto-generated Sqoop class file for the given table.
	 *
	 * @param table the table whose generated {@code <table>.java} file is removed
	 */
	private void cleanUp(String table) {
		try {

			// Sqoop code-generates a <table>.java file in the working directory.
			File file = new File(table + ".java");

			if (file.delete()) {
				log.info(file.getName() + " is deleted.");
			} else {
				log.info("Delete operation failed for " + file.getName());
			}

		} catch (Exception e) {

			log.error("Delete operation failed for " + table, e);
		}
	}

	public Logger getLog() {
		return log;
	}

	public void setLog(Logger log) {
		this.log = log;
	}

	public Properties getTableMappings() {
		return tableMappings;
	}

	public void setTableMappings(Properties tableMappings) {
		this.tableMappings = tableMappings;
	}

	public String getOutputPath() {
		return outputPath;
	}

	public void setOutputPath(String outputPath) {
		this.outputPath = outputPath;
	}

	public String getDbUser() {
		return dbUser;
	}

	public void setDbUser(String dbUser) {
		this.dbUser = dbUser;
	}

	public String getDbPass() {
		return dbPass;
	}

	public void setDbPass(String dbPass) {
		this.dbPass = dbPass;
	}

	public String getConnectionUrl() {
		return connectionUrl;
	}

	public void setConnectionUrl(String connectionUrl) {
		this.connectionUrl = connectionUrl;
	}

	public String getRecsPerStatement() {
		return RecsPerStatement;
	}

	public void setRecsPerStatement(String recsPerStatement) {
		RecsPerStatement = recsPerStatement;
	}

	public String getStatementPerTrans() {
		return StatementPerTrans;
	}

	public void setStatementPerTrans(String statementPerTrans) {
		StatementPerTrans = statementPerTrans;
	}

	public String getDelim() {
		return delim;
	}

	public void setDelim(String delim) {
		this.delim = delim;
	}

	public String getSqoopReducersCount() {
		return sqoopReducersCount;
	}

	public void setSqoopReducersCount(String sqoopReducersCount) {
		this.sqoopReducersCount = sqoopReducersCount;
	}

	public FileSystem getHdfs() {
		return hdfs;
	}

	public void setHdfs(FileSystem hdfs) {
		this.hdfs = hdfs;
	}

	public Configuration getConf() {
		return conf;
	}

	public void setConf(Configuration conf) {
		this.conf = conf;
	}

	public boolean isUseHdfsStore() {
		return useHdfsStore;
	}

	public void setUseHdfsStore(boolean useHdfsStore) {
		this.useHdfsStore = useHdfsStore;
	}

	public boolean isBatch() {
		return batch;
	}

	public void setBatch(boolean batch) {
		this.batch = batch;
	}

	public boolean isVerbose() {
		return verbose;
	}

	public void setVerbose(boolean verbose) {
		this.verbose = verbose;
	}

	public String getTableMapConf() {
		return tableMapConf;
	}

	public void setTableMapConf(String tableMapConf) {
		this.tableMapConf = tableMapConf;
	}

	public String getEnclosed() {
		return enclosed;
	}

	public void setEnclosed(String enclosed) {
		this.enclosed = enclosed;
	}
}
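
// A minimal usage sketch (not part of the original driver): shows how a caller
// might wire up a SqoopDriver via its setters. All values below (URL,
// credentials, paths, and the table-mapping string) are hypothetical
// placeholders.
class SqoopDriverExample {

	public static void main(String[] args) throws Exception {
		SqoopDriver driver = new SqoopDriver();
		driver.setConnectionUrl("jdbc:postgresql://localhost:5432/stats"); // hypothetical URL
		driver.setDbUser("stats");
		driver.setDbPass("secret");
		driver.setOutputPath("/stats/"); // HDFS directory holding the exported files
		driver.setDelim("!");
		driver.setEnclosed("'");
		driver.setRecsPerStatement("1000");
		driver.setStatementPerTrans("1000");
		driver.setSqoopReducersCount("4");
		// Comma-separated <file prefix>=<table> pairs, parsed by loadTableMappings().
		driver.setTableMapConf("datasource=datasource,project=project");
		// Lists the output files, matches them to tables, and runs one Sqoop
		// export per <table, file> pair.
		driver.initSqoopJob();
	}
}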