Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;
2

    
3
import java.io.File;
4
import java.io.FileNotFoundException;
5
import java.io.IOException;
6
import java.io.InputStream;
7
import java.util.ArrayList;
8
import java.util.HashMap;
9
import java.util.Map.Entry;
10
import java.util.Properties;
11

    
12
import org.apache.hadoop.conf.Configuration;
13
import org.apache.hadoop.fs.FileSystem;
14
import org.apache.hadoop.fs.LocatedFileStatus;
15
import org.apache.hadoop.fs.Path;
16
import org.apache.hadoop.fs.RemoteIterator;
17
import org.apache.log4j.Logger;
18
import org.apache.sqoop.Sqoop;
19

    
20
public class SqoopDriver {
21
	private Logger log = Logger.getLogger(this.getClass());
22

    
23
	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";
24

    
25
	private Properties tableMappings = new Properties();
26

    
27
	private String dbName;
28
	private String outputPath;
29
	private String dbUser;
30
	private String dbPass;
31
	private String connectionUrl;
32
	// Number of statements in each batch insert op
33
	private String RecsPerStatement;
34
	// Number of statements for each commit ( commit every 1000 inserts, for
35
	// instance). Set to high value to reduce I/O costs.
36
	private String StatementPerTrans;
37
	// Field Seperator ( Delimeter ) used in import HDFS fields.
38
	private String delim;
39
	// Number of reducer classes that Sqoop can use.
40
	private String sqoopReducersCount;
41
	private FileSystem hdfs;
42

    
43
	private boolean useHdfsStore = false;
44

    
45
	/**
46
	 * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
47
	 * destination table> pair given in the @tables argument.
48
	 * 
49
	 * Needed parameters ( connection, database name, etc are set in class
50
	 * parameters when an new Sqoop Driver instance is created.
51
	 * 
52
	 * @param tables
53
	 */
54
	public void run(HashMap<String, String> tables) {
55

    
56
		for (Entry<String, String> table : tables.entrySet()) {
57

    
58
			log.error(table);
59

    
60
			String[] str = { "export", "-Dsqoop.export.records.per.statement =" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction = " + StatementPerTrans,
61

    
62
			"--connect", connectionUrl, "--table", table.getKey(),
63

    
64
			"--export-dir", table.getValue(),
65
					// TODO : important!! this is necessary since MR multiple
66
					// outputs writer appends a \t char after writing the KEY
67
					// field in HDFS.
68
					// use this so sqoop can ignore it
69
					"--optionally-enclosed-by", "	", "--input-fields-terminated-by", delim,
70

    
71
					"--verbose", "--username", dbUser, "--password", dbPass, "--batch", "--mapreduce-job-name", "Sqoop Stats Import", "-m", sqoopReducersCount };
72

    
73
			Sqoop.runTool(str);
74
			cleanUp(table.getKey());
75
		}
76
	}
77

    
78
	public void initSqoopJob() throws Exception {
79
		InputStream file = ClassLoader.getSystemResourceAsStream(TABLE_MAP_PATH);
80

    
81
		tableMappings.load(file);
82
		file.close();
83
		if (tableMappings == null || tableMappings.isEmpty()) {
84
			throw new Exception("Could not load Table Mappings in sqoop init job");
85
		}
86
		ArrayList<String> fileNames;
87
		if (useHdfsStore) {
88
			fileNames = listHdfsDir();
89
		} else {
90
			fileNames = listFilesystemDir();
91
		}
92

    
93
		HashMap<String, String> tables = new HashMap<String, String>();
94

    
95
		// Table mappings containt the mapping between HDFS files and the Stats
96
		// DB table that each should be imported to
97
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
98
			String name = (String) e.getKey();
99
			for (String filename : fileNames) {
100

    
101
				String split[] = filename.split("-");
102

    
103
				split[0] = split[0].replaceAll(".*/", "");
104

    
105
				if (split[0].equals(name)) {
106

    
107
					tables.put((String) e.getValue(), filename);
108

    
109
				}
110
			}
111
		}
112

    
113
		long startTime = System.currentTimeMillis();
114

    
115
		try {
116
			this.run(tables);
117

    
118
		} catch (Exception e) {
119
			log.error("Error while importing tables in Sqoop: ", e);
120
			throw new Exception("Error while importing tables in Sqoop", e);
121
		}
122
		long endtime = System.currentTimeMillis();
123

    
124
		log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");
125

    
126
	}
127

    
128
	private ArrayList<String> listHdfsDir() throws Exception {
129
		hdfs = FileSystem.get(new Configuration());
130
		RemoteIterator<LocatedFileStatus> files;
131

    
132
		Path exportPath = new Path(hdfs.getUri() + outputPath);
133
		try {
134
			files = hdfs.listFiles(exportPath, false);
135

    
136
		} catch (FileNotFoundException e1) {
137
			log.error("HDFS file path with exported data does not exist : " + outputPath);
138
			throw new Exception("HDFS file path with exported data does not exist :   " + outputPath, e1);
139

    
140
		} catch (IOException e1) {
141
			log.error("HDFS file path with exported data does not exist :" + outputPath);
142
			throw new Exception("HDFS file path with exported data does not exist " + outputPath, e1);
143

    
144
		}
145
		ArrayList<String> fileNames = new ArrayList<String>();
146

    
147
		while (files.hasNext()) {
148

    
149
			String fileName = files.next().getPath().toString();
150

    
151
			log.info(fileName);
152
			fileNames.add(fileName);
153

    
154
		}
155
		return fileNames;
156
	}
157

    
158
	private ArrayList<String> listFilesystemDir() throws Exception {
159
		ArrayList<String> fileNames = new ArrayList<String>();
160

    
161
		String files;
162
		File folder = new File(outputPath);
163
		File[] listOfFiles = folder.listFiles();
164

    
165
		for (int i = 0; i < listOfFiles.length; i++) {
166

    
167
			if (listOfFiles[i].isFile()) {
168
				files = listOfFiles[i].getName();
169
				log.info(files);
170
				fileNames.add(files);
171
				System.out.println(files);
172
			}
173
		}
174

    
175
		return fileNames;
176
	}
177

    
178
	/**
179
	 * Cleans up auto-generated Sqoop class files
180
	 * 
181
	 * @param table
182
	 */
183

    
184
	private void cleanUp(String table) {
185
		try {
186

    
187
			File file = new File(table + ".java");
188

    
189
			if (file.delete()) {
190
				log.info(file.getName() + " is deleted!");
191
			} else {
192
				log.info("Delete operation   failed.");
193
			}
194

    
195
		} catch (Exception e) {
196

    
197
			log.error("Delete operation   failed" + table);
198

    
199
		}
200
	}
201

    
202
	public String getConnectionUrl() {
203
		return connectionUrl;
204
	}
205

    
206
	public void setConnectionUrl(String connectionUrl) {
207
		this.connectionUrl = connectionUrl;
208
	}
209

    
210
	public String getDbUser() {
211
		return dbUser;
212
	}
213

    
214
	public void setDbUser(String dbUser) {
215
		this.dbUser = dbUser;
216
	}
217

    
218
	public String getDbPass() {
219
		return dbPass;
220
	}
221

    
222
	public void setDbPass(String dbPass) {
223
		this.dbPass = dbPass;
224
	}
225

    
226
	public String getDelim() {
227
		return delim;
228
	}
229

    
230
	public void setDelim(String delim) {
231
		this.delim = delim;
232
	}
233

    
234
	public String getReducersCount() {
235
		return sqoopReducersCount;
236
	}
237

    
238
	public void setReducersCount(String reducersCount) {
239
		this.sqoopReducersCount = reducersCount;
240
	}
241

    
242
	public String getRecsPerStatement() {
243
		return RecsPerStatement;
244
	}
245

    
246
	public void setRecsPerStatement(String recsPerStatement) {
247
		RecsPerStatement = recsPerStatement;
248
	}
249

    
250
	public String getStatementPerTrans() {
251
		return StatementPerTrans;
252
	}
253

    
254
	public void setStatementPerTrans(String statementPerTrans) {
255
		StatementPerTrans = statementPerTrans;
256
	}
257

    
258
	public Logger getLog() {
259
		return log;
260
	}
261

    
262
	public void setLog(Logger log) {
263
		this.log = log;
264
	}
265

    
266
	public String getTABLE_MAP_PATH() {
267
		return TABLE_MAP_PATH;
268
	}
269

    
270
	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
271
		TABLE_MAP_PATH = tABLE_MAP_PATH;
272
	}
273

    
274
	public Properties getTableMappings() {
275
		return tableMappings;
276
	}
277

    
278
	public void setTableMappings(Properties tableMappings) {
279
		this.tableMappings = tableMappings;
280
	}
281

    
282
	public String getDbName() {
283
		return dbName;
284
	}
285

    
286
	public void setDbName(String dbName) {
287
		this.dbName = dbName;
288
	}
289

    
290
	public String getOutputPath() {
291
		return outputPath;
292
	}
293

    
294
	public void setOutputPath(String outputPath) {
295
		this.outputPath = outputPath;
296
	}
297

    
298
	public String getSqoopReducersCount() {
299
		return sqoopReducersCount;
300
	}
301

    
302
	public void setSqoopReducersCount(String sqoopReducersCount) {
303
		this.sqoopReducersCount = sqoopReducersCount;
304
	}
305

    
306
	public FileSystem getHdfs() {
307
		return hdfs;
308
	}
309

    
310
	public void setHdfs(FileSystem hdfs) {
311
		this.hdfs = hdfs;
312
	}
313

    
314
	public boolean isUseHdfsStore() {
315
		return useHdfsStore;
316
	}
317

    
318
	public void setUseHdfsStore(boolean useHdfsStore) {
319
		this.useHdfsStore = useHdfsStore;
320
	}
321

    
322
}
(2-2/3)