package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
public class SqoopDriver {
20
	private Logger log = Logger.getLogger(this.getClass());
21

    
22
	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";
23

    
24
	private Properties tableMappings = new Properties();
25

    
26
	private String outputPath;
27
	private String dbUser;
28
	private String dbPass;
29
	private String connectionUrl;
30
	// Number of statements in each batch insert op
31
	private String RecsPerStatement;
32
	// Number of statements for each commit ( commit every 1000 inserts, for
33
	// instance). Set to high value to reduce I/O costs.
34
	private String StatementPerTrans;
35
	// Field Seperator ( Delimeter ) used in import HDFS fields.
36
	private String delim;
37
	// Number of reducer classes that Sqoop can use.
38
	private String sqoopReducersCount;
39
	private FileSystem hdfs;
40
	private Configuration conf;
41
	private boolean useHdfsStore = true;
42

    
43
	/**
44
	 * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
45
	 * destination table> pair given in the @tables argument.
46
	 * 
47
	 * Needed parameters ( connection, database name, etc are set in class
48
	 * parameters when an new Sqoop Driver instance is created.
49
	 * 
50
	 * @param tables
51
	 */
52
	public void run(HashMap<String, String> tables) throws Exception {
53

    
54
		for (Entry<String, String> table : tables.entrySet()) {
55

    
56
			log.info(table);
57

    
58
			String[] str = { "export", "-Dsqoop.export.records.per.statement =" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction = " + StatementPerTrans,
59

    
60
			"--connect", connectionUrl, "--table", table.getKey(),
61

    
62
			"--export-dir ", table.getValue(),
63
					// TODO : important!! this is necessary since MR multiple
64
					// outputs writer appends a \t char after writing the KEY
65
					// field in HDFS.
66
					// use this so sqoop can ignore it
67
					"--optionally-enclosed-by", "	", "--input-fields-terminated-by", delim,
68

    
69
					"--verbose", "--username", dbUser, "--password", dbPass, "--batch", "--mapreduce-job-name", "Sqoop Stats Import", "-m", sqoopReducersCount };
70

    
71
			try {
72
				Sqoop.runTool(str,conf);
73
				 
74
			} catch (Exception ex) {
75
				log.error("Could not run Sqoop Tool " + ex.getMessage());
76
				throw new Exception("Could not run Sqoop Tool ", ex);
77
			}
78
			cleanUp(table.getKey());
79
		}
80
	}
81

    
82
	public void loadTableMappings() throws Exception
83

    
84
	{
85
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
86

    
87
		tableMappings.load(file);
88
		file.close();
89
		if (tableMappings == null || tableMappings.isEmpty()) {
90
			throw new Exception("Could not load Table Mappings in sqoop init job");
91
		}
92
	}
93

    
94
	public void initSqoopJob() throws Exception {
95
		loadTableMappings();
96
		ArrayList<String> fileNames;
97
		if (useHdfsStore) {
98
			fileNames = listHdfsDir();
99
		} else {
100
			fileNames = listFilesystemDir();
101
		}
102

    
103
		HashMap<String, String> tables = new HashMap<String, String>();
104

    
105
		// Table mappings containt the mapping between HDFS files and the Stats
106
		// DB table that each should be imported to
107
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
108
			String name = (String) e.getKey();
109
			for (String filename : fileNames) {
110

    
111
				String split[] = filename.split("-");
112

    
113
				split[0] = split[0].replaceAll(".*/", "");
114

    
115
				if (split[0].equals(name)) {
116

    
117
					tables.put((String) e.getValue(), filename);
118

    
119
				}
120
			}
121
		}
122

    
123
		long startTime = System.currentTimeMillis();
124

    
125
		try {
126
			this.run(tables);
127

    
128
		} catch (Exception e) {
129
			log.error("Error while importing tables in Sqoop: ", e);
130
			throw new Exception("Error while importing tables in Sqoop", e);
131
		}
132
		long endtime = System.currentTimeMillis();
133

    
134
		log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");
135

    
136
	}
137

    
138
	private ArrayList<String> listHdfsDir() throws Exception {
139
		hdfs = FileSystem.get(conf);
140

    
141
		RemoteIterator<LocatedFileStatus> Files;
142

    
143
		try {
144
			Path exportPath = new Path(hdfs.getUri() + outputPath);
145
			Files = hdfs.listFiles(exportPath, true);
146
			if (Files == null) {
147
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
148
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
149

    
150
			}
151
		} catch (Exception e) {
152
			log.error("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
153
			throw new Exception("HDFS file path with exported data does not exist :   " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
154

    
155
		}
156
		ArrayList<String> fileNames = new ArrayList<String>();
157

    
158
		while (Files.hasNext()) {
159

    
160
			String fileName = Files.next().getPath().toString();
161

    
162
			log.info(fileName);
163
			fileNames.add(fileName);
164

    
165
		}
166
		return fileNames;
167
	}
168

    
169
	private ArrayList<String> listFilesystemDir() throws Exception {
170
		ArrayList<String> fileNames = new ArrayList<String>();
171

    
172
		String files;
173
		File folder = new File(outputPath);
174
		File[] listOfFiles = folder.listFiles();
175

    
176
		if (listOfFiles == null) {
177
			log.error("No files generated in " + outputPath);
178
			throw new Exception("No files generated in " + outputPath);
179

    
180
		}
181

    
182
		for (int i = 0; i < listOfFiles.length; i++) {
183

    
184
			if (listOfFiles[i].isFile()) {
185
				files = listOfFiles[i].getAbsolutePath();
186
				log.info(files);
187
				fileNames.add(files);
188
				System.out.println(files);
189
			}
190
		}
191

    
192
		return fileNames;
193
	}
194

    
195
	/**
196
	 * Cleans up auto-generated Sqoop class files
197
	 * 
198
	 * @param table
199
	 */
200

    
201
	private void cleanUp(String table) {
202
		try {
203

    
204
			File file = new File(table + ".java");
205

    
206
			if (file.delete()) {
207
				log.info(file.getName() + " is deleted!");
208
			} else {
209
				log.info("Delete operation   failed.");
210
			}
211

    
212
		} catch (Exception e) {
213

    
214
			log.error("Delete operation   failed" + table);
215

    
216
		}
217
	}
218

    
219
	public String getConnectionUrl() {
220
		return connectionUrl;
221
	}
222

    
223
	public void setConnectionUrl(String connectionUrl) {
224
		this.connectionUrl = connectionUrl;
225
	}
226

    
227
	public String getDbUser() {
228
		return dbUser;
229
	}
230

    
231
	public void setDbUser(String dbUser) {
232
		this.dbUser = dbUser;
233
	}
234

    
235
	public String getDbPass() {
236
		return dbPass;
237
	}
238

    
239
	public void setDbPass(String dbPass) {
240
		this.dbPass = dbPass;
241
	}
242

    
243
	public String getDelim() {
244
		return delim;
245
	}
246

    
247
	public void setDelim(String delim) {
248
		this.delim = delim;
249
	}
250

    
251
	public String getReducersCount() {
252
		return sqoopReducersCount;
253
	}
254

    
255
	public void setReducersCount(String reducersCount) {
256
		this.sqoopReducersCount = reducersCount;
257
	}
258

    
259
	public String getRecsPerStatement() {
260
		return RecsPerStatement;
261
	}
262

    
263
	public void setRecsPerStatement(String recsPerStatement) {
264
		RecsPerStatement = recsPerStatement;
265
	}
266

    
267
	public String getStatementPerTrans() {
268
		return StatementPerTrans;
269
	}
270

    
271
	public void setStatementPerTrans(String statementPerTrans) {
272
		StatementPerTrans = statementPerTrans;
273
	}
274

    
275
	public Logger getLog() {
276
		return log;
277
	}
278

    
279
	public void setLog(Logger log) {
280
		this.log = log;
281
	}
282

    
283
	public String getTABLE_MAP_PATH() {
284
		return TABLE_MAP_PATH;
285
	}
286

    
287
	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
288
		TABLE_MAP_PATH = tABLE_MAP_PATH;
289
	}
290

    
291
	public Properties getTableMappings() {
292
		return tableMappings;
293
	}
294

    
295
	public void setTableMappings(Properties tableMappings) {
296
		this.tableMappings = tableMappings;
297
	}
298

    
299
	public String getOutputPath() {
300
		return outputPath;
301
	}
302

    
303
	public void setOutputPath(String outputPath) {
304
		this.outputPath = outputPath;
305
	}
306

    
307
	public String getSqoopReducersCount() {
308
		return sqoopReducersCount;
309
	}
310

    
311
	public void setSqoopReducersCount(String sqoopReducersCount) {
312
		this.sqoopReducersCount = sqoopReducersCount;
313
	}
314

    
315
	public FileSystem getHdfs() {
316
		return hdfs;
317
	}
318

    
319
	public void setHdfs(FileSystem hdfs) {
320
		this.hdfs = hdfs;
321
	}
322

    
323
	public boolean isUseHdfsStore() {
324
		return useHdfsStore;
325
	}
326

    
327
	public void setUseHdfsStore(boolean useHdfsStore) {
328
		this.useHdfsStore = useHdfsStore;
329
	}
330

    
331
	public Configuration getConf() {
332
		return conf;
333
	}
334

    
335
	public void setConf(Configuration conf) {
336
		this.conf = conf;
337
	}
338

    
339
}