Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.daos;
2

    
3
import org.apache.hadoop.conf.Configuration;
4
import org.apache.hadoop.fs.FSDataInputStream;
5
import org.apache.hadoop.fs.FileSystem;
6
import org.apache.hadoop.fs.Path;
7
import org.apache.log4j.Logger;
8
import org.junit.Test;
9
import org.postgresql.copy.CopyManager;
10
import org.postgresql.core.BaseConnection;
11
import org.springframework.jdbc.core.JdbcTemplate;
12
import org.springframework.jdbc.core.simple.SimpleJdbcCall;
13
import org.springframework.jdbc.datasource.DriverManagerDataSource;
14

    
15
import java.io.FileInputStream;
16
import java.io.InputStream;
17
import java.sql.CallableStatement;
18
import java.sql.Connection;
19
import java.sql.ResultSet;
20
import java.sql.ResultSetMetaData;
21

    
22
/**
23
 * @author eri
24
 *         <p/>
25
 *         DB DAO with methods for creating the temporary shadow schema of the
26
 *         statsDB, executing extra inserts, creating views and indexes .
27
 */
28
public class StatsDAO {
29
    private org.springframework.jdbc.datasource.DriverManagerDataSource statsDatasource;
30

    
31
    private Logger log = Logger.getLogger(this.getClass());
32
    private long startTime;
33
    private long endtime;
34

    
35
    public StatsDAO(String dbUrl, String dbUser, String dbPassword, String dbDriver) {
36
        statsDatasource = new DriverManagerDataSource(dbUrl, dbUser, dbPassword);
37
        statsDatasource.setDriverClassName(dbDriver);
38
    }
39

    
40
    /**
41
     * Reads create schema script from a file and executes it. Used to create a
42
     * temporary Shadow schema for data import
43
     *
44
     * @throws Exception
45
     */
46

    
47
    public void createSchema() throws Exception {
48

    
49
        InputStream in = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/stats_db_schema.sql");
50
        byte[] b = new byte[in.available()];
51
        in.read(b);
52
        String q = new String(b);
53

    
54
        in.close();
55
        Connection con = null;
56
        log.info(" Creating Shadow Schema Database   ....");
57

    
58
        try {
59

    
60
            con = statsDatasource.getConnection();
61
            CallableStatement st = con.prepareCall(q);
62
            st.execute();
63
            st.close();
64

    
65
        } catch (Exception e) {
66
            log.error("Could not   Create Tables in  Temporary database  :" + e.getMessage());
67
            throw new Exception("Could not Create Tables in Temporary database  :", e);
68
        } finally {
69
            con.close();
70
        }
71

    
72
        log.info("Shadow Schema created.");
73
    }
74

    
75

    
76
    /**
77
     * Sets the default schema (search path ) for the sqoop user.
78
     *
79
     * @param user
80
     * @throws Exception
81
     */
82
    public void setSearchPathDB(String user) throws Exception {
83

    
84

    
85
        String q = "ALTER ROLE " + user + " SET search_path TO shadow,public; ";
86
        Connection con = null;
87
        try {
88
            con = statsDatasource.getConnection();
89

    
90
            CallableStatement st = con.prepareCall(q);
91
            st.execute();
92
            st.close();
93
        } catch (Exception e) {
94
            log.error("Could not change default schema for user :" + e.getMessage());
95
            throw new Exception("Could not change default schema for user ", e);
96
        } finally {
97
            con.close();
98
        }
99

    
100
        log.info(" Default user schema set to shadow");
101
    }
102

    
103

    
104
    /**
105
     * Keeps a backup of the current public schema and then replaces it with the new  Shadow one.
106
     *
107
     * @throws Exception
108
     */
109
    public void renameSchema() throws Exception {
110
        Connection con = statsDatasource.getConnection();
111
        String q = "DROP SCHEMA IF EXISTS backup CASCADE;" +
112
                "DROP SCHEMA IF EXISTS public CASCADE;" +
113
                " ALTER SCHEMA shadow RENAME TO public; ";
114

    
115
        try {
116
            CallableStatement st = con.prepareCall(q);
117
            log.info("Renaming  Shadow  schema to  Public,,,");
118
            st.execute();
119
            st.close();
120
            con.close();
121

    
122
            log.info("Renaming done");
123

    
124
        } catch (Exception e) {
125
            log.error("Could not rename schema " + e.getMessage());
126
            throw new Exception("Could not rename schema  ", e);
127
        } finally {
128
            con.close();
129
        }
130
        log.info("Schema renamed");
131
    }
132

    
133
    public void addArrayColumns() throws Exception {
134
        log.info("Adding new columns ...");
135
        Connection con = statsDatasource.getConnection();
136

    
137
        startTime = System.currentTimeMillis();
138
        String q = "{call shadow.create_arrays()}";
139

    
140
        CallableStatement st = con.prepareCall(q);
141
        st.execute();
142

    
143
        st.close();
144
        con.close();
145

    
146
        endtime = System.currentTimeMillis();
147
        log.info("Time to add columns: " + ((endtime - startTime) / 60000) + " minutes ");
148
    }
149

    
150
    public void cleanTables() throws Exception {
151
        log.info(" Cleaning Tables...");
152
        Connection con = statsDatasource.getConnection();
153
        startTime = System.currentTimeMillis();
154
        String q = "CREATE TABLE \"shadow\".rd_distinct AS SELECT DISTINCT * FROM \"shadow\".result_datasources;";
155
        CallableStatement st = con.prepareCall(q);
156
        st.execute();
157
        st.close();
158

    
159
        q = "TRUNCATE \"shadow\".result_datasources;";
160
        st = con.prepareCall(q);
161
        st.execute();
162
        st.close();
163

    
164
        q = "INSERT INTO \"shadow\".result_datasources SELECT * FROM \"shadow\".rd_distinct;";
165
        st = con.prepareCall(q);
166
        st.execute();
167
        st.close();
168

    
169
        q = "DROP TABLE \"shadow\".rd_distinct;";
170
        st = con.prepareCall(q);
171
        st.execute();
172
        st.close();
173

    
174
        con.close();
175
        endtime = System.currentTimeMillis();
176
        log.info("Time to clean tables : " + ((endtime - startTime) / 60000) + " minutes ");
177
    }
178

    
179
    /**
180
     * Create charts.
181
     *
182
     * @throws Exception
183
     */
184
    public void createCharts() throws Exception {
185
        log.info(" Creating Chart Tables...");
186
        Connection con = statsDatasource.getConnection();
187

    
188
        startTime = System.currentTimeMillis();
189
        String q = "{call shadow.create_charts()}";
190

    
191
        CallableStatement st = con.prepareCall(q);
192
        st.execute();
193

    
194
        st.close();
195
        con.close();
196

    
197
        endtime = System.currentTimeMillis();
198
        log.info("Time to create chart tables: " + ((endtime - startTime) / 60000) + " minutes ");
199
    }
200

    
201
    /**
202
     * Create chart indexes.
203
     *
204
     * @throws Exception
205
     */
206
    public void createChartIndexes() throws Exception {
207
        log.info(" Create Chart Indexes...");
208
        Connection con = statsDatasource.getConnection();
209

    
210
        startTime = System.currentTimeMillis();
211
        String q = "{call shadow.create_chart_indexes()}";
212

    
213
        CallableStatement st = con.prepareCall(q);
214
        st.execute();
215

    
216
        st.close();
217
        con.close();
218

    
219
        endtime = System.currentTimeMillis();
220
        log.info("Time to create chart indexes : " + ((endtime - startTime) / 60000) + " minutes ");
221
    }
222

    
223
    /**
224
     * Builds indexes.
225
     *
226
     * @throws Exception
227
     */
228
    public void buildIndexes() throws Exception {
229
        log.info(" Building Database Indexes...");
230
        Connection con = statsDatasource.getConnection();
231
        startTime = System.currentTimeMillis();
232
        String q = "{call shadow.create_indexes()}";
233
        CallableStatement st = con.prepareCall(q);
234
        st.execute();
235
        st.close();
236
        con.close();
237
        endtime = System.currentTimeMillis();
238
        log.info("Time to build indexes : " + ((endtime - startTime) / 60000) + " minutes ");
239
    }
240

    
241
    /**
242
     * Builds views.
243
     *
244
     * @throws Exception
245
     */
246
    public void buildViews() throws Exception {
247
        log.info(" Building Database Views...");
248
        Connection con = statsDatasource.getConnection();
249

    
250
        startTime = System.currentTimeMillis();
251
        String q = "{call shadow.create_views()}";
252

    
253
        CallableStatement st = con.prepareCall(q);
254
        st.execute();
255

    
256
        st.close();
257
        con.close();
258

    
259
        endtime = System.currentTimeMillis();
260
        log.info("Time to build Views : " + ((endtime - startTime) / 60000) + " minutes ");
261
    }
262

    
263
    /**
264
     * Executes extra inserts ( table updates )
265
     *
266
     * @throws Exception
267
     */
268
    public void executeExtraInserts() throws Exception {
269
        log.info("  Executing Extra Inserts...");
270

    
271
        startTime = System.currentTimeMillis();
272

    
273
        populateDefaults();
274
        //update_project_results();
275
        update_project_has_pubs();
276
        update_project_pubs_count();
277
        update_project_delated_pubs();
278
        update_project_daysforlastpub();
279
        update_project_delayed_pubs();
280
        cleanUp();
281

    
282
        endtime = System.currentTimeMillis();
283
        log.info("Total time for Extra Inserts : " + ((endtime - startTime) / 60000) + " minutes ");
284
    }
285

    
286

    
287
    public void populateDefaults() throws Exception {
288
        startTime = System.currentTimeMillis();
289
        log.info("  Populating defaults table and Updating Datasources...");
290

    
291
        Connection con = statsDatasource.getConnection();
292

    
293
        startTime = System.currentTimeMillis();
294
        String q = "{call shadow.extra_defaults_datasource()}";
295

    
296
        CallableStatement st = con.prepareCall(q);
297
        st.execute();
298

    
299
        st.close();
300
        con.close();
301
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
302
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
303
        //new SimpleJdbcCall(statsDatasource).withProcedureName("extra_defaults_datasource");
304
        endtime = System.currentTimeMillis();
305
        log.info("Time  for   Populating defaults : " + ((endtime - startTime) / 60000) + " minutes ");
306
    }
307

    
308

    
309
    public void update_project_results() throws Exception {
310
        startTime = System.currentTimeMillis();
311
        log.info("   Updating Project Results...");
312

    
313
        Connection con = statsDatasource.getConnection();
314

    
315
        startTime = System.currentTimeMillis();
316
        String q = "{call shadow.update_project_results()}";
317

    
318
        CallableStatement st = con.prepareCall(q);
319
        st.execute();
320

    
321
        st.close();
322
        con.close();
323
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
324
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
325
        //new SimpleJdbcCall(statsDatasource).withProcedureName("update_project_results");
326
        log.info("Done.");
327
        endtime = System.currentTimeMillis();
328
        log.info("Time  for  Updating Project Results: " + ((endtime - startTime) / 60000) + " minutes ");
329
    }
330

    
331

    
332
    public void update_project_has_pubs() throws Exception {
333

    
334
        startTime = System.currentTimeMillis();
335
        log.info("   Updating Project with Publications..");
336
        Connection con = statsDatasource.getConnection();
337

    
338
        startTime = System.currentTimeMillis();
339
        String q = "{call shadow.project_has_pubs()}";
340

    
341
        CallableStatement st = con.prepareCall(q);
342
        st.execute();
343

    
344
        st.close();
345
        con.close();
346
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
347
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
348
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_has_pubs");
349
        log.info("Done.");
350
        endtime = System.currentTimeMillis();
351
        log.info("Time  for Updating Project with Publications : " + ((endtime - startTime) / 60000) + " minutes ");
352
    }
353

    
354
    public void update_project_pubs_count() throws Exception {
355

    
356
        startTime = System.currentTimeMillis();
357
        log.info("   Updating Project  Publications Count..");
358
        Connection con = statsDatasource.getConnection();
359

    
360
        startTime = System.currentTimeMillis();
361
        String q = "{call shadow.project_pubs_count()}";
362

    
363
        CallableStatement st = con.prepareCall(q);
364
        st.execute();
365

    
366
        st.close();
367
        con.close();
368
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
369
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
370
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_pubs_count");
371
        log.info("Done.");
372
        endtime = System.currentTimeMillis();
373
        log.info("Time  for Updating Project  Publications Count : " + ((endtime - startTime) / 60000) + " minutes ");
374
    }
375

    
376
    public void update_project_delated_pubs() throws Exception {
377

    
378
        startTime = System.currentTimeMillis();
379
        log.info("   Updating Project  Delayed Publications...");
380
        Connection con = statsDatasource.getConnection();
381

    
382
        startTime = System.currentTimeMillis();
383
        String q = "{call shadow.project_delayedpubs()}";
384

    
385
        CallableStatement st = con.prepareCall(q);
386
        st.execute();
387

    
388
        st.close();
389
        con.close();
390
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
391
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
392
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayedpubs");
393

    
394
        log.info("Done.");
395
        endtime = System.currentTimeMillis();
396
        log.info("Time  for Updating Project  Delayed Publications : " + ((endtime - startTime) / 60000) + " minutes ");
397
    }
398

    
399

    
400
    public void update_project_daysforlastpub() throws Exception {
401

    
402
        startTime = System.currentTimeMillis();
403
        log.info("Updating Project Days For Last Publication...");
404
        Connection con = statsDatasource.getConnection();
405

    
406
        startTime = System.currentTimeMillis();
407
        String q = "{call shadow.project_daysforlastpub()}";
408

    
409
        CallableStatement st = con.prepareCall(q);
410
        st.execute();
411

    
412
        st.close();
413
        con.close();
414

    
415
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
416
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
417
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_daysforlastpub");
418
        log.info("Done.");
419

    
420
        endtime = System.currentTimeMillis();
421
        log.info("Time  for Updating Project Days For Last Publication   : " + ((endtime - startTime) / 60000) + " minutes ");
422
    }
423

    
424
    public void update_project_delayed_pubs() throws Exception {
425

    
426
        startTime = System.currentTimeMillis();
427
        log.info("   Updating Project  Delayed  Publications...");
428
        Connection con = statsDatasource.getConnection();
429

    
430
        startTime = System.currentTimeMillis();
431
        String q = "{call shadow.project_delayed()}";
432

    
433
        CallableStatement st = con.prepareCall(q);
434
        st.execute();
435

    
436
        st.close();
437
        con.close();
438

    
439
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
440
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
441
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayed");
442
        log.info("Done.");
443
        endtime = System.currentTimeMillis();
444
        log.info("Time  for Updating  Project  Delayed  Publications : " + ((endtime - startTime) / 60000) + " minutes ");
445
    }
446

    
447
    public void insertCountries() throws Exception {
448

    
449
        startTime = System.currentTimeMillis();
450
        log.info("Inserting countries...");
451
        Connection con = statsDatasource.getConnection();
452

    
453
        startTime = System.currentTimeMillis();
454
        String q = "{call shadow.insert_countries()}";
455

    
456
        CallableStatement st = con.prepareCall(q);
457
        st.execute();
458

    
459
        st.close();
460
        con.close();
461

    
462
        log.info("Done.");
463
        endtime = System.currentTimeMillis();
464
        log.info("Time  for inserting countries: " + ((endtime - startTime) / 60000) + " minutes ");
465
    }
466

    
467
    public void backwardsCompatibility() throws Exception {
468

    
469
        startTime = System.currentTimeMillis();
470
        log.info("Ensuring backwards compatibility");
471
        Connection con = statsDatasource.getConnection();
472

    
473
        startTime = System.currentTimeMillis();
474
        String q = "{call shadow.backwards_compatibility()}";
475

    
476
        CallableStatement st = con.prepareCall(q);
477
        st.execute();
478

    
479
        st.close();
480
        con.close();
481

    
482
        log.info("Done.");
483
        endtime = System.currentTimeMillis();
484
        log.info("Time  for ensuring backwards compatibility: " + ((endtime - startTime) / 60000) + " minutes ");
485
    }
486

    
487
    public void cleanUp() throws Exception {
488

    
489
        startTime = System.currentTimeMillis();
490
        log.info("  Cleaning Up...");
491
        Connection con = statsDatasource.getConnection();
492

    
493
        startTime = System.currentTimeMillis();
494
        String q = "{call shadow.cleanTemps()}";
495

    
496
        CallableStatement st = con.prepareCall(q);
497
        st.execute();
498

    
499
        st.close();
500
        con.close();
501

    
502
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
503
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
504
        //new SimpleJdbcCall(statsDatasource).withProcedureName("cleanTemps");
505
        log.info("Done.");
506
        endtime = System.currentTimeMillis();
507
        log.info("Time  for  cleaning Up : " + ((endtime - startTime) / 60000) + " minutes ");
508
    }
509

    
510

    
511
    /**
512
     * Loads bulk data for Context( and category) tables as generated from the
513
     * IS xml
514
     *
515
     * @throws Exception
516
     */
517

    
518
    public void insertContexts(String fileDir) throws Exception {
519

    
520
        try {
521
            if (!fileDir.endsWith("/")) {
522
                fileDir += "/";
523
            }
524
            copy("context", fileDir + "context");
525
            copy("concept", fileDir + "concept");
526
            copy("category", fileDir + "category");
527

    
528
        } catch (Exception e) {
529
            log.error("Could not import  context in  new database  :" + e);
530
            throw new Exception("Could not import context in new database  :", e);
531
        }
532

    
533
        log.info(" Done.");
534
    }
535

    
536

    
537
    /**
538
     * Calls the COPY command for the given @table and inserts data from a file
539
     *
540
     * @param table
541
     * @throws Exception
542
     */
543

    
544
    public void copy(String table, String file) throws Exception {
545
        CopyManager copyManager;
546
        FileInputStream inputStream = null;
547
        FileSystem fs = FileSystem.get(new Configuration());
548
        Connection con = null;
549
        try {
550
            FSDataInputStream fin = fs.open(new Path(file));
551

    
552
            con = statsDatasource.getConnection();
553

    
554
            copyManager = new CopyManager((BaseConnection) con);
555

    
556
            inputStream = new FileInputStream(file);
557

    
558
            copyManager.copyIn("COPY " + table + " FROM STDIN ", fin);
559
            log.debug("Copy done for " + table);
560
            fin.close();
561

    
562
        } catch (Exception e) {
563
            log.error("Fail to execute copy command for table " + table + e);
564
            throw new Exception("Fail to execute copy command for table " + table, e);
565
        } finally {
566
            con.close();
567
            inputStream.close();
568
        }
569

    
570
    }
571

    
572

    
573
    public String getResults(ResultSet rs) throws Exception {
574
        String data = new String();
575
        try {
576

    
577
            log.info("Generated Report : ");
578

    
579
            ResultSetMetaData rsmd = rs.getMetaData();
580
            while (rs.next()) {
581
                for (int i = 1; i < rsmd.getColumnCount() - 1; i++) {
582
                    data += rsmd.getColumnName(i) + "," + rs.getInt(i) + "\n";
583
                }
584

    
585
            }
586
            log.info(" Generated Data :" + data);
587
            return data;
588

    
589
        } catch (Exception e) {
590
            log.error("Could not import  context in  new database :" + e);
591
            throw new Exception("Could not import context in new database :", e);
592
        } finally {
593
            rs.close();
594
        }
595

    
596
    }
597

    
598
    public DriverManagerDataSource getStatsDatasource() {
599
        return statsDatasource;
600
    }
601

    
602
    public void setStatsDatasource(DriverManagerDataSource statsDatasource) {
603
        this.statsDatasource = statsDatasource;
604
    }
605
}
    (1-1/1)