Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.daos;
2

    
3
import org.apache.hadoop.conf.Configuration;
4
import org.apache.hadoop.fs.FSDataInputStream;
5
import org.apache.hadoop.fs.FileSystem;
6
import org.apache.hadoop.fs.Path;
7
import org.apache.log4j.Logger;
8
import org.junit.Test;
9
import org.postgresql.copy.CopyManager;
10
import org.postgresql.core.BaseConnection;
11
import org.springframework.jdbc.core.JdbcTemplate;
12
import org.springframework.jdbc.core.simple.SimpleJdbcCall;
13
import org.springframework.jdbc.datasource.DriverManagerDataSource;
14

    
15
import java.io.FileInputStream;
16
import java.io.InputStream;
17
import java.sql.CallableStatement;
18
import java.sql.Connection;
19
import java.sql.ResultSet;
20
import java.sql.ResultSetMetaData;
21

    
22
/**
23
 * @author eri
24
 *         <p/>
25
 *         DB DAO with methods for creating the temporary shadow schema of the
26
 *         statsDB, executing extra inserts, creating views and indexes .
27
 */
28
public class StatsDAO {
29
    private org.springframework.jdbc.datasource.DriverManagerDataSource statsDatasource;
30

    
31
    private Logger log = Logger.getLogger(this.getClass());
32
    private long startTime;
33
    private long endtime;
34

    
35
    public StatsDAO(String dbUrl, String dbUser, String dbPassword, String dbDriver) {
36
        statsDatasource = new DriverManagerDataSource(dbUrl, dbUser, dbPassword);
37
        statsDatasource.setDriverClassName(dbDriver);
38
    }
39

    
40
    /**
41
     * Reads create schema script from a file and executes it. Used to create a
42
     * temporary Shadow schema for data import
43
     *
44
     * @throws Exception
45
     */
46

    
47
    public void createSchema() throws Exception {
48

    
49
        InputStream in = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/stats_db_schema.sql");
50
        byte[] b = new byte[in.available()];
51
        in.read(b);
52
        String q = new String(b);
53

    
54
        in.close();
55
        Connection con = null;
56
        log.info(" Creating Shadow Schema Database   ....");
57

    
58
        try {
59

    
60
            con = statsDatasource.getConnection();
61
            CallableStatement st = con.prepareCall(q);
62
            st.execute();
63
            st.close();
64

    
65
        } catch (Exception e) {
66
            log.error("Could not   Create Tables in  Temporary database  :" + e.getMessage());
67
            throw new Exception("Could not Create Tables in Temporary database  :", e);
68
        } finally {
69
            con.close();
70
        }
71

    
72
        log.info("Shadow Schema created.");
73
    }
74

    
75

    
76
    /**
77
     * Sets the default schema (search path ) for the sqoop user.
78
     *
79
     * @param user
80
     * @throws Exception
81
     */
82
    public void setSearchPathDB(String user) throws Exception {
83

    
84

    
85
        String q = "ALTER ROLE " + user + " SET search_path TO shadow,public; ";
86
        Connection con = null;
87
        try {
88
            con = statsDatasource.getConnection();
89

    
90
            CallableStatement st = con.prepareCall(q);
91
            st.execute();
92
            st.close();
93
        } catch (Exception e) {
94
            log.error("Could not change default schema for user :" + e.getMessage());
95
            throw new Exception("Could not change default schema for user ", e);
96
        } finally {
97
            con.close();
98
        }
99

    
100
        log.info(" Default user schema set to shadow");
101
    }
102

    
103

    
104
    /**
105
     * Keeps a backup of the current public schema and then replaces it with the new  Shadow one.
106
     *
107
     * @throws Exception
108
     */
109
    public void renameSchema() throws Exception {
110
        Connection con = statsDatasource.getConnection();
111
        String q = "DROP SCHEMA IF EXISTS backup CASCADE;" +
112
                "DROP SCHEMA IF EXISTS public CASCADE;" +
113
                " ALTER SCHEMA shadow RENAME TO public; ";
114

    
115
        try {
116
            CallableStatement st = con.prepareCall(q);
117
            log.info("Renaming  Shadow  schema to  Public,,,");
118
            st.execute();
119
            st.close();
120
            con.close();
121

    
122
            log.info("Renaming done");
123

    
124
        } catch (Exception e) {
125
            log.error("Could not rename schema " + e.getMessage());
126
            throw new Exception("Could not rename schema  ", e);
127
        } finally {
128
            con.close();
129
        }
130
        log.info("Schema renamed");
131
    }
132

    
133
    public void cleanTables() throws Exception {
134
        log.info(" Cleaning Tables...");
135
        Connection con = statsDatasource.getConnection();
136
        startTime = System.currentTimeMillis();
137
        String q = "CREATE TABLE \"shadow\".rd_distinct AS SELECT DISTINCT * FROM \"shadow\".result_datasources;";
138
        CallableStatement st = con.prepareCall(q);
139
        st.execute();
140
        st.close();
141

    
142
        q = "TRUNCATE \"shadow\".result_datasources;";
143
        st = con.prepareCall(q);
144
        st.execute();
145
        st.close();
146

    
147
        q = "INSERT INTO \"shadow\".result_datasources SELECT * FROM \"shadow\".rd_distinct;";
148
        st = con.prepareCall(q);
149
        st.execute();
150
        st.close();
151

    
152
        q = "DROP TABLE \"shadow\".rd_distinct;";
153
        st = con.prepareCall(q);
154
        st.execute();
155
        st.close();
156

    
157
        con.close();
158
        endtime = System.currentTimeMillis();
159
        log.info("Time to clean tables : " + ((endtime - startTime) / 60000) + " minutes ");
160
    }
161

    
162
    /**
163
     * Create charts.
164
     *
165
     * @throws Exception
166
     */
167
    public void createCharts() throws Exception {
168
        log.info(" Creating Chart Tables...");
169
        Connection con = statsDatasource.getConnection();
170

    
171
        startTime = System.currentTimeMillis();
172
        String q = "{call create_charts()}";
173

    
174
        CallableStatement st = con.prepareCall(q);
175
        st.execute();
176

    
177
        st.close();
178
        con.close();
179

    
180
        endtime = System.currentTimeMillis();
181
        log.info("Time to create chart tables: " + ((endtime - startTime) / 60000) + " minutes ");
182
    }
183

    
184
    /**
185
     * Create chart indexes.
186
     *
187
     * @throws Exception
188
     */
189
    public void createChartIndexes() throws Exception {
190
        log.info(" Create Chart Indexes...");
191
        Connection con = statsDatasource.getConnection();
192

    
193
        startTime = System.currentTimeMillis();
194
        String q = "{call create_chart_indexes()}";
195

    
196
        CallableStatement st = con.prepareCall(q);
197
        st.execute();
198

    
199
        st.close();
200
        con.close();
201

    
202
        endtime = System.currentTimeMillis();
203
        log.info("Time to create chart indexes : " + ((endtime - startTime) / 60000) + " minutes ");
204
    }
205

    
206
    /**
207
     * Builds indexes.
208
     *
209
     * @throws Exception
210
     */
211
    public void buildIndexes() throws Exception {
212
        log.info(" Building Database Indexes...");
213
        Connection con = statsDatasource.getConnection();
214
        startTime = System.currentTimeMillis();
215
        String q = "{call create_indexes()}";
216
        CallableStatement st = con.prepareCall(q);
217
        st.execute();
218
        st.close();
219
        con.close();
220
        endtime = System.currentTimeMillis();
221
        log.info("Time to build indexes : " + ((endtime - startTime) / 60000) + " minutes ");
222
    }
223

    
224
    /**
225
     * Builds views.
226
     *
227
     * @throws Exception
228
     */
229
    public void buildViews() throws Exception {
230
        log.info(" Building Database Views...");
231
        Connection con = statsDatasource.getConnection();
232

    
233
        startTime = System.currentTimeMillis();
234
        String q = "{call create_views()}";
235

    
236
        CallableStatement st = con.prepareCall(q);
237
        st.execute();
238

    
239
        st.close();
240
        con.close();
241

    
242
        endtime = System.currentTimeMillis();
243
        log.info("Time to build Views : " + ((endtime - startTime) / 60000) + " minutes ");
244
    }
245

    
246
    /**
247
     * Executes extra inserts ( table updates )
248
     *
249
     * @throws Exception
250
     */
251
    public void executeExtraInserts() throws Exception {
252
        log.info("  Executing Extra Inserts...");
253

    
254
        startTime = System.currentTimeMillis();
255

    
256
        populateDefaults();
257
        update_project_results();
258
        //update_project_has_pubs();
259
        //update_project_pubs_count();
260
        //update_project_delated_pubs();
261
        //update_project_daysforlastpub();
262
        //update_project_delayed_pubs();
263
        cleanUp();
264

    
265
        endtime = System.currentTimeMillis();
266
        log.info("Total time for Extra Inserts : " + ((endtime - startTime) / 60000) + " minutes ");
267
    }
268

    
269

    
270
    public void populateDefaults() throws Exception {
271
        startTime = System.currentTimeMillis();
272
        log.info("  Populating defaults table and Updating Datasources...");
273

    
274
        Connection con = statsDatasource.getConnection();
275

    
276
        startTime = System.currentTimeMillis();
277
        String q = "{call extra_defaults_datasource()}";
278

    
279
        CallableStatement st = con.prepareCall(q);
280
        st.execute();
281

    
282
        st.close();
283
        con.close();
284
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
285
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
286
        //new SimpleJdbcCall(statsDatasource).withProcedureName("extra_defaults_datasource");
287
        endtime = System.currentTimeMillis();
288
        log.info("Time  for   Populating defaults : " + ((endtime - startTime) / 60000) + " minutes ");
289
    }
290

    
291

    
292
    public void update_project_results() throws Exception {
293
        startTime = System.currentTimeMillis();
294
        log.info("   Updating Project Results...");
295

    
296
        Connection con = statsDatasource.getConnection();
297

    
298
        startTime = System.currentTimeMillis();
299
        String q = "{call update_project_results()}";
300

    
301
        CallableStatement st = con.prepareCall(q);
302
        st.execute();
303

    
304
        st.close();
305
        con.close();
306
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
307
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
308
        //new SimpleJdbcCall(statsDatasource).withProcedureName("update_project_results");
309
        log.info("Done.");
310
        endtime = System.currentTimeMillis();
311
        log.info("Time  for  Updating Project Results: " + ((endtime - startTime) / 60000) + " minutes ");
312
    }
313

    
314

    
315
    public void update_project_has_pubs() throws Exception {
316

    
317
        startTime = System.currentTimeMillis();
318
        log.info("   Updating Project with Publications..");
319
        Connection con = statsDatasource.getConnection();
320

    
321
        startTime = System.currentTimeMillis();
322
        String q = "{call project_has_pubs()}";
323

    
324
        CallableStatement st = con.prepareCall(q);
325
        st.execute();
326

    
327
        st.close();
328
        con.close();
329
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
330
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
331
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_has_pubs");
332
        log.info("Done.");
333
        endtime = System.currentTimeMillis();
334
        log.info("Time  for Updating Project with Publications : " + ((endtime - startTime) / 60000) + " minutes ");
335
    }
336

    
337
    public void update_project_pubs_count() throws Exception {
338

    
339
        startTime = System.currentTimeMillis();
340
        log.info("   Updating Project  Publications Count..");
341
        Connection con = statsDatasource.getConnection();
342

    
343
        startTime = System.currentTimeMillis();
344
        String q = "{call project_pubs_count()}";
345

    
346
        CallableStatement st = con.prepareCall(q);
347
        st.execute();
348

    
349
        st.close();
350
        con.close();
351
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
352
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
353
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_pubs_count");
354
        log.info("Done.");
355
        endtime = System.currentTimeMillis();
356
        log.info("Time  for Updating Project  Publications Count : " + ((endtime - startTime) / 60000) + " minutes ");
357
    }
358

    
359
    public void update_project_delated_pubs() throws Exception {
360

    
361
        startTime = System.currentTimeMillis();
362
        log.info("   Updating Project  Delayed Publications...");
363
        Connection con = statsDatasource.getConnection();
364

    
365
        startTime = System.currentTimeMillis();
366
        String q = "{call project_delayedpubs()}";
367

    
368
        CallableStatement st = con.prepareCall(q);
369
        st.execute();
370

    
371
        st.close();
372
        con.close();
373
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
374
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
375
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayedpubs");
376

    
377
        log.info("Done.");
378
        endtime = System.currentTimeMillis();
379
        log.info("Time  for Updating Project  Delayed Publications : " + ((endtime - startTime) / 60000) + " minutes ");
380
    }
381

    
382

    
383
    public void update_project_daysforlastpub() throws Exception {
384

    
385
        startTime = System.currentTimeMillis();
386
        log.info("Updating Project Days For Last Publication...");
387
        Connection con = statsDatasource.getConnection();
388

    
389
        startTime = System.currentTimeMillis();
390
        String q = "{call project_daysforlastpub()}";
391

    
392
        CallableStatement st = con.prepareCall(q);
393
        st.execute();
394

    
395
        st.close();
396
        con.close();
397

    
398
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
399
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
400
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_daysforlastpub");
401
        log.info("Done.");
402

    
403
        endtime = System.currentTimeMillis();
404
        log.info("Time  for Updating Project Days For Last Publication   : " + ((endtime - startTime) / 60000) + " minutes ");
405
    }
406

    
407
    public void update_project_delayed_pubs() throws Exception {
408

    
409
        startTime = System.currentTimeMillis();
410
        log.info("   Updating Project  Delayed  Publications...");
411
        Connection con = statsDatasource.getConnection();
412

    
413
        startTime = System.currentTimeMillis();
414
        String q = "{call project_delayed()}";
415

    
416
        CallableStatement st = con.prepareCall(q);
417
        st.execute();
418

    
419
        st.close();
420
        con.close();
421

    
422
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
423
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
424
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayed");
425
        log.info("Done.");
426
        endtime = System.currentTimeMillis();
427
        log.info("Time  for Updating  Project  Delayed  Publications : " + ((endtime - startTime) / 60000) + " minutes ");
428
    }
429

    
430
    public void cleanUp() throws Exception {
431

    
432
        startTime = System.currentTimeMillis();
433
        log.info("  Cleaning Up...");
434
        Connection con = statsDatasource.getConnection();
435

    
436
        startTime = System.currentTimeMillis();
437
        String q = "{call cleanTemps()}";
438

    
439
        CallableStatement st = con.prepareCall(q);
440
        st.execute();
441

    
442
        st.close();
443
        con.close();
444

    
445
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
446
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
447
        //new SimpleJdbcCall(statsDatasource).withProcedureName("cleanTemps");
448
        log.info("Done.");
449
        endtime = System.currentTimeMillis();
450
        log.info("Time  for  cleaning Up : " + ((endtime - startTime) / 60000) + " minutes ");
451
    }
452

    
453

    
454
    /**
455
     * Loads bulk data for Context( and category) tables as generated from the
456
     * IS xml
457
     *
458
     * @throws Exception
459
     */
460

    
461
    public void insertContexts(String fileDir) throws Exception {
462

    
463
        try {
464
            if (!fileDir.endsWith("/")) {
465
                fileDir += "/";
466
            }
467
            copy("context", fileDir + "context");
468
            copy("concept", fileDir + "concept");
469
            copy("category", fileDir + "category");
470

    
471
        } catch (Exception e) {
472
            log.error("Could not import  context in  new database  :" + e);
473
            throw new Exception("Could not import context in new database  :", e);
474
        }
475

    
476
        log.info(" Done.");
477
    }
478

    
479

    
480
    /**
481
     * Calls the COPY command for the given @table and inserts data from a file
482
     *
483
     * @param table
484
     * @throws Exception
485
     */
486

    
487
    public void copy(String table, String file) throws Exception {
488
        CopyManager copyManager;
489
        FileInputStream inputStream = null;
490
        FileSystem fs = FileSystem.get(new Configuration());
491
        Connection con = null;
492
        try {
493
            FSDataInputStream fin = fs.open(new Path(file));
494

    
495
            con = statsDatasource.getConnection();
496

    
497
            copyManager = new CopyManager((BaseConnection) con);
498

    
499
            inputStream = new FileInputStream(file);
500

    
501
            copyManager.copyIn("COPY " + table + " FROM STDIN ", fin);
502
            log.debug("Copy done for " + table);
503
            fin.close();
504

    
505
        } catch (Exception e) {
506
            log.error("Fail to execute copy command for table " + table + e);
507
            throw new Exception("Fail to execute copy command for table " + table, e);
508
        } finally {
509
            con.close();
510
            inputStream.close();
511
        }
512

    
513
    }
514

    
515

    
516
    public String getResults(ResultSet rs) throws Exception {
517
        String data = new String();
518
        try {
519

    
520
            log.info("Generated Report : ");
521

    
522
            ResultSetMetaData rsmd = rs.getMetaData();
523
            while (rs.next()) {
524
                for (int i = 1; i < rsmd.getColumnCount() - 1; i++) {
525
                    data += rsmd.getColumnName(i) + "," + rs.getInt(i) + "\n";
526
                }
527

    
528
            }
529
            log.info(" Generated Data :" + data);
530
            return data;
531

    
532
        } catch (Exception e) {
533
            log.error("Could not import  context in  new database :" + e);
534
            throw new Exception("Could not import context in new database :", e);
535
        } finally {
536
            rs.close();
537
        }
538

    
539
    }
540

    
541
    public DriverManagerDataSource getStatsDatasource() {
542
        return statsDatasource;
543
    }
544

    
545
    public void setStatsDatasource(DriverManagerDataSource statsDatasource) {
546
        this.statsDatasource = statsDatasource;
547
    }
548
}
    (1-1/1)