Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;
2

    
3
import java.io.File;
4
import java.io.InputStream;
5
import java.util.ArrayList;
6
import java.util.HashMap;
7
import java.util.Map.Entry;
8
import java.util.Properties;
9

    
10
import org.apache.hadoop.conf.Configuration;
11
import org.apache.hadoop.fs.FileSystem;
12
import org.apache.hadoop.fs.LocatedFileStatus;
13
import org.apache.hadoop.fs.Path;
14
import org.apache.hadoop.fs.RemoteIterator;
15
import org.apache.log4j.Logger;
16
import org.apache.sqoop.Sqoop;
17

    
18
public class SqoopDriver {
19
	private Logger log = Logger.getLogger(this.getClass());
20

    
21
	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";
22

    
23
	private Properties tableMappings = new Properties();
24

    
25
	private String outputPath;
26
	private String dbUser;
27
	private String dbPass;
28
	private String connectionUrl;
29
	// Number of statements in each batch insert op
30
	private String RecsPerStatement;
31
	// Number of statements for each commit ( commit every 1000 inserts, for
32
	// instance). Set to high value to reduce I/O costs.
33
	private String StatementPerTrans;
34
	// Field Seperator ( Delimeter ) used in import HDFS fields.
35
	private String delim;
36
	// Number of reducer classes that Sqoop can use.
37
	private String sqoopReducersCount;
38
	private FileSystem hdfs;
39
	private Configuration conf;
40
	private boolean useHdfsStore = true;
41

    
42
	/**
43
	 * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
44
	 * destination table> pair given in the @tables argument.
45
	 * 
46
	 * Needed parameters ( connection, database name, etc are set in class
47
	 * parameters when an new Sqoop Driver instance is created.
48
	 * 
49
	 * @param tables
50
	 */
51
	public void run(HashMap<String, String> tables) {
52

    
53
		for (Entry<String, String> table : tables.entrySet()) {
54

    
55
			log.error(table);
56

    
57
			String[] str = { "export", "-Dsqoop.export.records.per.statement =" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction = " + StatementPerTrans,
58

    
59
			"--connect", connectionUrl, "--table", table.getKey(),
60

    
61
			"--export-dir ", table.getValue(),
62
					// TODO : important!! this is necessary since MR multiple
63
					// outputs writer appends a \t char after writing the KEY
64
					// field in HDFS.
65
					// use this so sqoop can ignore it
66
					"--optionally-enclosed-by", "	", "--input-fields-terminated-by", delim,
67

    
68
					"--verbose", "--username", dbUser, "--password", dbPass, "--batch", "--mapreduce-job-name", "Sqoop Stats Import", "-m", sqoopReducersCount };
69

    
70
			Sqoop.runTool(str);
71
			cleanUp(table.getKey());
72
		}
73
	}
74

    
75
	public void initSqoopJob() throws Exception {
76
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
77

    
78
		tableMappings.load(file);
79
		file.close();
80
		if (tableMappings == null || tableMappings.isEmpty()) {
81
			throw new Exception("Could not load Table Mappings in sqoop init job");
82
		}
83
		ArrayList<String> fileNames;
84
		if (useHdfsStore) {
85
			fileNames = listHdfsDir();
86
		} else {
87
			fileNames = listFilesystemDir();
88
		}
89

    
90
		HashMap<String, String> tables = new HashMap<String, String>();
91

    
92
		// Table mappings containt the mapping between HDFS files and the Stats
93
		// DB table that each should be imported to
94
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
95
			String name = (String) e.getKey();
96
			for (String filename : fileNames) {
97

    
98
				String split[] = filename.split("-");
99

    
100
				split[0] = split[0].replaceAll(".*/", "");
101

    
102
				if (split[0].equals(name)) {
103

    
104
					tables.put((String) e.getValue(), filename);
105

    
106
				}
107
			}
108
		}
109

    
110
		long startTime = System.currentTimeMillis();
111

    
112
		try {
113
			this.run(tables);
114

    
115
		} catch (Exception e) {
116
			log.error("Error while importing tables in Sqoop: ", e);
117
			throw new Exception("Error while importing tables in Sqoop", e);
118
		}
119
		long endtime = System.currentTimeMillis();
120

    
121
		log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");
122

    
123
	}
124

    
125
	private ArrayList<String> listHdfsDir() throws Exception {
126
		hdfs = FileSystem.get(conf);
127

    
128
		RemoteIterator<LocatedFileStatus> Files;
129

    
130
		try {
131
			Path exportPath = new Path(hdfs.getUri() + outputPath);
132
			Files = hdfs.listFiles(exportPath, false);
133
			if (Files == null) {
134
				log.error("No files generated in " + outputPath + " at " + exportPath.getName());
135
				throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
136

    
137
			}
138
		} catch (Exception e) {
139
			log.error("HDFS file path with exported data does not exist : " + outputPath + " at " +new Path(hdfs.getUri() + outputPath));
140
			throw new Exception("HDFS file path with exported data does not exist :   " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
141

    
142
		}
143
		ArrayList<String> fileNames = new ArrayList<String>();
144

    
145
		while (Files.hasNext()) {
146

    
147
			String fileName = Files.next().getPath().toString();
148

    
149
			log.info(fileName);
150
			fileNames.add(fileName);
151

    
152
		}
153
		return fileNames;
154
	}
155

    
156
	private ArrayList<String> listFilesystemDir() throws Exception {
157
		ArrayList<String> fileNames = new ArrayList<String>();
158

    
159
		String files;
160
		File folder = new File(outputPath);
161
		File[] listOfFiles = folder.listFiles();
162

    
163
		if (listOfFiles == null) {
164
			log.error("No files generated in " + outputPath);
165
			throw new Exception("No files generated in " + outputPath);
166

    
167
		}
168

    
169
		for (int i = 0; i < listOfFiles.length; i++) {
170

    
171
			if (listOfFiles[i].isFile()) {
172
				files = listOfFiles[i].getAbsolutePath();
173
				log.info(files);
174
				fileNames.add(files);
175
				System.out.println(files);
176
			}
177
		}
178

    
179
		return fileNames;
180
	}
181

    
182
	/**
183
	 * Cleans up auto-generated Sqoop class files
184
	 * 
185
	 * @param table
186
	 */
187

    
188
	private void cleanUp(String table) {
189
		try {
190

    
191
			File file = new File(table + ".java");
192

    
193
			if (file.delete()) {
194
				log.info(file.getName() + " is deleted!");
195
			} else {
196
				log.info("Delete operation   failed.");
197
			}
198

    
199
		} catch (Exception e) {
200

    
201
			log.error("Delete operation   failed" + table);
202

    
203
		}
204
	}
205

    
206
	public String getConnectionUrl() {
207
		return connectionUrl;
208
	}
209

    
210
	public void setConnectionUrl(String connectionUrl) {
211
		this.connectionUrl = connectionUrl;
212
	}
213

    
214
	public String getDbUser() {
215
		return dbUser;
216
	}
217

    
218
	public void setDbUser(String dbUser) {
219
		this.dbUser = dbUser;
220
	}
221

    
222
	public String getDbPass() {
223
		return dbPass;
224
	}
225

    
226
	public void setDbPass(String dbPass) {
227
		this.dbPass = dbPass;
228
	}
229

    
230
	public String getDelim() {
231
		return delim;
232
	}
233

    
234
	public void setDelim(String delim) {
235
		this.delim = delim;
236
	}
237

    
238
	public String getReducersCount() {
239
		return sqoopReducersCount;
240
	}
241

    
242
	public void setReducersCount(String reducersCount) {
243
		this.sqoopReducersCount = reducersCount;
244
	}
245

    
246
	public String getRecsPerStatement() {
247
		return RecsPerStatement;
248
	}
249

    
250
	public void setRecsPerStatement(String recsPerStatement) {
251
		RecsPerStatement = recsPerStatement;
252
	}
253

    
254
	public String getStatementPerTrans() {
255
		return StatementPerTrans;
256
	}
257

    
258
	public void setStatementPerTrans(String statementPerTrans) {
259
		StatementPerTrans = statementPerTrans;
260
	}
261

    
262
	public Logger getLog() {
263
		return log;
264
	}
265

    
266
	public void setLog(Logger log) {
267
		this.log = log;
268
	}
269

    
270
	public String getTABLE_MAP_PATH() {
271
		return TABLE_MAP_PATH;
272
	}
273

    
274
	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
275
		TABLE_MAP_PATH = tABLE_MAP_PATH;
276
	}
277

    
278
	public Properties getTableMappings() {
279
		return tableMappings;
280
	}
281

    
282
	public void setTableMappings(Properties tableMappings) {
283
		this.tableMappings = tableMappings;
284
	}
285

    
286
	public String getOutputPath() {
287
		return outputPath;
288
	}
289

    
290
	public void setOutputPath(String outputPath) {
291
		this.outputPath = outputPath;
292
	}
293

    
294
	public String getSqoopReducersCount() {
295
		return sqoopReducersCount;
296
	}
297

    
298
	public void setSqoopReducersCount(String sqoopReducersCount) {
299
		this.sqoopReducersCount = sqoopReducersCount;
300
	}
301

    
302
	public FileSystem getHdfs() {
303
		return hdfs;
304
	}
305

    
306
	public void setHdfs(FileSystem hdfs) {
307
		this.hdfs = hdfs;
308
	}
309

    
310
	public boolean isUseHdfsStore() {
311
		return useHdfsStore;
312
	}
313

    
314
	public void setUseHdfsStore(boolean useHdfsStore) {
315
		this.useHdfsStore = useHdfsStore;
316
	}
317

    
318
	public Configuration getConf() {
319
		return conf;
320
	}
321

    
322
	public void setConf(Configuration conf) {
323
		this.conf = conf;
324
	}
325

    
326
}
(2-2/3)