Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;
2

    
3
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
import org.junit.Before;
import org.junit.Test;

import eu.dnetlib.data.mapreduce.hbase.statsExport.daos.SqlDAO;
import eu.dnetlib.data.mapreduce.hbase.statsExport.daos.SqlStore;
24

    
25
public class SqoopDriver {
26
	private Logger log = Logger.getLogger(this.getClass());
27

    
28
	private String TABLE_MAP_PATH = "eu/dnetlib/data/mapreduce/hbase/statsExport/exportTables";
29

    
30
	private Properties tableMappings = new Properties();
31

    
32
	private SqlDAO statsDao;
33
	private SqlStore statsStore;
34

    
35
	private Properties props;
36

    
37
	private String dbName;
38
	private String outputPath;
39
	private String dbUser;
40

    
41
	private String dbPass;
42

    
43
	private String connectionUrl;
44
	// Number of statements in each batch insert op
45
	private String RecsPerStatement;
46
	// Number of statements for each commit ( commit every 1000 inserts, for
47
	// instance). Set to high value to reduce I/O costs.
48
	private String StatementPerTrans;
49
	// Field Seperator ( Delimeter ) used in import HDFS fiels.
50
	private String delim;
51
	// Number of reducer classes that Sqoop can use.
52
	private String sqoopReducersCount;
53

    
54
	private FileSystem hdfs;
55

    
56
	public void init() throws IOException {
57
		InputStream file = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/StatsProperties");
58
		props = new Properties();
59
		props.load(file);
60
		file.close();
61

    
62
		this.delim = props.getProperty("Stats.delimCharacter");
63

    
64
		statsStore = new SqlStore();
65

    
66
		statsStore.setDbName(props.getProperty("Stats.dbName"));
67

    
68
		statsStore.setDbDriver(props.getProperty("Stats.dbDriver"));
69

    
70
		statsStore.setDB_URL(props.getProperty("Stats.dbUrl"));
71
		statsStore.setDbUser(props.getProperty("Stats.dbUser"));
72
		statsStore.setDbPassword(props.getProperty("Stats.dbPass"));
73

    
74
		statsDao = new SqlDAO();
75

    
76
		statsDao.setStore(statsStore);
77
		this.outputPath = props.getProperty("Stats.outputPath");
78
		this.setDbName(props.getProperty("Stats.dbName"));
79
		this.setConnectionUrl(props.getProperty("Stats.dbUrl") + "/" + props.getProperty("Stats.dbName"));
80
		this.setDbUser(props.getProperty("Stats.dbUser"));
81
		this.setDbPass(props.getProperty("Stats.dbPass"));
82
		this.setReducersCount(props.getProperty("Stats.sqoopReducersCount"));
83
		this.setRecsPerStatement(props.getProperty("Stats.sqoopRecsPerStatement"));
84
		this.setStatementPerTrans(props.getProperty("Stats.sqoopStatementPerTrans"));
85
		hdfs = FileSystem.get(new Configuration());
86
	}
87

    
88
	/**
89
	 * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
90
	 * destination table> pair given in the @tables argument.
91
	 * 
92
	 * Needed parameters ( connection, database name, etc are set in class
93
	 * parameters when an new Sqoop Driver instance is created.
94
	 * 
95
	 * @param tables
96
	 */
97
	public void run(HashMap<String, String> tables) {
98

    
99
		for (Entry<String, String> table : tables.entrySet()) {
100

    
101
			String[] str = { "export", "-Dsqoop.export.records.per.statement =" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction = " + StatementPerTrans,
102

    
103
			"--connect", connectionUrl, "--table", table.getKey(),
104

    
105
			"--export-dir", table.getValue(),
106
					// TODO : important!! this is necessary since MR multiple
107
					// outputs writer appends a \t char after writing the KEY
108
					// field in HDFS.
109
					// use this so sqoop can ignore it
110
					"--optionally-enclosed-by", "	", "--input-fields-terminated-by", delim,
111

    
112
					"--verbose", "--username", dbUser, "--password", dbPass, "--batch", "--mapreduce-job-name", "Sqoop Stats Import", "-m", sqoopReducersCount };
113

    
114
			Sqoop.runTool(str);
115
			cleanUp(table.getKey());
116
		}
117
	}
118

    
119
	public void initSqoopJob() throws Exception {
120

    
121
		init(); 
122
		// PREPARE THE STATS BACKEND ( SHADOW SCHEMA FOR IMPORT)
123

    
124
		prepareDB();
125

    
126
		RemoteIterator<LocatedFileStatus> files;
127

    
128
		try {
129
			files = hdfs.listFiles(new Path(outputPath), false);
130
		} catch (FileNotFoundException e1) {
131
			log.error("HDFS file path with exported data does not exist");
132
			throw new Exception("HDFS file path with exported data does not exist", e1);
133

    
134
		} catch (IOException e1) {
135
			log.error("HDFS file path with exported data does not exist");
136
			throw new Exception("HDFS file path with exported data does not exist", e1);
137

    
138
		}
139
		HashMap<String, String> tables = new HashMap<String, String>();
140
		ArrayList<String> fileNames = new ArrayList<String>();
141

    
142
		while (files.hasNext()) {
143

    
144
			String fileName = files.next().getPath().toString();
145

    
146
			fileNames.add(fileName);
147

    
148
		}
149
		// Table mappings containt the mapping between HDFS files and the Stats
150
		// DB table that each should be imported to
151
		for (Entry<Object, Object> e : tableMappings.entrySet()) {
152
			String name = (String) e.getKey();
153
			for (String filename : fileNames) {
154

    
155
				String split[] = filename.split("-");
156

    
157
				split[0] = split[0].replaceAll(".*/", "");
158

    
159
				if (split[0].equals(name)) {
160

    
161
					tables.put((String) e.getValue(), filename);
162

    
163
				}
164
			}
165
		}
166

    
167
		long startTime = System.currentTimeMillis();
168

    
169
		try {
170
			this.run(tables);
171

    
172
		} catch (Exception e) {
173
			log.error("Error while importing tables in Sqoop: ", e);
174
			throw new Exception("Error while importing tables in Sqoop", e);
175
		}
176
		long endtime = System.currentTimeMillis();
177

    
178
		log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");
179

    
180
		finalizedDB();
181
	}
182

    
183
	/**
184
	 * Prepares shadow schema in the database that will be used to import data
185
	 * 
186
	 * @throws Exception
187
	 */
188

    
189
	private void prepareDB() throws Exception {
190
		try {
191

    
192
			statsDao.createSchema(dbName);
193

    
194
		} catch (Exception e) {
195
			log.error("Error Preparing Schema for DB import. ", e);
196
			throw new Exception("Error Preparing Schema for DB import.", e);
197
		}
198
	}
199

    
200
	private void finalizedDB() throws Exception {
201

    
202
		try {
203
			statsDao.insertContexts();
204
			statsDao.buildViews();
205
			statsDao.executeExtraInserts();
206
			// Leave indexes out for testing phase
207
			// statsDao.buildIndexes();
208

    
209
		} catch (Exception e) {
210
			log.error("Error while finalizing SQL DB", e);
211
			throw new Exception("Error while finalizing SQL DB", e);
212
		}
213
	}
214

    
215
	/**
216
	 * Cleans up auto-generated Sqoop class files
217
	 * 
218
	 * @param table
219
	 */
220

    
221
	private void cleanUp(String table) {
222
		try {
223

    
224
			File file = new File(table + ".java");
225

    
226
			if (file.delete()) {
227
				log.info(file.getName() + " is deleted!");
228
			} else {
229
				log.info("Delete operation   failed.");
230
			}
231

    
232
		} catch (Exception e) {
233

    
234
			log.error("Delete operation   failed" + table);
235

    
236
		}
237
	}
238

    
239
	public String getConnectionUrl() {
240
		return connectionUrl;
241
	}
242

    
243
	public void setConnectionUrl(String connectionUrl) {
244
		this.connectionUrl = connectionUrl;
245
	}
246

    
247
	public String getDbUser() {
248
		return dbUser;
249
	}
250

    
251
	public void setDbUser(String dbUser) {
252
		this.dbUser = dbUser;
253
	}
254

    
255
	public String getDbPass() {
256
		return dbPass;
257
	}
258

    
259
	public void setDbPass(String dbPass) {
260
		this.dbPass = dbPass;
261
	}
262

    
263
	public String getDelim() {
264
		return delim;
265
	}
266

    
267
	public void setDelim(String delim) {
268
		this.delim = delim;
269
	}
270

    
271
	public String getReducersCount() {
272
		return sqoopReducersCount;
273
	}
274

    
275
	public void setReducersCount(String reducersCount) {
276
		this.sqoopReducersCount = reducersCount;
277
	}
278

    
279
	public String getRecsPerStatement() {
280
		return RecsPerStatement;
281
	}
282

    
283
	public void setRecsPerStatement(String recsPerStatement) {
284
		RecsPerStatement = recsPerStatement;
285
	}
286

    
287
	public String getStatementPerTrans() {
288
		return StatementPerTrans;
289
	}
290

    
291
	public void setStatementPerTrans(String statementPerTrans) {
292
		StatementPerTrans = statementPerTrans;
293
	}
294

    
295
	public Logger getLog() {
296
		return log;
297
	}
298

    
299
	public void setLog(Logger log) {
300
		this.log = log;
301
	}
302

    
303
	public String getTABLE_MAP_PATH() {
304
		return TABLE_MAP_PATH;
305
	}
306

    
307
	public void setTABLE_MAP_PATH(String tABLE_MAP_PATH) {
308
		TABLE_MAP_PATH = tABLE_MAP_PATH;
309
	}
310

    
311
	public Properties getTableMappings() {
312
		return tableMappings;
313
	}
314

    
315
	public void setTableMappings(Properties tableMappings) {
316
		this.tableMappings = tableMappings;
317
	}
318

    
319
	public SqlDAO getStatsDao() {
320
		return statsDao;
321
	}
322

    
323
	public void setStatsDao(SqlDAO statsDao) {
324
		this.statsDao = statsDao;
325
	}
326

    
327
	public SqlStore getStatsStore() {
328
		return statsStore;
329
	}
330

    
331
	public void setStatsStore(SqlStore statsStore) {
332
		this.statsStore = statsStore;
333
	}
334

    
335
	public Properties getProps() {
336
		return props;
337
	}
338

    
339
	public void setProps(Properties props) {
340
		this.props = props;
341
	}
342

    
343
	public String getDbName() {
344
		return dbName;
345
	}
346

    
347
	public void setDbName(String dbName) {
348
		this.dbName = dbName;
349
	}
350

    
351
	public String getOutputPath() {
352
		return outputPath;
353
	}
354

    
355
	public void setOutputPath(String outputPath) {
356
		this.outputPath = outputPath;
357
	}
358

    
359
	public String getSqoopReducersCount() {
360
		return sqoopReducersCount;
361
	}
362

    
363
	public void setSqoopReducersCount(String sqoopReducersCount) {
364
		this.sqoopReducersCount = sqoopReducersCount;
365
	}
366

    
367
	public FileSystem getHdfs() {
368
		return hdfs;
369
	}
370

    
371
	public void setHdfs(FileSystem hdfs) {
372
		this.hdfs = hdfs;
373
	}
374

    
375
}
(1-1/2)