Project

General

Profile

1
package eu.dnetlib.usagestats.export;
2

    
3
import java.io.*;
4
import java.net.URLDecoder;
5
import java.sql.Connection;
6
import java.sql.PreparedStatement;
7
import java.sql.SQLException;
8
import java.sql.Statement;
9
import java.sql.Timestamp;
10
import java.text.SimpleDateFormat;
11
import java.util.*;
12
import java.util.regex.Matcher;
13
import java.util.regex.Pattern;
14
import java.util.stream.Stream;
15

    
16
import org.apache.hadoop.conf.Configuration;
17
import org.apache.hadoop.fs.LocatedFileStatus;
18
import org.apache.hadoop.fs.Path;
19
import org.apache.hadoop.fs.FileSystem;
20
import org.apache.hadoop.fs.RemoteIterator;
21
import org.apache.log4j.Logger;
22
import org.json.simple.JSONArray;
23
import org.json.simple.JSONObject;
24
import org.json.simple.parser.JSONParser;
25

    
26
public class PiwikStatsDB {
27

    
28
    private String logRepoPath;
29
    private String logPortalPath;
30

    
31
    private Statement stmt = null;
32

    
33
    private final Logger log = Logger.getLogger(this.getClass());
34
    private String CounterRobotsURL;
35
    private ArrayList robotsList;
36

    
37
    /**
     * Builds the exporter for the given HDFS log locations and ensures that
     * both the permanent and the temporary usage-stats tables exist.
     *
     * @param logRepoPath   HDFS directory holding the repository Piwik logs
     * @param logPortalPath HDFS directory holding the portal Piwik logs
     * @throws Exception if the table-creation DDL fails
     */
    public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
        this.logRepoPath = logRepoPath;
        this.logPortalPath = logPortalPath;
        createTables();
        createTmpTables();
    }
    public void foo() {
45
        Stream<String> s = Arrays.stream(new String[]{"a", "b", "c", "d"});
46

    
47
        System.out.println(s.parallel().count());
48
    }
49

    
50
    public ArrayList getRobotsList() {
51
        return robotsList;
52
    }
53

    
54
    public void setRobotsList(ArrayList robotsList) {
55
        this.robotsList = robotsList;
56
    }
57

    
58
    public String getCounterRobotsURL() {
59
        return CounterRobotsURL;
60
    }
61

    
62
    public void setCounterRobotsURL(String CounterRobotsURL) {
63
        this.CounterRobotsURL = CounterRobotsURL;
64
    }
65

    
66
    private void createTables() throws Exception {
67
        try {
68
            stmt = ConnectDB.getConnection().createStatement();
69
            String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
70
            String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
71
                    + " ON INSERT TO piwiklog "
72
                    + " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
73
                    + "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
74
                    + "FROM piwiklog "
75
                    + "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
76
            String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on piwiklog(source, id_visit, action, entity_id, \"timestamp\");";
77
            stmt.executeUpdate(sqlCreateTablePiwikLog);
78
            stmt.executeUpdate(sqlcreateRulePiwikLog);
79
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
80

    
81
            String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
82
            String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
83
                    + " ON INSERT TO process_portal_log "
84
                    + " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit,"
85
                    + "process_portal_log.\"timestamp\" "
86
                    + "FROM process_portal_log "
87
                    + "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
88
            String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on process_portal_log(source, id_visit, \"timestamp\");";
89
            stmt.executeUpdate(sqlCreateTablePortalLog);
90
            stmt.executeUpdate(sqlcreateRulePortalLog);
91
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
92

    
93
            stmt.close();
94
            ConnectDB.getConnection().close();
95
            log.info("Usage Tables Created");
96

    
97
        } catch (Exception e) {
98
            log.error("Failed to create tables: " + e);
99
            throw new Exception("Failed to create tables: " + e.toString(), e);
100
        }
101
    }
102

    
103
    private void createTmpTables() throws Exception {
104
        try {
105
            Statement stmt = ConnectDB.getConnection().createStatement();
106
            String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
107
            String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
108
                    + " ON INSERT TO piwiklogtmp "
109
                    + " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit,"
110
                    + "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id "
111
                    + "FROM piwiklogtmp "
112
                    + "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
113
            stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
114
            stmt.executeUpdate(sqlcreateTmpRulePiwikLog);
115

    
116
            //String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
117
            //stmt.executeUpdate(sqlCopyPublicPiwiklog);
118
            String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
119
            String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
120
                    + " ON INSERT TO process_portal_log_tmp "
121
                    + " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit,"
122
                    + "process_portal_log_tmp.\"timestamp\" "
123
                    + "FROM process_portal_log_tmp "
124
                    + "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
125
            stmt.executeUpdate(sqlCreateTmpTablePortalLog);
126
            stmt.executeUpdate(sqlcreateTmpRulePortalLog);
127

    
128
            stmt.close();
129
            log.info("Usage Tmp Tables Created");
130

    
131
        } catch (Exception e) {
132
            log.error("Failed to create tmptables: " + e);
133
            throw new Exception("Failed to create tmp tables: " + e.toString(), e);
134
            //System.exit(0);
135
        }
136
    }
137

    
138
    public void processLogs() throws Exception {
139
        try {
140
            ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
141
            this.robotsList = counterRobots.getRobotsPatterns();
142

    
143
            processRepositoryLog();
144
            log.info("OpenAIRE repository process done");
145
            removeDoubleClicks();
146
            log.info("OpenAIRE removing double clicks done");
147

    
148
            processPortalLog();
149
            log.info("portal process done");
150

    
151
            portalStats();
152
            log.info("portal usagestats done");
153

    
154
            cleanOAI();
155
            log.info("OpenAIREcleaning oai done");
156

    
157
            viewsStats();
158
            log.info("OpenAIRE views stats done");
159

    
160
            downloadsStats();
161
            log.info("OpenAIRE downloads stats done");
162

    
163
            updateProdTables();
164
            log.info("updateProdTables done");
165

    
166
        } catch (Exception e) {
167
            log.error("Failed to process logs: " + e);
168
            throw new Exception("Failed to process logs: " + e.toString(), e);
169
        }
170
    }
171

    
172
//    public void usageStats() throws Exception {
173
//        try {
174
//            viewsStats();
175
//            downloadsStats();
176
//            log.info("stat tables and views done");
177
//        } catch (Exception e) {
178
//            log.error("Failed to create usage usagestats: " + e);
179
//            throw new Exception("Failed to create usage usagestats: " + e.toString(), e);
180
//        }
181
//    }
182
    public void processRepositoryLog() throws Exception {
183
        Statement stmt = ConnectDB.getConnection().createStatement();
184
        ConnectDB.getConnection().setAutoCommit(false);
185

    
186
        ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
187
//        File dir = new File(this.logRepoPath);
188
//        File[] jsonFiles = dir.listFiles();
189

    
190
        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
191
        int batch_size = 0;
192
        JSONParser parser = new JSONParser();
193
        for (String jsonFile : jsonFiles) {
194
            System.out.println(jsonFile);
195
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
196
            for (Object aJsonArray : jsonArray) {
197
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
198
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
199
                String idVisit = jsonObjectRow.get("idVisit").toString();
200
                String country = jsonObjectRow.get("country").toString();
201
                String referrerName = jsonObjectRow.get("referrerName").toString();
202
                String agent = jsonObjectRow.get("browser").toString();
203
                boolean botFound = false;
204
                Iterator it = robotsList.iterator();
205
                while (it.hasNext()) {
206
                    // Create a Pattern object
207
                    Pattern r = Pattern.compile(it.next().toString());
208
                    // Now create matcher object.
209
                    Matcher m = r.matcher(agent);
210
                    if (m.find()) {
211
                        //System.out.println("Found value: " + m.group(0));
212
                        botFound = true;
213
                        break;
214
                    }
215
                }
216
                if (botFound == false) {
217
                    String sourceItemType = "repItem";
218

    
219
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
220
                    for (Object actionDetail : actionDetails) {
221
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
222

    
223
                        if (actionDetailsObj.get("customVariables") != null) {
224
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
225
                            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
226
                            Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
227
                            String url = actionDetailsObj.get("url").toString();
228
                            String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
229
                            String action = actionDetailsObj.get("type").toString();
230

    
231
                            prepStatem.setInt(1, idSite);
232
                            prepStatem.setString(2, idVisit);
233
                            prepStatem.setString(3, country);
234
                            prepStatem.setString(4, action);
235
                            prepStatem.setString(5, url);
236
                            prepStatem.setString(6, oaipmh);
237
                            prepStatem.setString(7, sourceItemType);
238
                            prepStatem.setString(8, simpleDateFormat.format(timestamp));
239
                            prepStatem.setString(9, referrerName);
240
                            prepStatem.setString(10, agent);
241
                            prepStatem.addBatch();
242
                            batch_size++;
243
                            if (batch_size == 10000) {
244
                                prepStatem.executeBatch();
245
                                ConnectDB.getConnection().commit();
246
                                batch_size = 0;
247
                            }
248
                        }
249
                    }
250
                }
251
            }
252
        }
253
        prepStatem.executeBatch();
254
        ConnectDB.getConnection().commit();
255
        stmt.close();
256
    }
257

    
258
    public void removeDoubleClicks() throws Exception {
259
        Statement stmt = ConnectDB.getConnection().createStatement();
260
        ConnectDB.getConnection().setAutoCommit(false);
261

    
262
        //clean download double clicks
263
        String sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
264
        stmt.executeUpdate(sql);
265
        stmt.close();
266
        ConnectDB.getConnection().commit();
267

    
268
        stmt = ConnectDB.getConnection().createStatement();
269

    
270
        //clean view double clicks
271
        sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
272
        stmt.executeUpdate(sql);
273
        stmt.close();
274
        ConnectDB.getConnection().commit();
275
    }
276

    
277
    public void viewsStats() throws Exception {
278
        Statement stmt = ConnectDB.getConnection().createStatement();
279
        ConnectDB.getConnection().setAutoCommit(false);
280

    
281
        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
282
        String sql = "CREATE OR REPLACE VIEW result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
283
        stmt.executeUpdate(sql);
284

    
285
        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
286
        sql = "CREATE TABLE IF NOT EXISTS views_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
287
        stmt.executeUpdate(sql);
288

    
289
        sql = "CREATE TABLE IF NOT EXISTS views_stats (like views_stats_tmp including all)";
290
        stmt.executeUpdate(sql);
291

    
292
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
293
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
294
        stmt.executeUpdate(sql);
295

    
296
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats (like pageviews_stats_tmp including all)";
297
        stmt.executeUpdate(sql);
298

    
299
        stmt.close();
300
        ConnectDB.getConnection().commit();
301
        ConnectDB.getConnection().close();
302
    }
303

    
304
//    public void viewsStats(String piwikid) throws Exception {
305
//        stmt = ConnectDB.getConnection().createStatement();
306
//        ConnectDB.getConnection().setAutoCommit(false);
307
//
308
//        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
309
//        String sql = "CREATE OR REPLACE VIEW result_views_monthly" + piwikid + " AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog" + piwikid + " where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
310
//        stmt.executeUpdate(sql);
311
//
312
//        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
313
//        sql = "CREATE TABLE IF NOT EXISTS views_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source!='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
314
//        stmt.executeUpdate(sql);
315
//
316
////        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
317
//        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
318
//        stmt.executeUpdate(sql);
319
//
320
//        sql = "DROP VIEW IF EXISTS result_views_monthly" + piwikid + ";";
321
//        stmt.executeUpdate(sql);
322
//
323
//        stmt.close();
324
//        ConnectDB.getConnection().commit();
325
//        ConnectDB.getConnection().close();
326
//    }
327
    private void downloadsStats() throws Exception {
328
        Statement stmt = ConnectDB.getConnection().createStatement();
329
        ConnectDB.getConnection().setAutoCommit(false);
330

    
331
        //String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
332
        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
333
        stmt.executeUpdate(sql);
334

    
335
        //sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
336
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
337
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly_tmp p, public.datasource d, public.result_oids ro where p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
338
        stmt.executeUpdate(sql);
339

    
340
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats (like downloads_stats_tmp including all)";
341
        stmt.executeUpdate(sql);
342

    
343
        //sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;";
344
        //stmt.executeUpdate(sql);
345

    
346
        stmt.close();
347
        ConnectDB.getConnection().commit();
348
        ConnectDB.getConnection().close();
349
    }
350

    
351
    public void finalizeStats() throws Exception {
352
        stmt = ConnectDB.getConnection().createStatement();
353
        ConnectDB.getConnection().setAutoCommit(false);
354

    
355
        Calendar startCalendar = Calendar.getInstance();
356
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
357
        Calendar endCalendar = Calendar.getInstance();
358
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
359
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
360

    
361
//        String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
362
        String sql = "CREATE TABLE IF NOT EXISTS full_dates(full_date TEXT)";
363
        stmt.executeUpdate(sql);
364
        
365
        sql = "INSERT INTO full_dates SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
366
        stmt.executeUpdate(sql);
367

    
368
        //sql = "INSERT INTO public.full_dates SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
369
        //stmt.executeUpdate(sql);
370
        
371
        sql = "CREATE INDEX IF NOT EXISTS full_dates_full_date ON full_dates USING btree(full_date);";
372
        stmt.executeUpdate(sql);
373

    
374
        sql = "CREATE INDEX IF NOT EXISTS views_stats_source ON views_stats USING btree(source);";
375
        stmt.executeUpdate(sql);
376

    
377
        sql = "CREATE INDEX IF NOT EXISTS views_stats_repository_id ON views_stats USING btree(repository_id);";
378
        stmt.executeUpdate(sql);
379

    
380
        sql = "CREATE INDEX IF NOT EXISTS views_stats_result_id ON views_stats USING btree(result_id);";
381
        stmt.executeUpdate(sql);
382

    
383
        sql = "CREATE INDEX IF NOT EXISTS views_stats_date ON views_stats USING btree(date);";
384
        stmt.executeUpdate(sql);
385

    
386
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_source ON pageviews_stats USING btree(source);";
387
        stmt.executeUpdate(sql);
388

    
389
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
390
        stmt.executeUpdate(sql);
391

    
392
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
393
        stmt.executeUpdate(sql);
394

    
395
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_date ON pageviews_stats USING btree(date);";
396
        stmt.executeUpdate(sql);
397

    
398
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_source ON downloads_stats USING btree(source);";
399
        stmt.executeUpdate(sql);
400

    
401
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
402
        stmt.executeUpdate(sql);
403

    
404
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_result_id ON downloads_stats USING btree(result_id);";
405
        stmt.executeUpdate(sql);
406

    
407
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_date ON downloads_stats USING btree(date);";
408
        stmt.executeUpdate(sql);
409

    
410
//        sql = "SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views INTO usage_stats FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
411
        sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
412
        stmt.executeUpdate(sql);
413

    
414
        sql = "INSERT INTO usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats_tmp AS ds FULL OUTER JOIN views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
415
        stmt.executeUpdate(sql);
416

    
417
        sql = "INSERT INTO usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM la_downloads_stats_tmp AS ds FULL OUTER JOIN la_views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
418
        stmt.executeUpdate(sql);
419
        
420
        sql = "INSERT INTO public.usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats_tmp AS ds FULL OUTER JOIN views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
421
        stmt.executeUpdate(sql);
422

    
423
        sql = "INSERT INTO public.usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM la_downloads_stats_tmp AS ds FULL OUTER JOIN la_views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
424
        stmt.executeUpdate(sql);
425
        
426
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_source ON usage_stats USING btree(source);";
427
        stmt.executeUpdate(sql);
428

    
429
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_repository_id ON usage_stats USING btree(repository_id);";
430
        stmt.executeUpdate(sql);
431

    
432
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_result_id ON usage_stats USING btree(result_id);";
433
        stmt.executeUpdate(sql);
434

    
435
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_date ON usage_stats USING btree(date);";
436
        stmt.executeUpdate(sql);
437

    
438
        sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp";
439
        stmt.executeUpdate(sql);
440

    
441
        sql = "DROP VIEW IF EXISTS  result_views_monthly_tmp";
442
        stmt.executeUpdate(sql);
443
        
444
        
445
        sql = "DROP TABLE IF EXISTS views_stats_tmp;";
446
        stmt.executeUpdate(sql);
447

    
448
        sql = "DROP TABLE IF EXISTS downloads_stats_tmp;";
449
        stmt.executeUpdate(sql);
450
       
451
        sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
452
        stmt.executeUpdate(sql);
453

    
454
        sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
455
        stmt.executeUpdate(sql);
456

    
457
        sql = "DROP TABLE IF EXISTS piwiklogtmp;";
458
        stmt.executeUpdate(sql);
459

    
460
        sql = "DROP TABLE IF EXISTS sushilogtmp;";
461
        stmt.executeUpdate(sql);
462

    
463
        sql = "DROP VIEW IF EXISTS la_result_views_monthly_tmp;";
464
        stmt.executeUpdate(sql);
465

    
466
        sql = "DROP VIEW IF EXISTS la_result_downloads_monthly_tmp;";
467
        stmt.executeUpdate(sql);
468

    
469
        sql = "DROP TABLE IF EXISTS la_downloads_stats_tmp;";
470
        stmt.executeUpdate(sql);
471
        
472
        sql = "DROP TABLE IF EXISTS la_views_stats_tmp;";
473
        stmt.executeUpdate(sql);
474

    
475
        
476
        sql = "DROP TABLE IF EXISTS lareferencialogtmp;";
477
        stmt.executeUpdate(sql);
478

    
479
        stmt.close();
480
        ConnectDB.getConnection().commit();
481
        ConnectDB.getConnection().close();
482
    }
483

    
484
    //Create repository Views statistics
485
    private void repositoryViewsStats() throws Exception {
486
        stmt = ConnectDB.getConnection().createStatement();
487
        ConnectDB.getConnection().setAutoCommit(false);
488

    
489
//        String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
490
        String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
491
        stmt.executeUpdate(sql);
492

    
493
        sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
494
        stmt.executeUpdate(sql);
495

    
496
        sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
497
        stmt.executeUpdate(sql);
498

    
499
//        sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
500
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
501
        stmt.executeUpdate(sql);
502

    
503
        sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
504
        stmt.executeUpdate(sql);
505

    
506
        sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
507
        stmt.executeUpdate(sql);
508

    
509
        sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
510
        stmt.executeUpdate(sql);
511

    
512
        Calendar startCalendar = Calendar.getInstance();
513
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
514
        Calendar endCalendar = Calendar.getInstance();
515
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
516
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
517

    
518
        //sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
519
//        sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
520
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
521
        stmt.executeUpdate(sql);
522

    
523
        sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
524
        stmt.executeUpdate(sql);
525

    
526
        sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
527
        stmt.executeUpdate(sql);
528

    
529
        sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
530
        stmt.executeUpdate(sql);
531

    
532
        sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
533
        stmt.executeUpdate(sql);
534

    
535
        stmt.close();
536
        ConnectDB.getConnection().commit();
537
        ConnectDB.getConnection().close();
538
    }
539

    
540
    //Create repository downloads statistics
541
    private void repositoryDownloadsStats() throws Exception {
542
        stmt = ConnectDB.getConnection().createStatement();
543
        ConnectDB.getConnection().setAutoCommit(false);
544

    
545
//        String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
546
        String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
547
        stmt.executeUpdate(sql);
548

    
549
        sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
550
        stmt.executeUpdate(sql);
551

    
552
        sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
553
        stmt.executeUpdate(sql);
554

    
555
//        sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
556
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
557
        stmt.executeUpdate(sql);
558

    
559
        sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
560
        stmt.executeUpdate(sql);
561

    
562
        sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
563
        stmt.executeUpdate(sql);
564

    
565
        sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
566
        stmt.executeUpdate(sql);
567

    
568
        Calendar startCalendar = Calendar.getInstance();
569
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
570
        Calendar endCalendar = Calendar.getInstance();
571
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
572
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
573

    
574
        //sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
575
        // sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
576
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
577
        stmt.executeUpdate(sql);
578

    
579
        sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
580
        stmt.executeUpdate(sql);
581

    
582
        sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
583
        stmt.executeUpdate(sql);
584

    
585
        sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
586
        stmt.executeUpdate(sql);
587

    
588
        sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
589
        stmt.executeUpdate(sql);
590

    
591
        stmt.close();
592
        ConnectDB.getConnection().commit();
593
        ConnectDB.getConnection().close();
594
    }
595

    
596
    // Import OPENAIRE Logs to DB
597
    public void processPortalLog() throws Exception {
598
        Statement stmt = ConnectDB.getConnection().createStatement();
599
        ConnectDB.getConnection().setAutoCommit(false);
600

    
601
        ArrayList<String> jsonFiles = listHdfsDir(this.logPortalPath);
602
//        File folder = new File(this.logPortalPath);
603
//        File[] jsonFiles = folder.listFiles();
604

    
605
        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO process_portal_log_tmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
606
        int batch_size = 0;
607
        JSONParser parser = new JSONParser();
608
        for (String jsonFile : jsonFiles) {
609
            System.out.println(jsonFile);
610
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
611

    
612
            for (Object aJsonArray : jsonArray) {
613
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
614
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
615
                String idVisit = jsonObjectRow.get("idVisit").toString();
616
                String country = jsonObjectRow.get("country").toString();
617
                String referrerName = jsonObjectRow.get("referrerName").toString();
618
                String agent = jsonObjectRow.get("browser").toString();
619
                boolean botFound = false;
620
                Iterator it = robotsList.iterator();
621
                while (it.hasNext()) {
622
                    // Create a Pattern object
623
                    Pattern r = Pattern.compile(it.next().toString());
624
                    // Now create matcher object.
625
                    Matcher m = r.matcher(agent);
626
                    if (m.find()) {
627
                        botFound = true;
628
                        break;
629
                    }
630
                }
631
                if (botFound == false) {
632
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
633
                    for (Object actionDetail : actionDetails) {
634
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
635

    
636
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
637
                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
638
                        Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
639

    
640
                        String action = actionDetailsObj.get("type").toString();
641
                        String url = actionDetailsObj.get("url").toString();
642

    
643
                        String entityID = processPortalURL(url);
644
                        String sourceItemType = "";
645

    
646
                        if (entityID.indexOf("|") > 0) {
647
                            sourceItemType = entityID.substring(0, entityID.indexOf("|"));
648
                            entityID = entityID.substring(entityID.indexOf("|") + 1);
649
                        }
650

    
651
                        prepStatem.setInt(1, idSite);
652
                        prepStatem.setString(2, idVisit);
653
                        prepStatem.setString(3, country);
654
                        prepStatem.setString(4, action);
655
                        prepStatem.setString(5, url);
656
                        prepStatem.setString(6, entityID);
657
                        prepStatem.setString(7, sourceItemType);
658
                        prepStatem.setString(8, simpleDateFormat.format(timestamp));
659
                        prepStatem.setString(9, referrerName);
660
                        prepStatem.setString(10, agent);
661

    
662
                        prepStatem.addBatch();
663
                        batch_size++;
664
                        if (batch_size == 10000) {
665
                            prepStatem.executeBatch();
666
                            ConnectDB.getConnection().commit();
667
                            batch_size = 0;
668
                        }
669
                    }
670
                }
671
            }
672
        }
673
        prepStatem.executeBatch();
674
        ConnectDB.getConnection().commit();
675

    
676
        stmt.close();
677
        ConnectDB.getConnection().close();
678
    }
679

    
680
    public void portalStats() throws SQLException {
681
        Connection con = ConnectDB.getConnection();
682
        Statement stmt = con.createStatement();
683
        con.setAutoCommit(false);
684

    
685
        String sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'oaItem\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.result_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
686
        stmt.executeUpdate(sql);
687
//        stmt.close();
688
//        con.commit();
689

    
690
        stmt = con.createStatement();
691
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'datasource\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.datasource_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
692
        stmt.executeUpdate(sql);
693
//        stmt.close();
694
//        con.commit();
695

    
696
        stmt = con.createStatement();
697
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'organization\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.organization_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
698
        stmt.executeUpdate(sql);
699
//        stmt.close();
700
//        con.commit();
701

    
702
        stmt = con.createStatement();
703
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'project\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.project_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
704
        stmt.executeUpdate(sql);
705
        //       stmt.close();
706
//        con.commit();
707
        stmt.close();
708
        ConnectDB.getConnection().commit();
709
        ConnectDB.getConnection().close();
710
    }
711

    
712
    private void cleanOAI() throws Exception {
713
        ConnectDB.getConnection().setAutoCommit(false);
714

    
715
        stmt = ConnectDB.getConnection().createStatement();
716
        String sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chlc.min-saude.pt/','oai:repositorio.chlc.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.chlc.min-saude.pt/%';";
717
        stmt.executeUpdate(sql);
718
        stmt.close();
719
        ConnectDB.getConnection().commit();
720

    
721
        stmt = ConnectDB.getConnection().createStatement();
722
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hospitaldebraga.pt/','oai:repositorio.hospitaldebraga.pt:') WHERE entity_id LIKE 'oai:repositorio.hospitaldebraga.pt/%';";
723
        stmt.executeUpdate(sql);
724
        stmt.close();
725
        ConnectDB.getConnection().commit();
726

    
727
        stmt = ConnectDB.getConnection().createStatement();
728
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipl.pt/','oai:repositorio.ipl.pt:') WHERE entity_id LIKE 'oai:repositorio.ipl.pt/%';";
729
        stmt.executeUpdate(sql);
730
        stmt.close();
731
        ConnectDB.getConnection().commit();
732

    
733
        stmt = ConnectDB.getConnection().createStatement();
734
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bibliotecadigital.ipb.pt/','oai:bibliotecadigital.ipb.pt:') WHERE entity_id LIKE 'oai:bibliotecadigital.ipb.pt/%';";
735
        stmt.executeUpdate(sql);
736
        stmt.close();
737
        ConnectDB.getConnection().commit();
738

    
739
        stmt = ConnectDB.getConnection().createStatement();
740
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ismai.pt/','oai:repositorio.ismai.pt:') WHERE entity_id LIKE 'oai:repositorio.ismai.pt/%';";
741
        stmt.executeUpdate(sql);
742
        stmt.close();
743
        ConnectDB.getConnection().commit();
744

    
745
        stmt = ConnectDB.getConnection().createStatement();
746
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorioaberto.uab.pt/','oai:repositorioaberto.uab.pt:') WHERE entity_id LIKE 'oai:repositorioaberto.uab.pt/%';";
747
        stmt.executeUpdate(sql);
748
        stmt.close();
749
        ConnectDB.getConnection().commit();
750

    
751
        stmt = ConnectDB.getConnection().createStatement();
752
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.uac.pt/','oai:repositorio.uac.pt:') WHERE entity_id LIKE 'oai:repositorio.uac.pt/%';";
753
        stmt.executeUpdate(sql);
754
        stmt.close();
755
        ConnectDB.getConnection().commit();
756

    
757
        stmt = ConnectDB.getConnection().createStatement();
758
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.insa.pt/','oai:repositorio.insa.pt:') WHERE entity_id LIKE 'oai:repositorio.insa.pt/%';";
759
        stmt.executeUpdate(sql);
760
        stmt.close();
761
        ConnectDB.getConnection().commit();
762

    
763
        stmt = ConnectDB.getConnection().createStatement();
764
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipcb.pt/','oai:repositorio.ipcb.pt:') WHERE entity_id LIKE 'oai:repositorio.ipcb.pt/%';";
765
        stmt.executeUpdate(sql);
766
        stmt.close();
767
        ConnectDB.getConnection().commit();
768

    
769
        stmt = ConnectDB.getConnection().createStatement();
770
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ispa.pt/','oai:repositorio.ispa.pt:') WHERE entity_id LIKE 'oai:repositorio.ispa.pt/%';";
771
        stmt.executeUpdate(sql);
772
        stmt.close();
773
        ConnectDB.getConnection().commit();
774

    
775
        stmt = ConnectDB.getConnection().createStatement();
776
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chporto.pt/','oai:repositorio.chporto.pt:') WHERE entity_id LIKE 'oai:repositorio.chporto.pt/%';";
777
        stmt.executeUpdate(sql);
778
        stmt.close();
779
        ConnectDB.getConnection().commit();
780

    
781
        stmt = ConnectDB.getConnection().createStatement();
782
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ucp.pt/','oai:repositorio.ucp.pt:') WHERE entity_id LIKE 'oai:repositorio.ucp.pt/%';";
783
        stmt.executeUpdate(sql);
784
        stmt.close();
785
        ConnectDB.getConnection().commit();
786

    
787
        stmt = ConnectDB.getConnection().createStatement();
788
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:rihuc.huc.min-saude.pt/','oai:rihuc.huc.min-saude.pt:') WHERE entity_id LIKE 'oai:rihuc.huc.min-saude.pt/%';";
789
        stmt.executeUpdate(sql);
790
        stmt.close();
791
        ConnectDB.getConnection().commit();
792

    
793
        stmt = ConnectDB.getConnection().createStatement();
794
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipv.pt/','oai:repositorio.ipv.pt:') WHERE entity_id LIKE 'oai:repositorio.ipv.pt/%';";
795
        stmt.executeUpdate(sql);
796
        stmt.close();
797
        ConnectDB.getConnection().commit();
798

    
799
        stmt = ConnectDB.getConnection().createStatement();
800
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:www.repository.utl.pt/','oai:www.repository.utl.pt:') WHERE entity_id LIKE 'oai:www.repository.utl.pt/%';";
801
        stmt.executeUpdate(sql);
802
        stmt.close();
803
        ConnectDB.getConnection().commit();
804

    
805
        stmt = ConnectDB.getConnection().createStatement();
806
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:run.unl.pt/','oai:run.unl.pt:') WHERE entity_id LIKE 'oai:run.unl.pt/%';";
807
        stmt.executeUpdate(sql);
808
        stmt.close();
809
        ConnectDB.getConnection().commit();
810

    
811
        stmt = ConnectDB.getConnection().createStatement();
812
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:sapientia.ualg.pt/','oai:sapientia.ualg.pt:') WHERE entity_id LIKE 'oai:sapientia.ualg.pt/%';";
813
        stmt.executeUpdate(sql);
814
        stmt.close();
815
        ConnectDB.getConnection().commit();
816

    
817
        stmt = ConnectDB.getConnection().createStatement();
818
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipsantarem.pt/','oai:repositorio.ipsantarem.pt:') WHERE entity_id LIKE 'oai:repositorio.ipsantarem.pt/%';";
819
        stmt.executeUpdate(sql);
820
        stmt.close();
821
        ConnectDB.getConnection().commit();
822

    
823
        stmt = ConnectDB.getConnection().createStatement();
824
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:arca.igc.gulbenkian.pt/','oai:arca.igc.gulbenkian.pt:') WHERE entity_id LIKE 'oai:arca.igc.gulbenkian.pt/%';";
825
        stmt.executeUpdate(sql);
826
        stmt.close();
827
        ConnectDB.getConnection().commit();
828

    
829
        stmt = ConnectDB.getConnection().createStatement();
830
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:ubibliorum.ubi.pt/','oai:ubibliorum.ubi.pt:') WHERE entity_id LIKE 'oai:ubibliorum.ubi.pt/%';";
831
        stmt.executeUpdate(sql);
832
        stmt.close();
833
        ConnectDB.getConnection().commit();
834

    
835
        stmt = ConnectDB.getConnection().createStatement();
836
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:digituma.uma.pt/','oai:digituma.uma.pt:') WHERE entity_id LIKE 'oai:digituma.uma.pt/%';";
837
        stmt.executeUpdate(sql);
838
        stmt.close();
839
        ConnectDB.getConnection().commit();
840

    
841
        stmt = ConnectDB.getConnection().createStatement();
842
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ul.pt/','oai:repositorio.ul.pt:') WHERE entity_id LIKE 'oai:repositorio.ul.pt/%';";
843
        stmt.executeUpdate(sql);
844
        stmt.close();
845
        ConnectDB.getConnection().commit();
846

    
847
        stmt = ConnectDB.getConnection().createStatement();
848
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hff.min-saude.pt/','oai:repositorio.hff.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.hff.min-saude.pt/%';";
849
        stmt.executeUpdate(sql);
850
        stmt.close();
851
        ConnectDB.getConnection().commit();
852

    
853
        stmt = ConnectDB.getConnection().createStatement();
854
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorium.sdum.uminho.pt/','oai:repositorium.sdum.uminho.pt:') WHERE entity_id LIKE 'oai:repositorium.sdum.uminho.pt/%';";
855
        stmt.executeUpdate(sql);
856
        stmt.close();
857
        ConnectDB.getConnection().commit();
858

    
859
        stmt = ConnectDB.getConnection().createStatement();
860
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:recipp.ipp.pt/','oai:recipp.ipp.pt:') WHERE entity_id LIKE 'oai:recipp.ipp.pt/%';";
861
        stmt.executeUpdate(sql);
862
        stmt.close();
863
        ConnectDB.getConnection().commit();
864

    
865
        stmt = ConnectDB.getConnection().createStatement();
866
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bdigital.ufp.pt/','oai:bdigital.ufp.pt:') WHERE entity_id LIKE 'oai:bdigital.ufp.pt/%';";
867
        stmt.executeUpdate(sql);
868
        stmt.close();
869
        ConnectDB.getConnection().commit();
870

    
871
        stmt = ConnectDB.getConnection().createStatement();
872
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.lneg.pt/','oai:repositorio.lneg.pt:') WHERE entity_id LIKE 'oai:repositorio.lneg.pt/%';";
873
        stmt.executeUpdate(sql);
874
        stmt.close();
875
        ConnectDB.getConnection().commit();
876

    
877
        stmt = ConnectDB.getConnection().createStatement();
878
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:iconline.ipleiria.pt/','oai:iconline.ipleiria.pt:') WHERE entity_id LIKE 'oai:iconline.ipleiria.pt/%';";
879
        stmt.executeUpdate(sql);
880
        stmt.close();
881
        ConnectDB.getConnection().commit();
882

    
883
        stmt = ConnectDB.getConnection().createStatement();
884
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:comum.rcaap.pt/','oai:comum.rcaap.pt:') WHERE entity_id LIKE 'oai:comum.rcaap.pt/%';";
885
        stmt.executeUpdate(sql);
886
        stmt.close();
887
        ConnectDB.getConnection().commit();
888

    
889
        ConnectDB.getConnection().close();
890
    }
891

    
892
    private String processPortalURL(String url) {
893

    
894
        if (url.indexOf("explore.openaire.eu") > 0) {
895
            try {
896
                url = URLDecoder.decode(url, "UTF-8");
897
            } catch (Exception e) {
898
                log.info(url);
899
            }
900
            if (url.indexOf("datasourceId=") > 0 && url.substring(url.indexOf("datasourceId=") + 13).length() >= 46) {
901
                url = "datasource|" + url.substring(url.indexOf("datasourceId=") + 13, url.indexOf("datasourceId=") + 59);
902
            } else if (url.indexOf("datasource=") > 0 && url.substring(url.indexOf("datasource=") + 11).length() >= 46) {
903
                url = "datasource|" + url.substring(url.indexOf("datasource=") + 11, url.indexOf("datasource=") + 57);
904
            } else if (url.indexOf("datasourceFilter=") > 0 && url.substring(url.indexOf("datasourceFilter=") + 17).length() >= 46) {
905
                url = "datasource|" + url.substring(url.indexOf("datasourceFilter=") + 17, url.indexOf("datasourceFilter=") + 63);
906
            } else if (url.indexOf("articleId=") > 0 && url.substring(url.indexOf("articleId=") + 10).length() >= 46) {
907
                url = "result|" + url.substring(url.indexOf("articleId=") + 10, url.indexOf("articleId=") + 56);
908
            } else if (url.indexOf("datasetId=") > 0 && url.substring(url.indexOf("datasetId=") + 10).length() >= 46) {
909
                url = "result|" + url.substring(url.indexOf("datasetId=") + 10, url.indexOf("datasetId=") + 56);
910
            } else if (url.indexOf("projectId=") > 0 && url.substring(url.indexOf("projectId=") + 10).length() >= 46 && !url.contains("oai:dnet:corda")) {
911
                url = "project|" + url.substring(url.indexOf("projectId=") + 10, url.indexOf("projectId=") + 56);
912
            } else if (url.indexOf("organizationId=") > 0 && url.substring(url.indexOf("organizationId=") + 15).length() >= 46) {
913
                url = "organization|" + url.substring(url.indexOf("organizationId=") + 15, url.indexOf("organizationId=") + 61);
914
            } else {
915
                url = "";
916
            }
917
        } else {
918
            url = "";
919
        }
920

    
921
        return url;
922
    }
923

    
924
    private void updateProdTables() throws SQLException {
925
        Statement stmt = ConnectDB.getConnection().createStatement();
926
        ConnectDB.getConnection().setAutoCommit(false);
927

    
928
        String sql = "insert into piwiklog select * from piwiklogtmp;";
929
        stmt.executeUpdate(sql);
930

    
931
        sql = "insert into views_stats select * from views_stats_tmp;";
932
        stmt.executeUpdate(sql);
933

    
934
        sql = "insert into public.views_stats select * from views_stats_tmp;";
935
        stmt.executeUpdate(sql);
936

    
937
        sql = "insert into downloads_stats select * from downloads_stats_tmp;";
938
        stmt.executeUpdate(sql);
939
        
940
        sql = "insert into public.downloads_stats select * from downloads_stats_tmp;";
941
        stmt.executeUpdate(sql);
942

    
943
        sql = "insert into pageviews_stats select * from pageviews_stats_tmp;";
944
        stmt.executeUpdate(sql);
945

    
946
        sql = "insert into public.pageviews_stats select * from pageviews_stats_tmp;";
947
        stmt.executeUpdate(sql);
948

    
949
        stmt.close();
950
        ConnectDB.getConnection().commit();
951
        ConnectDB.getConnection().close();
952

    
953
        log.info("updateProdTables done");
954
    }
955

    
956
    private ArrayList<String> listHdfsDir(String dir) throws Exception {
957

    
958
        FileSystem hdfs = FileSystem.get(new Configuration());
959
        RemoteIterator<LocatedFileStatus> Files;
960
        ArrayList<String> fileNames = new ArrayList<>();
961

    
962
        try {
963
            Path exportPath = new Path(hdfs.getUri() + dir);
964
            Files = hdfs.listFiles(exportPath, false);
965
            while (Files.hasNext()) {
966
                String fileName = Files.next().getPath().toString();
967
                fileNames.add(fileName);
968
            }
969

    
970
            hdfs.close();
971
        } catch (Exception e) {
972
            log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath));
973
            throw new Exception("HDFS file path with exported data does not exist :   " + logRepoPath, e);
974
        }
975

    
976
        return fileNames;
977
    }
978

    
979
    private String readHDFSFile(String filename) throws Exception {
980
        String result;
981
        try {
982

    
983
            FileSystem fs = FileSystem.get(new Configuration());
984
            //log.info("reading file : " + filename);
985

    
986
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
987

    
988
            StringBuilder sb = new StringBuilder();
989
            String line = br.readLine();
990

    
991
            while (line != null) {
992
                if (!line.equals("[]")) {
993
                    sb.append(line);
994
                }
995
                //sb.append(line);
996
                line = br.readLine();
997
            }
998
            result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
999
            if (result.equals("")) {
1000
                result = "[]";
1001
            }
1002

    
1003
            //fs.close();
1004
        } catch (Exception e) {
1005
            log.error(e);
1006
            throw new Exception(e);
1007
        }
1008

    
1009
        return result;
1010
    }
1011

    
1012
    private Connection getConnection() throws SQLException {
1013
        return ConnectDB.getConnection();
1014
    }
1015
}
(7-7/11)