Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.drivers;
2

    
3
import java.io.ByteArrayInputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.log4j.Logger;
import org.apache.sqoop.Sqoop;
import org.junit.experimental.runners.Enclosed;

import com.google.common.collect.ArrayListMultimap;
import com.google.common.collect.Multimap;
24

    
25
public class SqoopDriver {
26
    private Logger log = Logger.getLogger(this.getClass());
27

    
28
    private Properties tableMappings = new Properties();
29

    
30
    private String outputPath;
31
    private String dbUser;
32
    private String dbPass;
33
    private String connectionUrl;
34
    // Number of statements in each batch insert op
35
    private String RecsPerStatement;
36
    // Number of statements for each commit ( commit every 1000 inserts, for
37
    // instance). Set to high value to reduce I/O costs.
38
    private String StatementPerTrans;
39
    // Field Seperator ( Delimeter ) used in import HDFS fields.
40
    private String delim;
41
    // Number of reducer classes that Sqoop can use.
42
    private String sqoopReducersCount;
43
    private FileSystem hdfs;
44
    private Configuration conf;
45
    private boolean useHdfsStore = true;
46
    private boolean batch = true;
47
    private boolean verbose = true;
48
    private String tableMapConf;
49
    private String enclosed;
50

    
51
    /**
52
     * Driver for the Sqoop tool. Calls the Sqoop Client for each <input file,
53
     * destination table> pair given in the @tables argument.
54
     * <p/>
55
     * Needed parameters ( connection, database name, etc are set in class
56
     * parameters when an new Sqoop Driver instance is created.
57
     *
58
     * @param tables
59
     */
60
    public void run(Multimap<String, String> tables) throws Exception {
61

    
62
        for (Entry<String, String> table : tables.entries()) {
63

    
64
            log.info("Importing " + table);
65

    
66
            String[] str = {"export", "-Dsqoop.export.records.per.statement =" + RecsPerStatement, "-Dsqoop.export.statements.per.transaction = " + StatementPerTrans, "-Dmapreduce.job.reduces = " + sqoopReducersCount,
67

    
68
                    "--connect", connectionUrl, "--table", table.getKey(),
69

    
70
                    "--export-dir", table.getValue(),
71

    
72
                    "--input-fields-terminated-by", delim,
73
                    //			"--input-enclosed-by", enclosed,
74
                    "--optionally-enclosed-by", enclosed,
75

    
76
                    "--verbose", "--username", dbUser, "--password", dbPass, "--driver", "org.postgresql.Driver", "--batch", "--mapreduce-job-name", "Sqoop Stats Import Job for " + table.getKey(), "--m", sqoopReducersCount};
77

    
78
            int ret = Sqoop.runTool(str);
79

    
80
            if (ret != 0) {
81
                log.error("Could not run Sqoop Tool " + Integer.toString(ret));
82
                throw new RuntimeException("Could not run Sqoop Tool " + Integer.toString(ret));
83
            }
84

    
85
            cleanUp(table.getKey());
86
        }
87
    }
88

    
89
    public void initSqoopJob() throws Exception {
90
        loadTableMappings();
91
        ArrayList<String> fileNames;
92
        if (useHdfsStore) {
93
            fileNames = listHdfsDir();
94
        } else {
95
            fileNames = listFilesystemDir();
96
        }
97
        // TODO keep this a Multimap collection since we can have multiple files
98
        // for
99
        // each table
100
        // when using multiple reducers ( each reducer creates an output file )
101
        Multimap<String, String> tables = ArrayListMultimap.create();
102

    
103
        // Table mappings containt the mapping between HDFS files and the Stats
104
        // DB table that each should be imported to
105

    
106
        for (Entry<Object, Object> e : tableMappings.entrySet()) {
107
            String name = (String) e.getKey();
108
            log.info("Found tableMappings   " + name);
109

    
110
            for (String filename : fileNames) {
111

    
112
                String str = filename.substring(filename.lastIndexOf('/') + 1);
113

    
114
                String split[] = str.split("-");
115

    
116
                split[0] = split[0].replaceAll(".*/", "");
117

    
118
                if (split[0].equals(name)) {
119

    
120
                    tables.put((String) e.getValue(), filename);
121
                    log.info("    match   " + e.getValue() + "  " + filename);
122
                }
123
            }
124
        }
125

    
126
        long startTime = System.currentTimeMillis();
127
        log.info("    match   " + tables.entries());
128

    
129
        try {
130
            this.run(tables);
131

    
132
        } catch (Exception e) {
133
            log.error("Error while importing tables in Sqoop: ", e);
134
            throw new Exception("Error while importing tables in Sqoop", e);
135
        }
136
        long endtime = System.currentTimeMillis();
137

    
138
        log.info("Time taken for Sqoop Import : " + (endtime - startTime) / 60000 + " minutes");
139

    
140
    }
141

    
142
    public ArrayList<String> listHdfsDir() throws Exception {
143
        if (conf == null) {
144
            conf = new Configuration();
145
        }
146
        hdfs = FileSystem.get(conf);
147

    
148
        RemoteIterator<LocatedFileStatus> Files;
149

    
150
        try {
151
            Path exportPath = new Path(hdfs.getUri() + outputPath);
152
            Files = hdfs.listFiles(exportPath, false);
153
            if (Files == null) {
154
                log.error("No files generated in " + outputPath + " at " + exportPath.getName());
155
                throw new Exception("No files generated in " + outputPath + " at " + exportPath.getName() + " in " + hdfs.resolvePath(exportPath));
156

    
157
            }
158
        } catch (Exception e) {
159
            log.error("HDFS file path with exported data does not exist : " + outputPath + " at " + new Path(hdfs.getUri() + outputPath));
160
            throw new Exception("HDFS file path with exported data does not exist :   " + outputPath + " at " + new Path(hdfs.getUri() + outputPath), e);
161

    
162
        }
163
        ArrayList<String> fileNames = new ArrayList<String>();
164

    
165
        while (Files.hasNext()) {
166

    
167
            String fileName = Files.next().getPath().toString();
168

    
169
            log.info("Found hdfs file " + fileName);
170
            fileNames.add(fileName);
171

    
172
        }
173
        return fileNames;
174
    }
175

    
176
    public ArrayList<String> listFilesystemDir() throws Exception {
177
        ArrayList<String> fileNames = new ArrayList<String>();
178

    
179
        String files;
180
        File folder = new File(outputPath);
181
        File[] listOfFiles = folder.listFiles();
182

    
183
        if (listOfFiles == null) {
184
            log.error("No files generated in " + outputPath);
185
            throw new Exception("No files generated in " + outputPath);
186

    
187
        }
188

    
189
        for (int i = 0; i < listOfFiles.length; i++) {
190

    
191
            if (listOfFiles[i].isFile()) {
192
                files = listOfFiles[i].getAbsolutePath();
193
                log.info("Found " + files);
194
                fileNames.add(files);
195

    
196
            }
197
        }
198

    
199
        return fileNames;
200
    }
201

    
202
    public void loadTableMappings() throws Exception
203

    
204
    {
205
        tableMappings = new Properties();
206
        if (tableMapConf == null) {
207
            log.error("NULL TABLE MAP CONFIG  IN MAPPER  : ");
208
        }
209

    
210
        tableMapConf = tableMapConf.replaceAll(",", "\n");
211

    
212
        InputStream stream = new ByteArrayInputStream(tableMapConf.getBytes());
213

    
214
        tableMappings.load(stream);
215
        stream.close();
216
        log.info("tm" + tableMappings.entrySet());
217
        if (tableMappings == null || tableMappings.isEmpty()) {
218
            throw new Exception("Could not load Table Mconfappings in sqoop init job");
219
        }
220

    
221
    }
222

    
223
    /**
224
     * Cleans up auto-generated Sqoop class files
225
     *
226
     * @param table
227
     */
228

    
229
    private void cleanUp(String table) {
230
        try {
231

    
232
            File file = new File(table + ".java");
233

    
234
            if (file.delete()) {
235
                log.info(file.getName() + " is deleted!");
236
            } else {
237
                log.info("Delete operation   failed.");
238
            }
239

    
240
        } catch (Exception e) {
241

    
242
            log.error("Delete operation   failed" + table);
243

    
244
        }
245
    }
246

    
247
    public Logger getLog() {
248
        return log;
249
    }
250

    
251
    public void setLog(Logger log) {
252
        this.log = log;
253
    }
254

    
255
    public Properties getTableMappings() {
256
        return tableMappings;
257
    }
258

    
259
    public void setTableMappings(Properties tableMappings) {
260
        this.tableMappings = tableMappings;
261
    }
262

    
263
    public String getOutputPath() {
264
        return outputPath;
265
    }
266

    
267
    public void setOutputPath(String outputPath) {
268
        this.outputPath = outputPath;
269
    }
270

    
271
    public String getDbUser() {
272
        return dbUser;
273
    }
274

    
275
    public void setDbUser(String dbUser) {
276
        this.dbUser = dbUser;
277
    }
278

    
279
    public String getDbPass() {
280
        return dbPass;
281
    }
282

    
283
    public void setDbPass(String dbPass) {
284
        this.dbPass = dbPass;
285
    }
286

    
287
    public String getConnectionUrl() {
288
        return connectionUrl;
289
    }
290

    
291
    public void setConnectionUrl(String connectionUrl) {
292
        this.connectionUrl = connectionUrl;
293
    }
294

    
295
    public String getRecsPerStatement() {
296
        return RecsPerStatement;
297
    }
298

    
299
    public void setRecsPerStatement(String recsPerStatement) {
300
        RecsPerStatement = recsPerStatement;
301
    }
302

    
303
    public String getStatementPerTrans() {
304
        return StatementPerTrans;
305
    }
306

    
307
    public void setStatementPerTrans(String statementPerTrans) {
308
        StatementPerTrans = statementPerTrans;
309
    }
310

    
311
    public String getDelim() {
312
        return delim;
313
    }
314

    
315
    public void setDelim(String delim) {
316
        this.delim = delim;
317
    }
318

    
319
    public String getSqoopReducersCount() {
320
        return sqoopReducersCount;
321
    }
322

    
323
    public void setSqoopReducersCount(String sqoopReducersCount) {
324
        this.sqoopReducersCount = sqoopReducersCount;
325
    }
326

    
327
    public FileSystem getHdfs() {
328
        return hdfs;
329
    }
330

    
331
    public void setHdfs(FileSystem hdfs) {
332
        this.hdfs = hdfs;
333
    }
334

    
335
    public Configuration getConf() {
336
        return conf;
337
    }
338

    
339
    public void setConf(Configuration conf) {
340
        this.conf = conf;
341
    }
342

    
343
    public boolean isUseHdfsStore() {
344
        return useHdfsStore;
345
    }
346

    
347
    public void setUseHdfsStore(boolean useHdfsStore) {
348
        this.useHdfsStore = useHdfsStore;
349
    }
350

    
351
    public boolean isBatch() {
352
        return batch;
353
    }
354

    
355
    public void setBatch(boolean batch) {
356
        this.batch = batch;
357
    }
358

    
359
    public boolean isVerbose() {
360
        return verbose;
361
    }
362

    
363
    public void setVerbose(boolean verbose) {
364
        this.verbose = verbose;
365
    }
366

    
367
    public String getTableMapConf() {
368
        return tableMapConf;
369
    }
370

    
371
    public void setTableMapConf(String tableMapConf) {
372
        this.tableMapConf = tableMapConf;
373
    }
374

    
375
    public String getEnclosed() {
376
        return enclosed;
377
    }
378

    
379
    public void setEnclosed(String enclosed) {
380
        this.enclosed = enclosed;
381
    }
382

    
383
}
(2-2/2)