Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.daos;
2

    
3
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.log4j.Logger;
import org.junit.Test;
import org.postgresql.copy.CopyManager;
import org.postgresql.core.BaseConnection;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.jdbc.core.simple.SimpleJdbcCall;
import org.springframework.jdbc.datasource.DriverManagerDataSource;

import java.io.ByteArrayOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
21

    
22
/**
23
 * @author eri
24
 *         <p/>
25
 *         DB DAO with methods for creating the temporary shadow schema of the
26
 *         statsDB, executing extra inserts, creating views and indexes .
27
 */
28
public class StatsDAO {
29
    private org.springframework.jdbc.datasource.DriverManagerDataSource statsDatasource;
30

    
31
    private Logger log = Logger.getLogger(this.getClass());
32
    private long startTime;
33
    private long endtime;
34

    
35
    public StatsDAO(String dbUrl, String dbUser, String dbPassword, String dbDriver) {
36
        statsDatasource = new DriverManagerDataSource(dbUrl, dbUser, dbPassword);
37
        statsDatasource.setDriverClassName(dbDriver);
38
    }
39

    
40
    /**
41
     * Reads create schema script from a file and executes it. Used to create a
42
     * temporary Shadow schema for data import
43
     *
44
     * @throws Exception
45
     */
46

    
47
    public void createSchema() throws Exception {
48

    
49
        InputStream in = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/stats_db_schema.sql");
50
        byte[] b = new byte[in.available()];
51
        in.read(b);
52
        String q = new String(b);
53

    
54
        in.close();
55
        Connection con = null;
56
        log.info(" Creating Shadow Schema Database   ....");
57

    
58
        try {
59

    
60
            con = statsDatasource.getConnection();
61
            CallableStatement st = con.prepareCall(q);
62
            st.execute();
63
            st.close();
64

    
65
        } catch (Exception e) {
66
            log.error("Could not   Create Tables in  Temporary database  :" + e.getMessage());
67
            throw new Exception("Could not Create Tables in Temporary database  :", e);
68
        } finally {
69
            con.close();
70
        }
71

    
72
        log.info("Shadow Schema created.");
73
    }
74

    
75

    
76
    /**
77
     * Sets the default schema (search path ) for the sqoop user.
78
     *
79
     * @param user
80
     * @throws Exception
81
     */
82
    public void setSearchPathDB(String user) throws Exception {
83

    
84

    
85
        String q = "ALTER ROLE " + user + " SET search_path TO shadow,public; ";
86
        Connection con = null;
87
        try {
88
            con = statsDatasource.getConnection();
89

    
90
            CallableStatement st = con.prepareCall(q);
91
            st.execute();
92
            st.close();
93
        } catch (Exception e) {
94
            log.error("Could not change default schema for user :" + e.getMessage());
95
            throw new Exception("Could not change default schema for user ", e);
96
        } finally {
97
            con.close();
98
        }
99

    
100
        log.info(" Default user schema set to shadow");
101
    }
102

    
103

    
104
    /**
105
     * Keeps a backup of the current public schema and then replaces it with the new  Shadow one.
106
     *
107
     * @throws Exception
108
     */
109
    public void renameSchema() throws Exception {
110
        Connection con = statsDatasource.getConnection();
111
        String q = "DROP SCHEMA IF EXISTS backup CASCADE;" +
112
                "DROP SCHEMA IF EXISTS public CASCADE;" +
113
                " ALTER SCHEMA shadow RENAME TO public; ";
114

    
115
        try {
116
            CallableStatement st = con.prepareCall(q);
117
            log.info("Renaming  Shadow  schema to  Public,,,");
118
            st.execute();
119
            st.close();
120
            con.close();
121

    
122
            log.info("Renaming done");
123

    
124
        } catch (Exception e) {
125
            log.error("Could not rename schema " + e.getMessage());
126
            throw new Exception("Could not rename schema  ", e);
127
        } finally {
128
            con.close();
129
        }
130
        log.info("Schema renamed");
131
    }
132

    
133
    public void fix_integer_ids() throws Exception {
134
        log.info("fixing integer ids...");
135
        Connection con = statsDatasource.getConnection();
136

    
137
        startTime = System.currentTimeMillis();
138
        String q = "{call shadow.fix_result_ids()}";
139

    
140
        CallableStatement st = con.prepareCall(q);
141
        st.execute();
142

    
143
        st.close();
144
        con.close();
145

    
146
        endtime = System.currentTimeMillis();
147
        log.info("Time to fix integer ids: " + ((endtime - startTime) / 60000) + " minutes ");
148
    }
149

    
150
    public void addArrayColumns() throws Exception {
151
        log.info("Adding new columns ...");
152
        Connection con = statsDatasource.getConnection();
153

    
154
        startTime = System.currentTimeMillis();
155
        String q = "{call shadow.create_arrays()}";
156

    
157
        CallableStatement st = con.prepareCall(q);
158
        st.execute();
159

    
160
        st.close();
161
        con.close();
162

    
163
        endtime = System.currentTimeMillis();
164
        log.info("Time to add columns: " + ((endtime - startTime) / 60000) + " minutes ");
165
    }
166

    
167
    public void cleanTables() throws Exception {
168
        log.info(" Cleaning Tables...");
169
        Connection con = statsDatasource.getConnection();
170
        startTime = System.currentTimeMillis();
171
        String q = "CREATE TABLE \"shadow\".rd_distinct AS SELECT DISTINCT * FROM \"shadow\".result_datasources;";
172
        CallableStatement st = con.prepareCall(q);
173
        st.execute();
174
        st.close();
175

    
176
        q = "TRUNCATE \"shadow\".result_datasources;";
177
        st = con.prepareCall(q);
178
        st.execute();
179
        st.close();
180

    
181
        q = "INSERT INTO \"shadow\".result_datasources SELECT * FROM \"shadow\".rd_distinct;";
182
        st = con.prepareCall(q);
183
        st.execute();
184
        st.close();
185

    
186
        q = "DROP TABLE \"shadow\".rd_distinct;";
187
        st = con.prepareCall(q);
188
        st.execute();
189
        st.close();
190

    
191
        con.close();
192
        endtime = System.currentTimeMillis();
193
        log.info("Time to clean tables : " + ((endtime - startTime) / 60000) + " minutes ");
194
    }
195

    
196
    /**
197
     * Create charts.
198
     *
199
     * @throws Exception
200
     */
201
    public void createCharts() throws Exception {
202
        log.info(" Creating Chart Tables...");
203
        Connection con = statsDatasource.getConnection();
204

    
205
        startTime = System.currentTimeMillis();
206
        String q = "{call shadow.create_charts()}";
207

    
208
        CallableStatement st = con.prepareCall(q);
209
        st.execute();
210

    
211
        st.close();
212
        con.close();
213

    
214
        endtime = System.currentTimeMillis();
215
        log.info("Time to create chart tables: " + ((endtime - startTime) / 60000) + " minutes ");
216
    }
217

    
218
    /**
219
     * Create chart indexes.
220
     *
221
     * @throws Exception
222
     */
223
    public void createChartIndexes() throws Exception {
224
        log.info(" Create Chart Indexes...");
225
        Connection con = statsDatasource.getConnection();
226

    
227
        startTime = System.currentTimeMillis();
228
        String q = "{call shadow.create_chart_indexes()}";
229

    
230
        CallableStatement st = con.prepareCall(q);
231
        st.execute();
232

    
233
        st.close();
234
        con.close();
235

    
236
        endtime = System.currentTimeMillis();
237
        log.info("Time to create chart indexes : " + ((endtime - startTime) / 60000) + " minutes ");
238
    }
239

    
240
    /**
241
     * Builds indexes.
242
     *
243
     * @throws Exception
244
     */
245
    public void buildIndexes() throws Exception {
246
        log.info(" Building Database Indexes...");
247
        Connection con = statsDatasource.getConnection();
248
        startTime = System.currentTimeMillis();
249
        String q = "{call shadow.create_indexes()}";
250
        CallableStatement st = con.prepareCall(q);
251
        st.execute();
252
        st.close();
253
        con.close();
254
        endtime = System.currentTimeMillis();
255
        log.info("Time to build indexes : " + ((endtime - startTime) / 60000) + " minutes ");
256
    }
257

    
258
    /**
259
     * Builds views.
260
     *
261
     * @throws Exception
262
     */
263
    public void buildViews() throws Exception {
264
        log.info(" Building Database Views...");
265
        Connection con = statsDatasource.getConnection();
266

    
267
        startTime = System.currentTimeMillis();
268
        String q = "{call shadow.create_views()}";
269

    
270
        CallableStatement st = con.prepareCall(q);
271
        st.execute();
272

    
273
        st.close();
274
        con.close();
275

    
276
        endtime = System.currentTimeMillis();
277
        log.info("Time to build Views : " + ((endtime - startTime) / 60000) + " minutes ");
278
    }
279

    
280
    /**
281
     * Executes extra inserts ( table updates )
282
     *
283
     * @throws Exception
284
     */
285
    public void executeExtraInserts() throws Exception {
286
        log.info("  Executing Extra Inserts...");
287

    
288
        startTime = System.currentTimeMillis();
289

    
290
        populateDefaults();
291
        //update_project_results();
292
        update_project_has_pubs();
293
        update_project_pubs_count();
294
        update_project_delated_pubs();
295
        update_project_daysforlastpub();
296
        update_project_delayed_pubs();
297
        cleanUp();
298

    
299
        endtime = System.currentTimeMillis();
300
        log.info("Total time for Extra Inserts : " + ((endtime - startTime) / 60000) + " minutes ");
301
    }
302

    
303

    
304
    public void populateDefaults() throws Exception {
305
        startTime = System.currentTimeMillis();
306
        log.info("  Populating defaults table and Updating Datasources...");
307

    
308
        Connection con = statsDatasource.getConnection();
309

    
310
        startTime = System.currentTimeMillis();
311
        String q = "{call shadow.extra_defaults_datasource()}";
312

    
313
        CallableStatement st = con.prepareCall(q);
314
        st.execute();
315

    
316
        st.close();
317
        con.close();
318
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
319
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
320
        //new SimpleJdbcCall(statsDatasource).withProcedureName("extra_defaults_datasource");
321
        endtime = System.currentTimeMillis();
322
        log.info("Time  for   Populating defaults : " + ((endtime - startTime) / 60000) + " minutes ");
323
    }
324

    
325

    
326
    public void update_project_results() throws Exception {
327
        startTime = System.currentTimeMillis();
328
        log.info("   Updating Project Results...");
329

    
330
        Connection con = statsDatasource.getConnection();
331

    
332
        startTime = System.currentTimeMillis();
333
        String q = "{call shadow.update_project_results()}";
334

    
335
        CallableStatement st = con.prepareCall(q);
336
        st.execute();
337

    
338
        st.close();
339
        con.close();
340
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
341
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
342
        //new SimpleJdbcCall(statsDatasource).withProcedureName("update_project_results");
343
        log.info("Done.");
344
        endtime = System.currentTimeMillis();
345
        log.info("Time  for  Updating Project Results: " + ((endtime - startTime) / 60000) + " minutes ");
346
    }
347

    
348

    
349
    public void update_project_has_pubs() throws Exception {
350

    
351
        startTime = System.currentTimeMillis();
352
        log.info("   Updating Project with Publications..");
353
        Connection con = statsDatasource.getConnection();
354

    
355
        startTime = System.currentTimeMillis();
356
        String q = "{call shadow.project_has_pubs()}";
357

    
358
        CallableStatement st = con.prepareCall(q);
359
        st.execute();
360

    
361
        st.close();
362
        con.close();
363
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
364
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
365
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_has_pubs");
366
        log.info("Done.");
367
        endtime = System.currentTimeMillis();
368
        log.info("Time  for Updating Project with Publications : " + ((endtime - startTime) / 60000) + " minutes ");
369
    }
370

    
371
    public void update_project_pubs_count() throws Exception {
372

    
373
        startTime = System.currentTimeMillis();
374
        log.info("   Updating Project  Publications Count..");
375
        Connection con = statsDatasource.getConnection();
376

    
377
        startTime = System.currentTimeMillis();
378
        String q = "{call shadow.project_pubs_count()}";
379

    
380
        CallableStatement st = con.prepareCall(q);
381
        st.execute();
382

    
383
        st.close();
384
        con.close();
385
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
386
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
387
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_pubs_count");
388
        log.info("Done.");
389
        endtime = System.currentTimeMillis();
390
        log.info("Time  for Updating Project  Publications Count : " + ((endtime - startTime) / 60000) + " minutes ");
391
    }
392

    
393
    public void update_project_delated_pubs() throws Exception {
394

    
395
        startTime = System.currentTimeMillis();
396
        log.info("   Updating Project  Delayed Publications...");
397
        Connection con = statsDatasource.getConnection();
398

    
399
        startTime = System.currentTimeMillis();
400
        String q = "{call shadow.project_delayedpubs()}";
401

    
402
        CallableStatement st = con.prepareCall(q);
403
        st.execute();
404

    
405
        st.close();
406
        con.close();
407
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
408
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
409
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayedpubs");
410

    
411
        log.info("Done.");
412
        endtime = System.currentTimeMillis();
413
        log.info("Time  for Updating Project  Delayed Publications : " + ((endtime - startTime) / 60000) + " minutes ");
414
    }
415

    
416

    
417
    public void update_project_daysforlastpub() throws Exception {
418

    
419
        startTime = System.currentTimeMillis();
420
        log.info("Updating Project Days For Last Publication...");
421
        Connection con = statsDatasource.getConnection();
422

    
423
        startTime = System.currentTimeMillis();
424
        String q = "{call shadow.project_daysforlastpub()}";
425

    
426
        CallableStatement st = con.prepareCall(q);
427
        st.execute();
428

    
429
        st.close();
430
        con.close();
431

    
432
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
433
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
434
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_daysforlastpub");
435
        log.info("Done.");
436

    
437
        endtime = System.currentTimeMillis();
438
        log.info("Time  for Updating Project Days For Last Publication   : " + ((endtime - startTime) / 60000) + " minutes ");
439
    }
440

    
441
    public void update_project_delayed_pubs() throws Exception {
442

    
443
        startTime = System.currentTimeMillis();
444
        log.info("   Updating Project  Delayed  Publications...");
445
        Connection con = statsDatasource.getConnection();
446

    
447
        startTime = System.currentTimeMillis();
448
        String q = "{call shadow.project_delayed()}";
449

    
450
        CallableStatement st = con.prepareCall(q);
451
        st.execute();
452

    
453
        st.close();
454
        con.close();
455

    
456
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
457
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
458
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayed");
459
        log.info("Done.");
460
        endtime = System.currentTimeMillis();
461
        log.info("Time  for Updating  Project  Delayed  Publications : " + ((endtime - startTime) / 60000) + " minutes ");
462
    }
463

    
464
    public void insertCountries() throws Exception {
465

    
466
        startTime = System.currentTimeMillis();
467
        log.info("Inserting countries...");
468
        Connection con = statsDatasource.getConnection();
469

    
470
        startTime = System.currentTimeMillis();
471
        String q = "{call shadow.insert_countries()}";
472

    
473
        CallableStatement st = con.prepareCall(q);
474
        st.execute();
475

    
476
        st.close();
477
        con.close();
478

    
479
        log.info("Done.");
480
        endtime = System.currentTimeMillis();
481
        log.info("Time  for inserting countries: " + ((endtime - startTime) / 60000) + " minutes ");
482
    }
483

    
484
    public void backwardsCompatibility() throws Exception {
485

    
486
        startTime = System.currentTimeMillis();
487
        log.info("Ensuring backwards compatibility");
488
        Connection con = statsDatasource.getConnection();
489

    
490
        startTime = System.currentTimeMillis();
491
        String q = "{call shadow.backwards_compatibility()}";
492

    
493
        CallableStatement st = con.prepareCall(q);
494
        st.execute();
495

    
496
        st.close();
497
        con.close();
498

    
499
        log.info("Done.");
500
        endtime = System.currentTimeMillis();
501
        log.info("Time  for ensuring backwards compatibility: " + ((endtime - startTime) / 60000) + " minutes ");
502
    }
503

    
504
    public void cleanUp() throws Exception {
505

    
506
        startTime = System.currentTimeMillis();
507
        log.info("  Cleaning Up...");
508
        Connection con = statsDatasource.getConnection();
509

    
510
        startTime = System.currentTimeMillis();
511
        String q = "{call shadow.cleanTemps()}";
512

    
513
        CallableStatement st = con.prepareCall(q);
514
        st.execute();
515

    
516
        st.close();
517
        con.close();
518

    
519
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
520
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
521
        //new SimpleJdbcCall(statsDatasource).withProcedureName("cleanTemps");
522
        log.info("Done.");
523
        endtime = System.currentTimeMillis();
524
        log.info("Time  for  cleaning Up : " + ((endtime - startTime) / 60000) + " minutes ");
525
    }
526

    
527

    
528
    /**
529
     * Loads bulk data for Context( and category) tables as generated from the
530
     * IS xml
531
     *
532
     * @throws Exception
533
     */
534

    
535
    public void insertContexts(String fileDir) throws Exception {
536

    
537
        try {
538
            if (!fileDir.endsWith("/")) {
539
                fileDir += "/";
540
            }
541
            copy("context", fileDir + "context");
542
            copy("concept", fileDir + "concept");
543
            copy("category", fileDir + "category");
544

    
545
        } catch (Exception e) {
546
            log.error("Could not import  context in  new database  :" + e);
547
            throw new Exception("Could not import context in new database  :", e);
548
        }
549

    
550
        log.info(" Done.");
551
    }
552

    
553

    
554
    /**
555
     * Calls the COPY command for the given @table and inserts data from a file
556
     *
557
     * @param table
558
     * @throws Exception
559
     */
560

    
561
    public void copy(String table, String file) throws Exception {
562
        CopyManager copyManager;
563
        FileInputStream inputStream = null;
564
        FileSystem fs = FileSystem.get(new Configuration());
565
        Connection con = null;
566
        try {
567
            FSDataInputStream fin = fs.open(new Path(file));
568

    
569
            con = statsDatasource.getConnection();
570

    
571
            copyManager = new CopyManager((BaseConnection) con);
572

    
573
            inputStream = new FileInputStream(file);
574

    
575
            copyManager.copyIn("COPY " + table + " FROM STDIN ", fin);
576
            log.debug("Copy done for " + table);
577
            fin.close();
578

    
579
        } catch (Exception e) {
580
            log.error("Fail to execute copy command for table " + table + e);
581
            throw new Exception("Fail to execute copy command for table " + table, e);
582
        } finally {
583
            con.close();
584
            inputStream.close();
585
        }
586

    
587
    }
588

    
589

    
590
    public String getResults(ResultSet rs) throws Exception {
591
        String data = new String();
592
        try {
593

    
594
            log.info("Generated Report : ");
595

    
596
            ResultSetMetaData rsmd = rs.getMetaData();
597
            while (rs.next()) {
598
                for (int i = 1; i < rsmd.getColumnCount() - 1; i++) {
599
                    data += rsmd.getColumnName(i) + "," + rs.getInt(i) + "\n";
600
                }
601

    
602
            }
603
            log.info(" Generated Data :" + data);
604
            return data;
605

    
606
        } catch (Exception e) {
607
            log.error("Could not import  context in  new database :" + e);
608
            throw new Exception("Could not import context in new database :", e);
609
        } finally {
610
            rs.close();
611
        }
612

    
613
    }
614

    
615
    public DriverManagerDataSource getStatsDatasource() {
616
        return statsDatasource;
617
    }
618

    
619
    public void setStatsDatasource(DriverManagerDataSource statsDatasource) {
620
        this.statsDatasource = statsDatasource;
621
    }
622
}
    (1-1/1)