Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.statsExport.daos;
2

    
3
import org.apache.hadoop.conf.Configuration;
4
import org.apache.hadoop.fs.FSDataInputStream;
5
import org.apache.hadoop.fs.FileSystem;
6
import org.apache.hadoop.fs.Path;
7
import org.apache.log4j.Logger;
8
import org.junit.Test;
9
import org.postgresql.copy.CopyManager;
10
import org.postgresql.core.BaseConnection;
11
import org.springframework.jdbc.core.JdbcTemplate;
12
import org.springframework.jdbc.core.simple.SimpleJdbcCall;
13
import org.springframework.jdbc.datasource.DriverManagerDataSource;
14

    
15
import java.io.FileInputStream;
16
import java.io.InputStream;
17
import java.sql.*;
18

    
19
/**
20
 * @author eri
21
 *         <p/>
22
 *         DB DAO with methods for creating the temporary shadow schema of the
23
 *         statsDB, executing extra inserts, creating views and indexes .
24
 */
25
public class StatsDAO {
26
    private org.springframework.jdbc.datasource.DriverManagerDataSource statsDatasource;
27

    
28
    private Logger log = Logger.getLogger(this.getClass());
29
    private long startTime;
30
    private long endtime;
31

    
32
    public StatsDAO(String dbUrl, String dbUser, String dbPassword, String dbDriver) {
33
        statsDatasource = new DriverManagerDataSource(dbUrl, dbUser, dbPassword);
34
        statsDatasource.setDriverClassName(dbDriver);
35
    }
36

    
37
    /**
38
     * Reads create schema script from a file and executes it. Used to create a
39
     * temporary Shadow schema for data import
40
     *
41
     * @throws Exception
42
     */
43

    
44
    public void createSchema() throws Exception {
45

    
46
        InputStream in = ClassLoader.getSystemResourceAsStream("eu/dnetlib/data/mapreduce/hbase/statsExport/stats_db_schema.sql");
47
        byte[] b = new byte[in.available()];
48
        in.read(b);
49
        String q = new String(b);
50

    
51
        in.close();
52
        Connection con = null;
53
        log.info(" Creating Shadow Schema Database   ....");
54

    
55
        try {
56

    
57
            con = statsDatasource.getConnection();
58
            CallableStatement st = con.prepareCall(q);
59
            st.execute();
60
            st.close();
61

    
62
        } catch (Exception e) {
63
            log.error("Could not   Create Tables in  Temporary database  :" + e.getMessage());
64
            throw new Exception("Could not Create Tables in Temporary database  :", e);
65
        } finally {
66
            con.close();
67
        }
68

    
69
        log.info("Shadow Schema created.");
70
    }
71

    
72

    
73
    /**
74
     * Sets the default schema (search path ) for the sqoop user.
75
     *
76
     * @param user
77
     * @throws Exception
78
     */
79
    public void setSearchPathDB(String user) throws Exception {
80

    
81

    
82
        String q = "ALTER ROLE " + user + " SET search_path TO shadow,public; ";
83
        Connection con = null;
84
        try {
85
            con = statsDatasource.getConnection();
86

    
87
            CallableStatement st = con.prepareCall(q);
88
            st.execute();
89
            st.close();
90
        } catch (Exception e) {
91
            log.error("Could not change default schema for user :" + e.getMessage());
92
            throw new Exception("Could not change default schema for user ", e);
93
        } finally {
94
            con.close();
95
        }
96

    
97
        log.info(" Default user schema set to shadow");
98
    }
99

    
100

    
101
    /**
102
     * Keeps a backup of the current public schema and then replaces it with the new  Shadow one.
103
     *
104
     * @throws Exception
105
     */
106
    public void renameSchema() throws Exception {
107
        Connection con = statsDatasource.getConnection();
108
        String q = "DROP SCHEMA IF EXISTS backup CASCADE;" +
109
                "DROP SCHEMA IF EXISTS public CASCADE;" +
110
                " ALTER SCHEMA shadow RENAME TO public; ";
111

    
112
        try {
113
            CallableStatement st = con.prepareCall(q);
114
            log.info("Renaming  Shadow  schema to  Public,,,");
115
            st.execute();
116
            st.close();
117
            con.close();
118

    
119
            log.info("Renaming done");
120

    
121
        } catch (Exception e) {
122
            log.error("Could not rename schema " + e.getMessage());
123
            throw new Exception("Could not rename schema  ", e);
124
        } finally {
125
            con.close();
126
        }
127
        log.info("Schema renamed");
128
    }
129

    
130
    public void addArrayColumns() throws Exception {
131
        log.info("Adding new columns ...");
132
        Connection con = statsDatasource.getConnection();
133

    
134
        startTime = System.currentTimeMillis();
135
        String q = "{call shadow.create_arrays()}";
136

    
137
        CallableStatement st = con.prepareCall(q);
138
        st.execute();
139

    
140
        st.close();
141
        con.close();
142

    
143
        endtime = System.currentTimeMillis();
144
        log.info("Time to add columns: " + ((endtime - startTime) / 60000) + " minutes ");
145
    }
146

    
147
    public void cleanTables() throws Exception {
148
        log.info(" Cleaning Tables...");
149
        Connection con = statsDatasource.getConnection();
150
        startTime = System.currentTimeMillis();
151
        String q = "CREATE TABLE \"shadow\".rd_distinct AS SELECT DISTINCT * FROM \"shadow\".result_datasources;";
152
        CallableStatement st = con.prepareCall(q);
153
        st.execute();
154
        st.close();
155

    
156
        q = "TRUNCATE \"shadow\".result_datasources;";
157
        st = con.prepareCall(q);
158
        st.execute();
159
        st.close();
160

    
161
        q = "INSERT INTO \"shadow\".result_datasources SELECT * FROM \"shadow\".rd_distinct;";
162
        st = con.prepareCall(q);
163
        st.execute();
164
        st.close();
165

    
166
        q = "DROP TABLE \"shadow\".rd_distinct;";
167
        st = con.prepareCall(q);
168
        st.execute();
169
        st.close();
170

    
171
        con.close();
172
        endtime = System.currentTimeMillis();
173
        log.info("Time to clean tables : " + ((endtime - startTime) / 60000) + " minutes ");
174
    }
175

    
176
    /**
177
     * Create charts.
178
     *
179
     * @throws Exception
180
     */
181
    public void createCharts() throws Exception {
182
        log.info(" Creating Chart Tables...");
183
        Connection con = statsDatasource.getConnection();
184

    
185
        startTime = System.currentTimeMillis();
186
        String q = "{call shadow.create_charts()}";
187

    
188
        CallableStatement st = con.prepareCall(q);
189
        st.execute();
190

    
191
        st.close();
192
        con.close();
193

    
194
        endtime = System.currentTimeMillis();
195
        log.info("Time to create chart tables: " + ((endtime - startTime) / 60000) + " minutes ");
196
    }
197

    
198
    /**
199
     * Create chart indexes.
200
     *
201
     * @throws Exception
202
     */
203
    public void createChartIndexes() throws Exception {
204
        log.info(" Create Chart Indexes...");
205
        Connection con = statsDatasource.getConnection();
206

    
207
        startTime = System.currentTimeMillis();
208
        String q = "{call shadow.create_chart_indexes()}";
209

    
210
        CallableStatement st = con.prepareCall(q);
211
        st.execute();
212

    
213
        st.close();
214
        con.close();
215

    
216
        endtime = System.currentTimeMillis();
217
        log.info("Time to create chart indexes : " + ((endtime - startTime) / 60000) + " minutes ");
218
    }
219

    
220
    /**
221
     * Builds indexes.
222
     *
223
     * @throws Exception
224
     */
225
    public void buildIndexes() throws Exception {
226
        log.info(" Building Database Indexes...");
227
        Connection con = statsDatasource.getConnection();
228
        startTime = System.currentTimeMillis();
229
        String q = "{call shadow.create_indexes()}";
230
        CallableStatement st = con.prepareCall(q);
231
        st.execute();
232
        st.close();
233
        con.close();
234
        endtime = System.currentTimeMillis();
235
        log.info("Time to build indexes : " + ((endtime - startTime) / 60000) + " minutes ");
236
    }
237

    
238
    /**
239
     * Builds views.
240
     *
241
     * @throws Exception
242
     */
243
    public void buildViews() throws Exception {
244
        log.info(" Building Database Views...");
245
        Connection con = statsDatasource.getConnection();
246

    
247
        startTime = System.currentTimeMillis();
248
        String q = "{call shadow.create_views()}";
249

    
250
        CallableStatement st = con.prepareCall(q);
251
        st.execute();
252

    
253
        st.close();
254
        con.close();
255

    
256
        endtime = System.currentTimeMillis();
257
        log.info("Time to build Views : " + ((endtime - startTime) / 60000) + " minutes ");
258
    }
259

    
260
    /**
261
     * Executes extra inserts ( table updates )
262
     *
263
     * @throws Exception
264
     */
265
    public void executeExtraInserts() throws Exception {
266
        log.info("  Executing Extra Inserts...");
267

    
268
        startTime = System.currentTimeMillis();
269

    
270
        populateDefaults();
271
        //update_project_results();
272
        update_project_has_pubs();
273
        update_project_pubs_count();
274
        update_project_delated_pubs();
275
        update_project_daysforlastpub();
276
        update_project_delayed_pubs();
277
        cleanUp();
278

    
279
        endtime = System.currentTimeMillis();
280
        log.info("Total time for Extra Inserts : " + ((endtime - startTime) / 60000) + " minutes ");
281
    }
282

    
283

    
284
    public void populateDefaults() throws Exception {
285
        startTime = System.currentTimeMillis();
286
        log.info("  Populating defaults table and Updating Datasources...");
287

    
288
        Connection con = statsDatasource.getConnection();
289

    
290
        startTime = System.currentTimeMillis();
291
        String q = "{call shadow.extra_defaults_datasource()}";
292

    
293
        CallableStatement st = con.prepareCall(q);
294
        st.execute();
295

    
296
        st.close();
297
        con.close();
298
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
299
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
300
        //new SimpleJdbcCall(statsDatasource).withProcedureName("extra_defaults_datasource");
301
        endtime = System.currentTimeMillis();
302
        log.info("Time  for   Populating defaults : " + ((endtime - startTime) / 60000) + " minutes ");
303
    }
304

    
305

    
306
    public void update_project_results() throws Exception {
307
        startTime = System.currentTimeMillis();
308
        log.info("   Updating Project Results...");
309

    
310
        Connection con = statsDatasource.getConnection();
311

    
312
        startTime = System.currentTimeMillis();
313
        String q = "{call shadow.update_project_results()}";
314

    
315
        CallableStatement st = con.prepareCall(q);
316
        st.execute();
317

    
318
        st.close();
319
        con.close();
320
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
321
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
322
        //new SimpleJdbcCall(statsDatasource).withProcedureName("update_project_results");
323
        log.info("Done.");
324
        endtime = System.currentTimeMillis();
325
        log.info("Time  for  Updating Project Results: " + ((endtime - startTime) / 60000) + " minutes ");
326
    }
327

    
328

    
329
    public void update_project_has_pubs() throws Exception {
330

    
331
        startTime = System.currentTimeMillis();
332
        log.info("   Updating Project with Publications..");
333
        Connection con = statsDatasource.getConnection();
334

    
335
        startTime = System.currentTimeMillis();
336
        String q = "{call shadow.project_has_pubs()}";
337

    
338
        CallableStatement st = con.prepareCall(q);
339
        st.execute();
340

    
341
        st.close();
342
        con.close();
343
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
344
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
345
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_has_pubs");
346
        log.info("Done.");
347
        endtime = System.currentTimeMillis();
348
        log.info("Time  for Updating Project with Publications : " + ((endtime - startTime) / 60000) + " minutes ");
349
    }
350

    
351
    public void update_project_pubs_count() throws Exception {
352

    
353
        startTime = System.currentTimeMillis();
354
        log.info("   Updating Project  Publications Count..");
355
        Connection con = statsDatasource.getConnection();
356

    
357
        startTime = System.currentTimeMillis();
358
        String q = "{call shadow.project_pubs_count()}";
359

    
360
        CallableStatement st = con.prepareCall(q);
361
        st.execute();
362

    
363
        st.close();
364
        con.close();
365
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
366
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
367
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_pubs_count");
368
        log.info("Done.");
369
        endtime = System.currentTimeMillis();
370
        log.info("Time  for Updating Project  Publications Count : " + ((endtime - startTime) / 60000) + " minutes ");
371
    }
372

    
373
    public void update_project_delated_pubs() throws Exception {
374

    
375
        startTime = System.currentTimeMillis();
376
        log.info("   Updating Project  Delayed Publications...");
377
        Connection con = statsDatasource.getConnection();
378

    
379
        startTime = System.currentTimeMillis();
380
        String q = "{call shadow.project_delayedpubs()}";
381

    
382
        CallableStatement st = con.prepareCall(q);
383
        st.execute();
384

    
385
        st.close();
386
        con.close();
387
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
388
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
389
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayedpubs");
390

    
391
        log.info("Done.");
392
        endtime = System.currentTimeMillis();
393
        log.info("Time  for Updating Project  Delayed Publications : " + ((endtime - startTime) / 60000) + " minutes ");
394
    }
395

    
396

    
397
    public void update_project_daysforlastpub() throws Exception {
398

    
399
        startTime = System.currentTimeMillis();
400
        log.info("Updating Project Days For Last Publication...");
401
        Connection con = statsDatasource.getConnection();
402

    
403
        startTime = System.currentTimeMillis();
404
        String q = "{call shadow.project_daysforlastpub()}";
405

    
406
        CallableStatement st = con.prepareCall(q);
407
        st.execute();
408

    
409
        st.close();
410
        con.close();
411

    
412
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
413
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
414
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_daysforlastpub");
415
        log.info("Done.");
416

    
417
        endtime = System.currentTimeMillis();
418
        log.info("Time  for Updating Project Days For Last Publication   : " + ((endtime - startTime) / 60000) + " minutes ");
419
    }
420

    
421
    public void update_project_delayed_pubs() throws Exception {
422

    
423
        startTime = System.currentTimeMillis();
424
        log.info("   Updating Project  Delayed  Publications...");
425
        Connection con = statsDatasource.getConnection();
426

    
427
        startTime = System.currentTimeMillis();
428
        String q = "{call shadow.project_delayed()}";
429

    
430
        CallableStatement st = con.prepareCall(q);
431
        st.execute();
432

    
433
        st.close();
434
        con.close();
435

    
436
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
437
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
438
        //new SimpleJdbcCall(statsDatasource).withProcedureName("project_delayed");
439
        log.info("Done.");
440
        endtime = System.currentTimeMillis();
441
        log.info("Time  for Updating  Project  Delayed  Publications : " + ((endtime - startTime) / 60000) + " minutes ");
442
    }
443

    
444
    public void insertCountries() throws Exception {
445

    
446
        startTime = System.currentTimeMillis();
447
        log.info("Inserting countries...");
448
        Connection con = statsDatasource.getConnection();
449

    
450
        startTime = System.currentTimeMillis();
451
        String q = "{call shadow.insert_countries()}";
452

    
453
        CallableStatement st = con.prepareCall(q);
454
        st.execute();
455

    
456
        st.close();
457
        con.close();
458

    
459
        log.info("Done.");
460
        endtime = System.currentTimeMillis();
461
        log.info("Time  for inserting countries: " + ((endtime - startTime) / 60000) + " minutes ");
462
    }
463

    
464
    public void backwardsCompatibility() throws Exception {
465

    
466
        startTime = System.currentTimeMillis();
467
        log.info("Ensuring backwards compatibility");
468
        Connection con = statsDatasource.getConnection();
469

    
470
        startTime = System.currentTimeMillis();
471
        String q = "{call shadow.backwards_compatibility()}";
472

    
473
        CallableStatement st = con.prepareCall(q);
474
        st.execute();
475

    
476
        st.close();
477
        con.close();
478

    
479
        log.info("Done.");
480
        endtime = System.currentTimeMillis();
481
        log.info("Time  for ensuring backwards compatibility: " + ((endtime - startTime) / 60000) + " minutes ");
482
    }
483

    
484
    public void copyUsageStats() throws Exception {
485
        startTime = System.currentTimeMillis();
486
        log.info("Copying usage statistics from public schema");
487
        Connection con = statsDatasource.getConnection();
488

    
489
        startTime = System.currentTimeMillis();
490
        Statement st = con.createStatement();
491
        st.executeUpdate("create table shadow.piwiklog as (select * from public.piwiklog)");
492
        st.executeUpdate("create table shadow.usage_stats as (select * from public.usage_stats)");
493

    
494
        st.close();
495
        con.close();
496

    
497
        log.info("Done.");
498
        endtime = System.currentTimeMillis();
499
        log.info("Time for copying usage statistics: " + ((endtime - startTime) / 60000) + " minutes ");
500
    }
501

    
502
    public void cleanUp() throws Exception {
503

    
504
        startTime = System.currentTimeMillis();
505
        log.info("  Cleaning Up...");
506
        Connection con = statsDatasource.getConnection();
507

    
508
        startTime = System.currentTimeMillis();
509
        String q = "{call shadow.cleanTemps()}";
510

    
511
        CallableStatement st = con.prepareCall(q);
512
        st.execute();
513

    
514
        st.close();
515
        con.close();
516

    
517
        //JdbcTemplate jdbcTemplate = new JdbcTemplate(statsDatasource);
518
        //jdbcTemplate.setResultsMapCaseInsensitive(true);
519
        //new SimpleJdbcCall(statsDatasource).withProcedureName("cleanTemps");
520
        log.info("Done.");
521
        endtime = System.currentTimeMillis();
522
        log.info("Time  for  cleaning Up : " + ((endtime - startTime) / 60000) + " minutes ");
523
    }
524

    
525

    
526
    /**
527
     * Loads bulk data for Context( and category) tables as generated from the
528
     * IS xml
529
     *
530
     * @throws Exception
531
     */
532

    
533
    public void insertContexts(String fileDir) throws Exception {
534

    
535
        try {
536
            if (!fileDir.endsWith("/")) {
537
                fileDir += "/";
538
            }
539
            copy("context", fileDir + "context");
540
            copy("concept", fileDir + "concept");
541
            copy("category", fileDir + "category");
542

    
543
        } catch (Exception e) {
544
            log.error("Could not import  context in  new database  :" + e);
545
            throw new Exception("Could not import context in new database  :", e);
546
        }
547

    
548
        log.info(" Done.");
549
    }
550

    
551

    
552
    /**
553
     * Calls the COPY command for the given @table and inserts data from a file
554
     *
555
     * @param table
556
     * @throws Exception
557
     */
558

    
559
    public void copy(String table, String file) throws Exception {
560
        CopyManager copyManager;
561
        FileInputStream inputStream = null;
562
        FileSystem fs = FileSystem.get(new Configuration());
563
        Connection con = null;
564
        try {
565
            FSDataInputStream fin = fs.open(new Path(file));
566

    
567
            con = statsDatasource.getConnection();
568

    
569
            copyManager = new CopyManager((BaseConnection) con);
570

    
571
            inputStream = new FileInputStream(file);
572

    
573
            copyManager.copyIn("COPY " + table + " FROM STDIN ", fin);
574
            log.debug("Copy done for " + table);
575
            fin.close();
576

    
577
        } catch (Exception e) {
578
            log.error("Fail to execute copy command for table " + table + e);
579
            throw new Exception("Fail to execute copy command for table " + table, e);
580
        } finally {
581
            con.close();
582
            inputStream.close();
583
        }
584

    
585
    }
586

    
587

    
588
    public String getResults(ResultSet rs) throws Exception {
589
        String data = new String();
590
        try {
591

    
592
            log.info("Generated Report : ");
593

    
594
            ResultSetMetaData rsmd = rs.getMetaData();
595
            while (rs.next()) {
596
                for (int i = 1; i < rsmd.getColumnCount() - 1; i++) {
597
                    data += rsmd.getColumnName(i) + "," + rs.getInt(i) + "\n";
598
                }
599

    
600
            }
601
            log.info(" Generated Data :" + data);
602
            return data;
603

    
604
        } catch (Exception e) {
605
            log.error("Could not import  context in  new database :" + e);
606
            throw new Exception("Could not import context in new database :", e);
607
        } finally {
608
            rs.close();
609
        }
610

    
611
    }
612

    
613
    public DriverManagerDataSource getStatsDatasource() {
614
        return statsDatasource;
615
    }
616

    
617
    public void setStatsDatasource(DriverManagerDataSource statsDatasource) {
618
        this.statsDatasource = statsDatasource;
619
    }
620
}
    (1-1/1)