Project

General

Profile

1 28111 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;
2 27955 claudio.at
3
import java.io.File;
4 28589 eri.katsar
import java.io.IOException;
5 28368 eri.katsar
import java.io.InputStream;
6 28111 eri.katsar
import java.util.ArrayList;
7 27955 claudio.at
import java.util.HashMap;
8
import java.util.Map.Entry;
9 28111 eri.katsar
import java.util.Properties;
10 27955 claudio.at
11 28111 eri.katsar
import org.apache.hadoop.conf.Configuration;
12
import org.apache.hadoop.fs.FileSystem;
13
import org.apache.hadoop.fs.LocatedFileStatus;
14
import org.apache.hadoop.fs.Path;
15
import org.apache.hadoop.fs.RemoteIterator;
16 27955 claudio.at
import org.apache.log4j.Logger;
17
import org.apache.sqoop.Sqoop;
18
19
public class SqoopDriver {
20
	private Logger log = Logger.getLogger(this.getClass());
21
22 28111 eri.katsar
	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";
23
24
	private Properties tableMappings = new Properties();
25 28584 eri.katsar
26 28111 eri.katsar
	private String outputPath;
27 27955 claudio.at
	private String dbUser;
28
	private String dbPass;
29
	private String connectionUrl;
30
	// Number of statements in each batch insert op
31
	private String RecsPerStatement;
32
	// Number of statements for each commit ( commit every 1000 inserts, for
33
	// instance). Set to high value to reduce I/O costs.
34
	private String StatementPerTrans;
35 28183 eri.katsar
	// Field Seperator ( Delimeter ) used in import HDFS fields.
36 27955 claudio.at
	private String delim;
37
	// Number of reducer classes that Sqoop can use.
38
	private String sqoopReducersCount;
39 28111 eri.katsar
	private FileSystem hdfs;
40 28557 eri.katsar
	private Configuration conf;
41 28553 eri.katsar
	private boolean useHdfsStore = true;
42 28495 eri.katsar
43 27955 claudio.at
	/**
44
	 * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
45
	 * destination table> pair given in the @tables argument.
46
	 *
47
	 * Needed parameters ( connection, database name, etc are set in class
48
	 * parameters when an new Sqoop Driver instance is created.
49
	 *
50
	 * @param tables
51
	 */
52 28591 eri.katsar
	public void run(HashMap<String, String> tables) throws Exception {
53 27955 claudio.at
54
		for (Entry<String, String> table : tables.entrySet()) {
55
56 28591 eri.katsar
			log.info(table);
57 28475 eri.katsar
58 27955 claudio.at
			String[] str = { "export", "-Dsqoop.export.records.per.statement =" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction = " + StatementPerTrans,
59
60
			"--connect", connectionUrl, "--table", table.getKey(),
61
62 28564 eri.katsar
			"--export-dir ", table.getValue(),
63 27955 claudio.at
					// TODO : important!! this is necessary since MR multiple
64
					// outputs writer appends a \t char after writing the KEY
65
					// field in HDFS.
66
					// use this so sqoop can ignore it
67
					"--optionally-enclosed-by", "	", "--input-fields-terminated-by", delim,
68
69
					"--verbose", "--username", dbUser, "--password", dbPass, "--batch", "--mapreduce-job-name", "Sqoop Stats Import", "-m", sqoopReducersCount };
70
71 28591 eri.katsar
			try {
72 28594 eri.katsar
				Sqoop.runTool(str,conf);
73
74 28591 eri.katsar
			} catch (Exception ex) {
75
				log.error("Could not run Sqoop Tool " + ex.getMessage());
76
				throw new Exception("Could not run Sqoop Tool ", ex);
77
			}
78 27955 claudio.at
			cleanUp(table.getKey());
79
		}
80
	}
81
82 28589 eri.katsar
	public void loadTableMappings() throws Exception
83
84
	{
85 28368 eri.katsar
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
86 28138 eri.katsar
87 28368 eri.katsar
		tableMappings.load(file);
88
		file.close();
89 28475 eri.katsar
		if (tableMappings == null || tableMappings.isEmpty()) {
90
			throw new Exception("Could not load Table Mappings in sqoop init job");
91
		}
92 28589 eri.katsar
	}
93
94
	public void initSqoopJob() throws Exception {
95
		loadTableMappings();
96 28495 eri.katsar
		ArrayList<String> fileNames;
97
		if (useHdfsStore) {
98
			fileNames = listHdfsDir();
99
		} else {
100
			fileNames = listFilesystemDir();
101
		}
102 28368 eri.katsar
103 28111 eri.katsar
		HashMap<String, String> tables = new HashMap<String, String>();
104
105
		// Table mappings containt the mapping between HDFS files and the Stats
106
		// DB table that each should be imported to
107
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
108
			String name = (String) e.getKey();
109
			for (String filename : fileNames) {
110
111
				String split[] = filename.split("-");
112
113
				split[0] = split[0].replaceAll(".*/", "");
114
115
				if (split[0].equals(name)) {
116
117
					tables.put((String) e.getValue(), filename);
118
119
				}
120
			}
121
		}
122
123
		long startTime = System.currentTimeMillis();
124
125
		try {
126
			this.run(tables);
127
128
		} catch (Exception e) {
129
			log.error("Error while importing tables in Sqoop: ", e);
130 28138 eri.katsar
			throw new Exception("Error while importing tables in Sqoop", e);
131 28111 eri.katsar
		}
132
		long endtime = System.currentTimeMillis();
133
134
		log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");
135
136
	}
137 28203 eri.katsar
138 28495 eri.katsar
	private ArrayList<String> listHdfsDir() throws Exception {
139 28557 eri.katsar
		hdfs = FileSystem.get(conf);
140 28495 eri.katsar
141 28584 eri.katsar
		RemoteIterator<LocatedFileStatus> Files;
142 28563 eri.katsar
143 28495 eri.katsar
		try {
144 28584 eri.katsar
			Path exportPath = new Path(hdfs.getUri() + outputPath);
145 28587 eri.katsar
			Files = hdfs.listFiles(exportPath, true);
146 28584 eri.katsar
			if (Files == null) {
147 28563 eri.katsar
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
148
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
149 28495 eri.katsar
150 28551 eri.katsar
			}
151 28563 eri.katsar
		} catch (Exception e) {
152 28589 eri.katsar
			log.error("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
153 28584 eri.katsar
			throw new Exception("HDFS file path with exported data does not exist :   " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
154 28495 eri.katsar
155
		}
156
		ArrayList<String> fileNames = new ArrayList<String>();
157
158 28584 eri.katsar
		while (Files.hasNext()) {
159 28495 eri.katsar
160 28584 eri.katsar
			String fileName = Files.next().getPath().toString();
161 28495 eri.katsar
162
			log.info(fileName);
163
			fileNames.add(fileName);
164
165
		}
166
		return fileNames;
167
	}
168
169
	private ArrayList<String> listFilesystemDir() throws Exception {
170
		ArrayList<String> fileNames = new ArrayList<String>();
171
172
		String files;
173
		File folder = new File(outputPath);
174
		File[] listOfFiles = folder.listFiles();
175
176 28509 eri.katsar
		if (listOfFiles == null) {
177
			log.error("No files generated in " + outputPath);
178
			throw new Exception("No files generated in " + outputPath);
179
180
		}
181
182 28495 eri.katsar
		for (int i = 0; i < listOfFiles.length; i++) {
183
184
			if (listOfFiles[i].isFile()) {
185 28496 eri.katsar
				files = listOfFiles[i].getAbsolutePath();
186 28495 eri.katsar
				log.info(files);
187
				fileNames.add(files);
188
				System.out.println(files);
189
			}
190
		}
191
192
		return fileNames;
193
	}
194
195 27955 claudio.at
	/**
196
	 * Cleans up auto-generated Sqoop class files
197
	 *
198
	 * @param table
199
	 */
200
201
	private void cleanUp(String table) {
202
		try {
203
204
			File file = new File(table + ".java");
205
206
			if (file.delete()) {
207
				log.info(file.getName() + " is deleted!");
208
			} else {
209
				log.info("Delete operation   failed.");
210
			}
211
212
		} catch (Exception e) {
213
214
			log.error("Delete operation   failed" + table);
215
216
		}
217
	}
218
219
	public String getConnectionUrl() {
220
		return connectionUrl;
221
	}
222
223
	public void setConnectionUrl(String connectionUrl) {
224
		this.connectionUrl = connectionUrl;
225
	}
226
227
	public String getDbUser() {
228
		return dbUser;
229
	}
230
231
	public void setDbUser(String dbUser) {
232
		this.dbUser = dbUser;
233
	}
234
235
	public String getDbPass() {
236
		return dbPass;
237
	}
238
239
	public void setDbPass(String dbPass) {
240
		this.dbPass = dbPass;
241
	}
242
243
	public String getDelim() {
244
		return delim;
245
	}
246
247
	public void setDelim(String delim) {
248
		this.delim = delim;
249
	}
250
251
	public String getReducersCount() {
252
		return sqoopReducersCount;
253
	}
254
255
	public void setReducersCount(String reducersCount) {
256
		this.sqoopReducersCount = reducersCount;
257
	}
258
259
	public String getRecsPerStatement() {
260
		return RecsPerStatement;
261
	}
262
263
	public void setRecsPerStatement(String recsPerStatement) {
264
		RecsPerStatement = recsPerStatement;
265
	}
266
267
	public String getStatementPerTrans() {
268
		return StatementPerTrans;
269
	}
270
271
	public void setStatementPerTrans(String statementPerTrans) {
272
		StatementPerTrans = statementPerTrans;
273
	}
274
275 28111 eri.katsar
	public Logger getLog() {
276
		return log;
277
	}
278
279
	public void setLog(Logger log) {
280
		this.log = log;
281
	}
282
283
	public String getTABLE_MAP_PATH() {
284
		return TABLE_MAP_PATH;
285
	}
286
287
	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
288
		TABLE_MAP_PATH = tABLE_MAP_PATH;
289
	}
290
291
	public Properties getTableMappings() {
292
		return tableMappings;
293
	}
294
295
	public void setTableMappings(Properties tableMappings) {
296
		this.tableMappings = tableMappings;
297
	}
298
299
	public String getOutputPath() {
300
		return outputPath;
301
	}
302
303
	public void setOutputPath(String outputPath) {
304
		this.outputPath = outputPath;
305
	}
306
307
	public String getSqoopReducersCount() {
308
		return sqoopReducersCount;
309
	}
310
311
	public void setSqoopReducersCount(String sqoopReducersCount) {
312
		this.sqoopReducersCount = sqoopReducersCount;
313
	}
314
315
	public FileSystem getHdfs() {
316
		return hdfs;
317
	}
318
319
	public void setHdfs(FileSystem hdfs) {
320
		this.hdfs = hdfs;
321
	}
322
323 28495 eri.katsar
	public boolean isUseHdfsStore() {
324
		return useHdfsStore;
325
	}
326
327
	public void setUseHdfsStore(boolean useHdfsStore) {
328
		this.useHdfsStore = useHdfsStore;
329
	}
330
331 28557 eri.katsar
	public Configuration getConf() {
332
		return conf;
333
	}
334
335
	public void setConf(Configuration conf) {
336
		this.conf = conf;
337
	}
338
339 27955 claudio.at
}