package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;

import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;

public class SqoopDriver {

	private Logger log = Logger.getLogger(this.getClass());

	private Properties tableMappings = new Properties();

	private String outputPath;
	private String dbUser;
	private String dbPass;
	private String connectionUrl;
	// Number of records batched into each INSERT statement.
	private String recsPerStatement;
	// Number of statements per transaction (commit every 1000 inserts, for
	// instance). Set to a high value to reduce I/O costs.
	private String statementPerTrans;
	// Field separator (delimiter) used in the imported HDFS fields.
	private String delim;
	// Number of parallel tasks (reducers/mappers) that Sqoop may use.
	private String sqoopReducersCount;
	private FileSystem hdfs;
	private Configuration conf;
	private boolean useHdfsStore = true;
	private boolean batch = true;
	private boolean verbose = true;
	private String tableMapConf;

	/**
	 * Driver for the Sqoop tool. Calls the Sqoop client for each
	 * <destination table, input file> pair given in the {@code tables}
	 * argument.
	 * 
	 * The required parameters (connection URL, database credentials, etc.) are
	 * set as class fields when a new SqoopDriver instance is created.
	 * 
	 * @param tables mapping from destination table to exported HDFS file(s)
	 */
	public void run(Multimap<String, String> tables) throws Exception {

		for (Entry<String, String> table : tables.entries()) {

			log.info("Importing " + table);

			// Note: the generic -D options must come first, and the property
			// assignments must not contain spaces around '='.
			String[] str = { "export",
					"-Dsqoop.export.records.per.statement=" + recsPerStatement,
					"-Dsqoop.export.statements.per.transaction=" + statementPerTrans,
					"-Dmapreduce.job.reduces=" + sqoopReducersCount,
					"--connect", connectionUrl,
					"--table", table.getKey(),
					"--export-dir", table.getValue(),
					"--input-fields-terminated-by", delim,
					"--input-enclosed-by", "\"",
					"--verbose",
					"--username", dbUser,
					"--password", dbPass,
					"--driver", "org.postgresql.Driver",
					"--batch",
					"--mapreduce-job-name", "Sqoop Stats Import Job for " + table.getKey(),
					"--m", sqoopReducersCount };

			int ret = Sqoop.runTool(str);

			if (ret != 0) {
				log.error("Could not run Sqoop tool, exit code " + ret);
				throw new RuntimeException("Could not run Sqoop tool, exit code " + ret);
			}

			cleanUp(table.getKey());
		}
	}
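
	/*
	 * For reference, the argument array above corresponds to a command line of
	 * the following form (all values are illustrative):
	 *
	 *   sqoop export \
	 *     -Dsqoop.export.records.per.statement=1000 \
	 *     -Dsqoop.export.statements.per.transaction=1000 \
	 *     -Dmapreduce.job.reduces=4 \
	 *     --connect jdbc:postgresql://host:5432/stats \
	 *     --table datasource --export-dir /stats/datasource-r-00000 \
	 *     --input-fields-terminated-by '!' --input-enclosed-by '"' \
	 *     --verbose --username user --password pass \
	 *     --driver org.postgresql.Driver --batch \
	 *     --mapreduce-job-name 'Sqoop Stats Import Job for datasource' \
	 *     --m 4
	 */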

	public void initSqoopJob() throws Exception {
		loadTableMappings();
		ArrayList<String> fileNames;
		if (useHdfsStore) {
			fileNames = listHdfsDir();
		} else {
			fileNames = listFilesystemDir();
		}
		// Keep this a Multimap, since there can be multiple files for each
		// table when multiple reducers are used (each reducer creates its own
		// output file).
		Multimap<String, String> tables = ArrayListMultimap.create();

		// The table mappings contain the mapping between HDFS files and the
		// stats DB table that each should be imported to.
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
			String name = (String) e.getKey();
			log.info("Found table mapping " + name);

			for (String filename : fileNames) {

				String str = filename.substring(filename.lastIndexOf('/') + 1);

				String[] split = str.split("-");

				split[0] = split[0].replaceAll(".*/", "");

				if (split[0].equals(name)) {
					tables.put((String) e.getValue(), filename);
					log.info("Match: " + e.getValue() + "  " + filename);
				}
			}
		}

		long startTime = System.currentTimeMillis();
		log.info("Matched tables: " + tables.entries());

		try {
			this.run(tables);
		} catch (Exception e) {
			log.error("Error while importing tables in Sqoop: ", e);
			throw new Exception("Error while importing tables in Sqoop", e);
		}
		long endTime = System.currentTimeMillis();

		log.info("Time taken for Sqoop import: " + (endTime - startTime) / 60000 + " minutes");
	}
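
	/*
	 * File-to-table matching convention used above (example values are
	 * hypothetical): a reducer output file such as
	 * hdfs://nn/stats/datasource-r-00000 has the prefix "datasource" before
	 * the first '-', so it matches the mapping key "datasource" and is queued
	 * for import into that mapping's target table.
	 */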

	public ArrayList<String> listHdfsDir() throws Exception {
		if (conf == null) {
			conf = new Configuration();
		}
		hdfs = FileSystem.get(conf);

		RemoteIterator<LocatedFileStatus> files;

		try {
			Path exportPath = new Path(hdfs.getUri() + outputPath);
			files = hdfs.listFiles(exportPath, false);
			if (files == null) {
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
			}
		} catch (Exception e) {
			log.error("HDFS file path with exported data does not exist: " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
			throw new Exception("HDFS file path with exported data does not exist: " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
		}
		ArrayList<String> fileNames = new ArrayList<String>();

		while (files.hasNext()) {
			String fileName = files.next().getPath().toString();
			log.info("Found HDFS file " + fileName);
			fileNames.add(fileName);
		}
		return fileNames;
	}

	public ArrayList<String> listFilesystemDir() throws Exception {
		ArrayList<String> fileNames = new ArrayList<String>();

		File folder = new File(outputPath);
		File[] listOfFiles = folder.listFiles();

		if (listOfFiles == null) {
			log.error("No files generated in " + outputPath);
			throw new Exception("No files generated in " + outputPath);
		}

		for (int i = 0; i < listOfFiles.length; i++) {
			if (listOfFiles[i].isFile()) {
				String file = listOfFiles[i].getAbsolutePath();
				log.info("Found " + file);
				fileNames.add(file);
			}
		}

		return fileNames;
	}

	public void loadTableMappings() throws Exception {
		tableMappings = new Properties();
		if (tableMapConf == null) {
			log.error("Null table map config in mapper");
			throw new Exception("Null table map config in mapper");
		}

		// The mappings arrive as a single comma-separated string; turn the
		// commas into newlines so that Properties.load() can parse it as one
		// key=value pair per line.
		tableMapConf = tableMapConf.replaceAll(",", "\n");

		InputStream stream = new ByteArrayInputStream(tableMapConf.getBytes());

		tableMappings.load(stream);
		stream.close();
		log.info("Table mappings: " + tableMappings.entrySet());
		if (tableMappings.isEmpty()) {
			throw new Exception("Could not load table mappings in Sqoop init job");
		}
	}
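
	/*
	 * Expected tableMapConf format, a comma-separated list of key=value pairs
	 * (example values are hypothetical):
	 *
	 *   datasource=datasource,project=project,result=result
	 *
	 * After the commas are rewritten to newlines, this is a standard
	 * properties layout of one <file prefix>=<target table> pair per line.
	 */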

	/**
	 * Cleans up the auto-generated Sqoop class file for the given table.
	 * 
	 * @param table the table whose generated .java source file is removed
	 */
	private void cleanUp(String table) {
		try {
			File file = new File(table + ".java");

			if (file.delete()) {
				log.info(file.getName() + " is deleted!");
			} else {
				log.info("Delete operation failed.");
			}
		} catch (Exception e) {
			log.error("Delete operation failed for " + table, e);
		}
	}

	public Logger getLog() {
		return log;
	}

	public void setLog(Logger log) {
		this.log = log;
	}

	public Properties getTableMappings() {
		return tableMappings;
	}

	public void setTableMappings(Properties tableMappings) {
		this.tableMappings = tableMappings;
	}

	public String getOutputPath() {
		return outputPath;
	}

	public void setOutputPath(String outputPath) {
		this.outputPath = outputPath;
	}

	public String getDbUser() {
		return dbUser;
	}

	public void setDbUser(String dbUser) {
		this.dbUser = dbUser;
	}

	public String getDbPass() {
		return dbPass;
	}

	public void setDbPass(String dbPass) {
		this.dbPass = dbPass;
	}

	public String getConnectionUrl() {
		return connectionUrl;
	}

	public void setConnectionUrl(String connectionUrl) {
		this.connectionUrl = connectionUrl;
	}

	public String getRecsPerStatement() {
		return recsPerStatement;
	}

	public void setRecsPerStatement(String recsPerStatement) {
		this.recsPerStatement = recsPerStatement;
	}

	public String getStatementPerTrans() {
		return statementPerTrans;
	}

	public void setStatementPerTrans(String statementPerTrans) {
		this.statementPerTrans = statementPerTrans;
	}

	public String getDelim() {
		return delim;
	}

	public void setDelim(String delim) {
		this.delim = delim;
	}

	public String getSqoopReducersCount() {
		return sqoopReducersCount;
	}

	public void setSqoopReducersCount(String sqoopReducersCount) {
		this.sqoopReducersCount = sqoopReducersCount;
	}

	public FileSystem getHdfs() {
		return hdfs;
	}

	public void setHdfs(FileSystem hdfs) {
		this.hdfs = hdfs;
	}

	public Configuration getConf() {
		return conf;
	}

	public void setConf(Configuration conf) {
		this.conf = conf;
	}

	public boolean isUseHdfsStore() {
		return useHdfsStore;
	}

	public void setUseHdfsStore(boolean useHdfsStore) {
		this.useHdfsStore = useHdfsStore;
	}

	public boolean isBatch() {
		return batch;
	}

	public void setBatch(boolean batch) {
		this.batch = batch;
	}

	public boolean isVerbose() {
		return verbose;
	}

	public void setVerbose(boolean verbose) {
		this.verbose = verbose;
	}

	public String getTableMapConf() {
		return tableMapConf;
	}

	public void setTableMapConf(String tableMapConf) {
		this.tableMapConf = tableMapConf;
	}
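
	/*
	 * Minimal usage sketch. All values below are hypothetical; in production
	 * the surrounding workflow supplies the real configuration.
	 */
	public static void main(String[] args) throws Exception {
		SqoopDriver driver = new SqoopDriver();
		// Hypothetical stats DB connection settings.
		driver.setConnectionUrl("jdbc:postgresql://localhost:5432/stats");
		driver.setDbUser("user");
		driver.setDbPass("pass");
		// HDFS directory that holds the exported part files.
		driver.setOutputPath("/tmp/stats_export/");
		driver.setDelim("!");
		// Batch sizing: records per INSERT and statements per commit.
		driver.setRecsPerStatement("1000");
		driver.setStatementPerTrans("1000");
		driver.setSqoopReducersCount("4");
		// Comma-separated <file prefix>=<target table> mappings.
		driver.setTableMapConf("datasource=datasource,project=project");
		driver.initSqoopJob();
	}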
}