// NOTE: "Project / General / Profile" — stray page-header text from the source viewer, not Java code.

1
package eu.dnetlib.usagestats.export;
2

    
3
import java.io.*;
4
import java.net.URLDecoder;
5
import java.sql.Connection;
6
import java.sql.PreparedStatement;
7
import java.sql.SQLException;
8
import java.sql.Statement;
9
import java.sql.Timestamp;
10
import java.text.SimpleDateFormat;
11
import java.util.*;
12
import java.util.regex.Matcher;
13
import java.util.regex.Pattern;
14
import java.util.stream.Stream;
15

    
16
import org.apache.hadoop.conf.Configuration;
17
import org.apache.hadoop.fs.LocatedFileStatus;
18
import org.apache.hadoop.fs.Path;
19
import org.apache.hadoop.fs.FileSystem;
20
import org.apache.hadoop.fs.RemoteIterator;
21
import org.apache.log4j.Logger;
22
import org.json.simple.JSONArray;
23
import org.json.simple.JSONObject;
24
import org.json.simple.parser.JSONParser;
25

    
26
public class PiwikStatsDB {
27

    
28
    // Base path of the usage logs (never assigned in this chunk — TODO confirm it is still used).
    private String logPath;
    // HDFS directory holding the repository (Piwik) log JSON files.
    private String logRepoPath;
    // HDFS directory holding the portal log JSON files.
    private String logPortalPath;

    // Shared JDBC statement; methods that use it re-create it from ConnectDB each time.
    private Statement stmt = null;

    private final Logger log = Logger.getLogger(this.getClass());
    // URL of the COUNTER robots list; must be set via setCounterRobotsURL() before processLogs().
    private String CounterRobotsURL;
    // Raw-typed list of robot user-agent regex patterns (elements consumed via toString()).
    private ArrayList robotsList;
37

    
38

    
39
    /**
     * Initializes the exporter with the HDFS locations of the repository and portal logs
     * and eagerly creates both the permanent and the temporary usage tables.
     *
     * @param logRepoPath   HDFS directory with the repository (Piwik) log JSON files
     * @param logPortalPath HDFS directory with the portal log JSON files
     * @throws Exception if creating the tables or tmp tables fails
     */
    public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
        this.logRepoPath = logRepoPath;
        this.logPortalPath = logPortalPath;
        // Permanent tables first, then the tmp working tables the pipeline fills.
        this.createTables();
        this.createTmpTables();
    }
45

    
46
    public void foo() {
47
        Stream<String> s = Arrays.stream(new String[] {"a", "b", "c", "d"});
48

    
49
        System.out.println(s.parallel().count());
50
    }
51

    
52
    /**
     * Returns the COUNTER robot user-agent pattern list.
     * NOTE(review): raw-typed ArrayList — element type is not declared anywhere in this chunk.
     *
     * @return the robots pattern list (may be null until set)
     */
    public ArrayList getRobotsList() {
        return robotsList;
    }
55

    
56
    /**
     * Sets the COUNTER robot user-agent pattern list used to filter log rows.
     *
     * @param robotsList raw-typed list of robot user-agent regex patterns
     */
    public void setRobotsList(ArrayList robotsList) {
        this.robotsList = robotsList;
    }
59

    
60
    /**
     * Returns the URL of the COUNTER robots list consumed by processLogs().
     *
     * @return the robots-list URL (may be null until set)
     */
    public String getCounterRobotsURL() {
        return CounterRobotsURL;
    }
63

    
64
    /**
     * Sets the URL of the COUNTER robots list; must be called before processLogs().
     *
     * @param CounterRobotsURL URL of the COUNTER robots list
     */
    public void setCounterRobotsURL(String CounterRobotsURL) {
        this.CounterRobotsURL = CounterRobotsURL;
    }
67

    
68
    private void createTables() throws Exception {
69
        try {
70
            stmt = ConnectDB.getConnection().createStatement();
71
            String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
72
            String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
73
                    + " ON INSERT TO piwiklog "
74
                    + " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
75
                    + "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
76
                    + "FROM piwiklog "
77
                    + "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
78
            String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on piwiklog(source, id_visit, action, entity_id, \"timestamp\");";
79
            stmt.executeUpdate(sqlCreateTablePiwikLog);
80
            stmt.executeUpdate(sqlcreateRulePiwikLog);
81
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
82

    
83
            String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
84
            String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
85
                    + " ON INSERT TO process_portal_log "
86
                    + " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit,"
87
                    + "process_portal_log.\"timestamp\" "
88
                    + "FROM process_portal_log "
89
                    + "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
90
            String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on process_portal_log(source, id_visit, \"timestamp\");";
91
            stmt.executeUpdate(sqlCreateTablePortalLog);
92
            stmt.executeUpdate(sqlcreateRulePortalLog);
93
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
94

    
95
            stmt.close();
96
            ConnectDB.getConnection().close();
97
            log.info("Usage Tables Created");
98

    
99
        } catch (Exception e) {
100
            log.error("Failed to create tables: " + e);
101
            throw new Exception("Failed to create tables: " + e.toString(), e);
102
        }
103
    }
104

    
105
    private void createTmpTables() throws Exception {
106
        try {
107
            Statement stmt = ConnectDB.getConnection().createStatement();
108
            String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
109
            String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
110
                    + " ON INSERT TO piwiklogtmp "
111
                    + " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit,"
112
                    + "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id "
113
                    + "FROM piwiklogtmp "
114
                    + "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
115
            stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
116
            stmt.executeUpdate(sqlcreateTmpRulePiwikLog);
117

    
118
            //String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
119
            //stmt.executeUpdate(sqlCopyPublicPiwiklog);
120
            String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
121
            String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
122
                    + " ON INSERT TO process_portal_log_tmp "
123
                    + " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit,"
124
                    + "process_portal_log_tmp.\"timestamp\" "
125
                    + "FROM process_portal_log_tmp "
126
                    + "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
127
            stmt.executeUpdate(sqlCreateTmpTablePortalLog);
128
            stmt.executeUpdate(sqlcreateTmpRulePortalLog);
129

    
130
            stmt.close();
131
            log.info("Usage Tmp Tables Created");
132

    
133
        } catch (Exception e) {
134
            log.error("Failed to create tmptables: " + e);
135
            throw new Exception("Failed to create tmp tables: " + e.toString(), e);
136
            //System.exit(0);
137
        }
138
    }
139

    
140
    /**
     * Runs the full usage-statistics pipeline: loads the COUNTER robots list, ingests
     * and cleans the repository logs, computes view/download statistics, processes the
     * portal logs, and finally copies the tmp tables into the production tables.
     *
     * <p>The step order matters: the cleaning and stats steps read the tmp tables that
     * processRepositoryLog()/processPortalLog() fill. CounterRobotsURL must have been
     * set (see setCounterRobotsURL) before calling this method.
     *
     * @throws Exception wrapping the failure of any individual step
     */
    public void processLogs() throws Exception {
        try {
            // Fetch and cache the robot user-agent patterns used for filtering.
            ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
            this.robotsList = counterRobots.getRobotsPatterns();

            // Ingest repository logs into piwiklogtmp, then de-duplicate rapid
            // repeat clicks and normalise OAI identifiers.
            processRepositoryLog();
            log.info("repository process done");
            removeDoubleClicks();
            log.info("removing double clicks done");
            cleanOAI();
            log.info("cleaning oai done");

            // Aggregate views and downloads from the cleaned tmp data.
            viewsStats();
            downloadsStats();

            processPortalLog();
            log.info("portal process done");
            
            portalStats();
            log.info("portal usagestats done");

            // Merge tmp results into the production tables.
            updateProdTables();
            log.info("updateProdTables done");

        } catch (Exception e) {
            log.error("Failed to process logs: " + e);
            throw new Exception("Failed to process logs: " + e.toString(), e);
        }
    }
169

    
170
//    public void usageStats() throws Exception {
171
//        try {
172
//            viewsStats();
173
//            downloadsStats();
174
//            log.info("stat tables and views done");
175
//        } catch (Exception e) {
176
//            log.error("Failed to create usage usagestats: " + e);
177
//            throw new Exception("Failed to create usage usagestats: " + e.toString(), e);
178
//        }
179
//    }
180

    
181
    public void processRepositoryLog() throws Exception {
182
        Statement stmt = ConnectDB.getConnection().createStatement();
183
        ConnectDB.getConnection().setAutoCommit(false);
184

    
185
        ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
186
//        File dir = new File(this.logRepoPath);
187
//        File[] jsonFiles = dir.listFiles();
188

    
189
        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
190
        int batch_size = 0;
191
        JSONParser parser = new JSONParser();
192
        for (String jsonFile : jsonFiles) {
193
            System.out.println(jsonFile);
194
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
195
            for (Object aJsonArray : jsonArray) {
196
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
197
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
198
                String idVisit = jsonObjectRow.get("idVisit").toString();
199
                String country = jsonObjectRow.get("country").toString();
200
                String referrerName = jsonObjectRow.get("referrerName").toString();
201
                String agent = jsonObjectRow.get("browser").toString();
202
                boolean botFound = false;
203
                Iterator it = robotsList.iterator();
204
                while (it.hasNext()) {
205
                    // Create a Pattern object
206
                    Pattern r = Pattern.compile(it.next().toString());
207
                    // Now create matcher object.
208
                    Matcher m = r.matcher(agent);
209
                    if (m.find()) {
210
                        //System.out.println("Found value: " + m.group(0));
211
                        botFound = true;
212
                        break;
213
                    }
214
                }
215
                if (botFound == false) {
216
                    String sourceItemType = "repItem";
217

    
218
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
219
                    for (Object actionDetail : actionDetails) {
220
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
221

    
222
                        if (actionDetailsObj.get("customVariables") != null) {
223
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
224
                            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
225
                            Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
226
                            String url = actionDetailsObj.get("url").toString();
227
                            String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
228
                            String action = actionDetailsObj.get("type").toString();
229

    
230
                            prepStatem.setInt(1, idSite);
231
                            prepStatem.setString(2, idVisit);
232
                            prepStatem.setString(3, country);
233
                            prepStatem.setString(4, action);
234
                            prepStatem.setString(5, url);
235
                            prepStatem.setString(6, oaipmh);
236
                            prepStatem.setString(7, sourceItemType);
237
                            prepStatem.setString(8, simpleDateFormat.format(timestamp));
238
                            prepStatem.setString(9, referrerName);
239
                            prepStatem.setString(10, agent);
240
                            prepStatem.addBatch();
241
                            batch_size++;
242
                            if (batch_size == 10000) {
243
                                prepStatem.executeBatch();
244
                                ConnectDB.getConnection().commit();
245
                                batch_size = 0;
246
                            }
247
                        }
248
                    }
249
                }
250
            }
251
        }
252
        prepStatem.executeBatch();
253
        ConnectDB.getConnection().commit();
254
        stmt.close();
255
    }
256

    
257
    public void removeDoubleClicks() throws Exception {
258
        Statement stmt = ConnectDB.getConnection().createStatement();
259
        ConnectDB.getConnection().setAutoCommit(false);
260

    
261
        //clean download double clicks
262
        String sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
263
        stmt.executeUpdate(sql);
264
        stmt.close();
265
        ConnectDB.getConnection().commit();
266

    
267
        stmt = ConnectDB.getConnection().createStatement();
268

    
269
        //clean view double clicks
270
        sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
271
        stmt.executeUpdate(sql);
272
        stmt.close();
273
        ConnectDB.getConnection().commit();
274
    }
275

    
276
    public void viewsStats() throws Exception {
277
        Statement stmt = ConnectDB.getConnection().createStatement();
278
        ConnectDB.getConnection().setAutoCommit(false);
279

    
280
        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
281
        String sql = "CREATE OR REPLACE VIEW result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
282
        stmt.executeUpdate(sql);
283

    
284
        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
285
        sql = "CREATE TABLE IF NOT EXISTS views_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
286
        stmt.executeUpdate(sql);
287

    
288
        sql = "CREATE TABLE IF NOT EXISTS views_stats (like views_stats_tmp including all)";
289
        stmt.executeUpdate(sql);
290

    
291
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
292
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
293
        stmt.executeUpdate(sql);
294

    
295
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats (like pageviews_stats_tmp including all)";
296
        stmt.executeUpdate(sql);
297
        
298
        stmt.close();
299
        ConnectDB.getConnection().commit();
300
        ConnectDB.getConnection().close();
301
    }
302

    
303
//    public void viewsStats(String piwikid) throws Exception {
304
//        stmt = ConnectDB.getConnection().createStatement();
305
//        ConnectDB.getConnection().setAutoCommit(false);
306
//
307
//        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
308
//        String sql = "CREATE OR REPLACE VIEW result_views_monthly" + piwikid + " AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog" + piwikid + " where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
309
//        stmt.executeUpdate(sql);
310
//
311
//        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
312
//        sql = "CREATE TABLE IF NOT EXISTS views_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source!='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
313
//        stmt.executeUpdate(sql);
314
//
315
////        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
316
//        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
317
//        stmt.executeUpdate(sql);
318
//
319
//        sql = "DROP VIEW IF EXISTS result_views_monthly" + piwikid + ";";
320
//        stmt.executeUpdate(sql);
321
//
322
//        stmt.close();
323
//        ConnectDB.getConnection().commit();
324
//        ConnectDB.getConnection().close();
325
//    }
326

    
327
     private void downloadsStats() throws Exception {
328
        Statement stmt = ConnectDB.getConnection().createStatement();
329
        ConnectDB.getConnection().setAutoCommit(false);
330

    
331
         //String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
332
        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
333
        stmt.executeUpdate(sql);
334

    
335
        //sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
336
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
337
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly_tmp p, public.datasource d, public.result_oids ro where p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
338
        stmt.executeUpdate(sql);
339
        
340
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats (like downloads_stats_tmp including all)";
341
        stmt.executeUpdate(sql);
342
        
343
        sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;";
344
        stmt.executeUpdate(sql);
345

    
346
        stmt.close();
347
        ConnectDB.getConnection().commit();
348
        ConnectDB.getConnection().close();
349
    }
350

    
351
    public void finalizeStats() throws Exception {
352
        stmt = ConnectDB.getConnection().createStatement();
353
        ConnectDB.getConnection().setAutoCommit(false);
354

    
355
        Calendar startCalendar = Calendar.getInstance();
356
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
357
        Calendar endCalendar = Calendar.getInstance();
358
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
359
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
360

    
361
//        String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
362
        String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
363
        stmt.executeUpdate(sql);
364

    
365
        sql = "CREATE INDEX IF NOT EXISTS full_dates_full_date ON full_dates USING btree(full_date);";
366
        stmt.executeUpdate(sql);
367

    
368
        sql = "CREATE INDEX IF NOT EXISTS views_stats_source ON views_stats USING btree(source);";
369
        stmt.executeUpdate(sql);
370

    
371
        sql = "CREATE INDEX IF NOT EXISTS views_stats_repository_id ON views_stats USING btree(repository_id);";
372
        stmt.executeUpdate(sql);
373

    
374
        sql = "CREATE INDEX IF NOT EXISTS views_stats_result_id ON views_stats USING btree(result_id);";
375
        stmt.executeUpdate(sql);
376

    
377
        sql = "CREATE INDEX IF NOT EXISTS views_stats_date ON views_stats USING btree(date);";
378
        stmt.executeUpdate(sql);
379

    
380
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_source ON pageviews_stats USING btree(source);";
381
        stmt.executeUpdate(sql);
382

    
383
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
384
        stmt.executeUpdate(sql);
385

    
386
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
387
        stmt.executeUpdate(sql);
388

    
389
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_date ON pageviews_stats USING btree(date);";
390
        stmt.executeUpdate(sql);
391

    
392
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_source ON downloads_stats USING btree(source);";
393
        stmt.executeUpdate(sql);
394

    
395
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
396
        stmt.executeUpdate(sql);
397

    
398
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_result_id ON downloads_stats USING btree(result_id);";
399
        stmt.executeUpdate(sql);
400

    
401
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_date ON downloads_stats USING btree(date);";
402
        stmt.executeUpdate(sql);
403

    
404
//        sql = "SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views INTO usage_stats FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
405
        sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
406
        stmt.executeUpdate(sql);
407
        
408
        sql = "INSERT INTO usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats_tmp AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
409
        stmt.executeUpdate(sql);
410

    
411
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_source ON usage_stats USING btree(source);";
412
        stmt.executeUpdate(sql);
413

    
414
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_repository_id ON usage_stats USING btree(repository_id);";
415
        stmt.executeUpdate(sql);
416

    
417
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_result_id ON usage_stats USING btree(result_id);";
418
        stmt.executeUpdate(sql);
419

    
420
        sql = "CREATE INDEX IF NOT EXISTS usage_stats_date ON usage_stats USING btree(date);";
421
        stmt.executeUpdate(sql);
422
       
423
        sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
424
        stmt.executeUpdate(sql);
425

    
426
        sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
427
        stmt.executeUpdate(sql);
428
        
429
        sql = "DROP VIEW IF EXISTS  result_views_monthly_tmp";
430
        stmt.executeUpdate(sql);
431

    
432
        sql = "DROP TABLE IF EXISTS views_stats_tmp;";
433
        stmt.executeUpdate(sql);
434
        
435
        sql = "DROP TABLE IF EXISTS downloads_stats_tmp;";
436
        stmt.executeUpdate(sql);
437
        
438
        sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
439
        stmt.executeUpdate(sql);
440

    
441
        sql = "DROP TABLE IF EXISTS piwiklogtmp;";
442
        stmt.executeUpdate(sql);
443

    
444
        sql = "DROP TABLE IF EXISTS sushilogtmp;";
445
        stmt.executeUpdate(sql);
446

    
447
        stmt.close();
448
        ConnectDB.getConnection().commit();
449
        ConnectDB.getConnection().close();
450
    }
451

    
452
    //Create repository Views statistics
453
    private void repositoryViewsStats() throws Exception {
454
        stmt = ConnectDB.getConnection().createStatement();
455
        ConnectDB.getConnection().setAutoCommit(false);
456

    
457
//        String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
458
        String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
459
        stmt.executeUpdate(sql);
460

    
461
        sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
462
        stmt.executeUpdate(sql);
463

    
464
        sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
465
        stmt.executeUpdate(sql);
466

    
467
//        sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
468
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
469
        stmt.executeUpdate(sql);
470

    
471
        sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
472
        stmt.executeUpdate(sql);
473

    
474
        sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
475
        stmt.executeUpdate(sql);
476

    
477
        sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
478
        stmt.executeUpdate(sql);
479

    
480
        Calendar startCalendar = Calendar.getInstance();
481
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
482
        Calendar endCalendar = Calendar.getInstance();
483
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
484
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
485

    
486
        //sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
487
//        sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
488
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
489
        stmt.executeUpdate(sql);
490

    
491
        sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
492
        stmt.executeUpdate(sql);
493

    
494
        sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
495
        stmt.executeUpdate(sql);
496

    
497
        sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
498
        stmt.executeUpdate(sql);
499

    
500
        sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
501
        stmt.executeUpdate(sql);
502

    
503
        stmt.close();
504
        ConnectDB.getConnection().commit();
505
        ConnectDB.getConnection().close();
506
    }
507

    
508
    //Create repository downloads statistics
509
    private void repositoryDownloadsStats() throws Exception {
510
        stmt = ConnectDB.getConnection().createStatement();
511
        ConnectDB.getConnection().setAutoCommit(false);
512

    
513
//        String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
514
        String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
515
        stmt.executeUpdate(sql);
516

    
517
        sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
518
        stmt.executeUpdate(sql);
519

    
520
        sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
521
        stmt.executeUpdate(sql);
522

    
523
//        sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
524
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
525
        stmt.executeUpdate(sql);
526

    
527
        sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
528
        stmt.executeUpdate(sql);
529

    
530
        sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
531
        stmt.executeUpdate(sql);
532

    
533
        sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
534
        stmt.executeUpdate(sql);
535

    
536
        Calendar startCalendar = Calendar.getInstance();
537
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
538
        Calendar endCalendar = Calendar.getInstance();
539
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
540
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
541

    
542
        //sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
543
        // sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
544
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
545
        stmt.executeUpdate(sql);
546

    
547
        sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
548
        stmt.executeUpdate(sql);
549

    
550
        sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
551
        stmt.executeUpdate(sql);
552

    
553
        sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
554
        stmt.executeUpdate(sql);
555

    
556
        sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
557
        stmt.executeUpdate(sql);
558

    
559
        stmt.close();
560
        ConnectDB.getConnection().commit();
561
        ConnectDB.getConnection().close();
562
    }
563

    
564
    // Import OPENAIRE Logs to DB
565
    public void processPortalLog() throws Exception {
566
        Statement stmt = ConnectDB.getConnection().createStatement();
567
        ConnectDB.getConnection().setAutoCommit(false);
568

    
569
        ArrayList<String> jsonFiles = listHdfsDir(this.logPortalPath);
570
//        File folder = new File(this.logPortalPath);
571
//        File[] jsonFiles = folder.listFiles();
572

    
573
        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO process_portal_log_tmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
574
        int batch_size = 0;
575
        JSONParser parser = new JSONParser();
576
        for (String jsonFile : jsonFiles) {
577
            System.out.println(jsonFile);
578
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
579

    
580
            for (Object aJsonArray : jsonArray) {
581
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
582
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
583
                String idVisit = jsonObjectRow.get("idVisit").toString();
584
                String country = jsonObjectRow.get("country").toString();
585
                String referrerName = jsonObjectRow.get("referrerName").toString();
586
                String agent = jsonObjectRow.get("browser").toString();
587
                boolean botFound = false;
588
                Iterator it = robotsList.iterator();
589
                while (it.hasNext()) {
590
                    // Create a Pattern object
591
                    Pattern r = Pattern.compile(it.next().toString());
592
                    // Now create matcher object.
593
                    Matcher m = r.matcher(agent);
594
                    if (m.find()) {
595
                        botFound = true;
596
                        break;
597
                    }
598
                }
599
                if (botFound == false) {
600
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
601
                    for (Object actionDetail : actionDetails) {
602
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
603

    
604
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
605
                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
606
                        Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
607

    
608
                        String action = actionDetailsObj.get("type").toString();
609
                        String url = actionDetailsObj.get("url").toString();
610

    
611
                        String entityID = processPortalURL(url);
612
                        String sourceItemType = "";
613

    
614
                        if (entityID.indexOf("|") > 0) {
615
                            sourceItemType = entityID.substring(0, entityID.indexOf("|"));
616
                            entityID = entityID.substring(entityID.indexOf("|") + 1);
617
                        }
618

    
619
                        prepStatem.setInt(1, idSite);
620
                        prepStatem.setString(2, idVisit);
621
                        prepStatem.setString(3, country);
622
                        prepStatem.setString(4, action);
623
                        prepStatem.setString(5, url);
624
                        prepStatem.setString(6, entityID);
625
                        prepStatem.setString(7, sourceItemType);
626
                        prepStatem.setString(8, simpleDateFormat.format(timestamp));
627
                        prepStatem.setString(9, referrerName);
628
                        prepStatem.setString(10, agent);
629

    
630
                        prepStatem.addBatch();
631
                        batch_size++;
632
                        if (batch_size == 10000) {
633
                            prepStatem.executeBatch();
634
                            ConnectDB.getConnection().commit();
635
                            batch_size = 0;
636
                        }
637
                    }
638
                }
639
            }
640
        }
641
        prepStatem.executeBatch();
642
        ConnectDB.getConnection().commit();
643

    
644
        stmt.close();
645
        ConnectDB.getConnection().close();
646
    }
647
 
648
    public void portalStats() throws SQLException {
649
        Connection con = ConnectDB.getConnection();
650
        Statement stmt = con.createStatement();
651
        con.setAutoCommit(false);
652

    
653
        String sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'oaItem\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.result_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
654
        stmt.executeUpdate(sql);
655
//        stmt.close();
656
//        con.commit();
657

    
658
        stmt = con.createStatement();
659
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'datasource\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.datasource_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
660
        stmt.executeUpdate(sql);
661
//        stmt.close();
662
//        con.commit();
663

    
664
        stmt = con.createStatement();
665
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'organization\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.organization_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
666
        stmt.executeUpdate(sql);
667
//        stmt.close();
668
//        con.commit();
669

    
670
        stmt = con.createStatement();
671
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'project\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.project_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
672
        stmt.executeUpdate(sql);
673
 //       stmt.close();
674
//        con.commit();
675
        stmt.close();
676
        ConnectDB.getConnection().commit();
677
        ConnectDB.getConnection().close();
678
    }
679

    
680
    private void cleanOAI() throws Exception {
681
        ConnectDB.getConnection().setAutoCommit(false);
682

    
683
        stmt = ConnectDB.getConnection().createStatement();
684
        String sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chlc.min-saude.pt/','oai:repositorio.chlc.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.chlc.min-saude.pt/%';";
685
        stmt.executeUpdate(sql);
686
        stmt.close();
687
        ConnectDB.getConnection().commit();
688

    
689
        stmt = ConnectDB.getConnection().createStatement();
690
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hospitaldebraga.pt/','oai:repositorio.hospitaldebraga.pt:') WHERE entity_id LIKE 'oai:repositorio.hospitaldebraga.pt/%';";
691
        stmt.executeUpdate(sql);
692
        stmt.close();
693
        ConnectDB.getConnection().commit();
694

    
695
        stmt = ConnectDB.getConnection().createStatement();
696
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipl.pt/','oai:repositorio.ipl.pt:') WHERE entity_id LIKE 'oai:repositorio.ipl.pt/%';";
697
        stmt.executeUpdate(sql);
698
        stmt.close();
699
        ConnectDB.getConnection().commit();
700

    
701
        stmt = ConnectDB.getConnection().createStatement();
702
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bibliotecadigital.ipb.pt/','oai:bibliotecadigital.ipb.pt:') WHERE entity_id LIKE 'oai:bibliotecadigital.ipb.pt/%';";
703
        stmt.executeUpdate(sql);
704
        stmt.close();
705
        ConnectDB.getConnection().commit();
706

    
707
        stmt = ConnectDB.getConnection().createStatement();
708
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ismai.pt/','oai:repositorio.ismai.pt:') WHERE entity_id LIKE 'oai:repositorio.ismai.pt/%';";
709
        stmt.executeUpdate(sql);
710
        stmt.close();
711
        ConnectDB.getConnection().commit();
712

    
713
        stmt = ConnectDB.getConnection().createStatement();
714
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorioaberto.uab.pt/','oai:repositorioaberto.uab.pt:') WHERE entity_id LIKE 'oai:repositorioaberto.uab.pt/%';";
715
        stmt.executeUpdate(sql);
716
        stmt.close();
717
        ConnectDB.getConnection().commit();
718

    
719
        stmt = ConnectDB.getConnection().createStatement();
720
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.uac.pt/','oai:repositorio.uac.pt:') WHERE entity_id LIKE 'oai:repositorio.uac.pt/%';";
721
        stmt.executeUpdate(sql);
722
        stmt.close();
723
        ConnectDB.getConnection().commit();
724

    
725
        stmt = ConnectDB.getConnection().createStatement();
726
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.insa.pt/','oai:repositorio.insa.pt:') WHERE entity_id LIKE 'oai:repositorio.insa.pt/%';";
727
        stmt.executeUpdate(sql);
728
        stmt.close();
729
        ConnectDB.getConnection().commit();
730

    
731
        stmt = ConnectDB.getConnection().createStatement();
732
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipcb.pt/','oai:repositorio.ipcb.pt:') WHERE entity_id LIKE 'oai:repositorio.ipcb.pt/%';";
733
        stmt.executeUpdate(sql);
734
        stmt.close();
735
        ConnectDB.getConnection().commit();
736

    
737
        stmt = ConnectDB.getConnection().createStatement();
738
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ispa.pt/','oai:repositorio.ispa.pt:') WHERE entity_id LIKE 'oai:repositorio.ispa.pt/%';";
739
        stmt.executeUpdate(sql);
740
        stmt.close();
741
        ConnectDB.getConnection().commit();
742

    
743
        stmt = ConnectDB.getConnection().createStatement();
744
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chporto.pt/','oai:repositorio.chporto.pt:') WHERE entity_id LIKE 'oai:repositorio.chporto.pt/%';";
745
        stmt.executeUpdate(sql);
746
        stmt.close();
747
        ConnectDB.getConnection().commit();
748

    
749
        stmt = ConnectDB.getConnection().createStatement();
750
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ucp.pt/','oai:repositorio.ucp.pt:') WHERE entity_id LIKE 'oai:repositorio.ucp.pt/%';";
751
        stmt.executeUpdate(sql);
752
        stmt.close();
753
        ConnectDB.getConnection().commit();
754

    
755
        stmt = ConnectDB.getConnection().createStatement();
756
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:rihuc.huc.min-saude.pt/','oai:rihuc.huc.min-saude.pt:') WHERE entity_id LIKE 'oai:rihuc.huc.min-saude.pt/%';";
757
        stmt.executeUpdate(sql);
758
        stmt.close();
759
        ConnectDB.getConnection().commit();
760

    
761
        stmt = ConnectDB.getConnection().createStatement();
762
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipv.pt/','oai:repositorio.ipv.pt:') WHERE entity_id LIKE 'oai:repositorio.ipv.pt/%';";
763
        stmt.executeUpdate(sql);
764
        stmt.close();
765
        ConnectDB.getConnection().commit();
766

    
767
        stmt = ConnectDB.getConnection().createStatement();
768
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:www.repository.utl.pt/','oai:www.repository.utl.pt:') WHERE entity_id LIKE 'oai:www.repository.utl.pt/%';";
769
        stmt.executeUpdate(sql);
770
        stmt.close();
771
        ConnectDB.getConnection().commit();
772

    
773
        stmt = ConnectDB.getConnection().createStatement();
774
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:run.unl.pt/','oai:run.unl.pt:') WHERE entity_id LIKE 'oai:run.unl.pt/%';";
775
        stmt.executeUpdate(sql);
776
        stmt.close();
777
        ConnectDB.getConnection().commit();
778

    
779
        stmt = ConnectDB.getConnection().createStatement();
780
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:sapientia.ualg.pt/','oai:sapientia.ualg.pt:') WHERE entity_id LIKE 'oai:sapientia.ualg.pt/%';";
781
        stmt.executeUpdate(sql);
782
        stmt.close();
783
        ConnectDB.getConnection().commit();
784

    
785
        stmt = ConnectDB.getConnection().createStatement();
786
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipsantarem.pt/','oai:repositorio.ipsantarem.pt:') WHERE entity_id LIKE 'oai:repositorio.ipsantarem.pt/%';";
787
        stmt.executeUpdate(sql);
788
        stmt.close();
789
        ConnectDB.getConnection().commit();
790

    
791
        stmt = ConnectDB.getConnection().createStatement();
792
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:arca.igc.gulbenkian.pt/','oai:arca.igc.gulbenkian.pt:') WHERE entity_id LIKE 'oai:arca.igc.gulbenkian.pt/%';";
793
        stmt.executeUpdate(sql);
794
        stmt.close();
795
        ConnectDB.getConnection().commit();
796

    
797
        stmt = ConnectDB.getConnection().createStatement();
798
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:ubibliorum.ubi.pt/','oai:ubibliorum.ubi.pt:') WHERE entity_id LIKE 'oai:ubibliorum.ubi.pt/%';";
799
        stmt.executeUpdate(sql);
800
        stmt.close();
801
        ConnectDB.getConnection().commit();
802

    
803
        stmt = ConnectDB.getConnection().createStatement();
804
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:digituma.uma.pt/','oai:digituma.uma.pt:') WHERE entity_id LIKE 'oai:digituma.uma.pt/%';";
805
        stmt.executeUpdate(sql);
806
        stmt.close();
807
        ConnectDB.getConnection().commit();
808

    
809
        stmt = ConnectDB.getConnection().createStatement();
810
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ul.pt/','oai:repositorio.ul.pt:') WHERE entity_id LIKE 'oai:repositorio.ul.pt/%';";
811
        stmt.executeUpdate(sql);
812
        stmt.close();
813
        ConnectDB.getConnection().commit();
814

    
815
        stmt = ConnectDB.getConnection().createStatement();
816
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hff.min-saude.pt/','oai:repositorio.hff.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.hff.min-saude.pt/%';";
817
        stmt.executeUpdate(sql);
818
        stmt.close();
819
        ConnectDB.getConnection().commit();
820

    
821
        stmt = ConnectDB.getConnection().createStatement();
822
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorium.sdum.uminho.pt/','oai:repositorium.sdum.uminho.pt:') WHERE entity_id LIKE 'oai:repositorium.sdum.uminho.pt/%';";
823
        stmt.executeUpdate(sql);
824
        stmt.close();
825
        ConnectDB.getConnection().commit();
826

    
827
        stmt = ConnectDB.getConnection().createStatement();
828
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:recipp.ipp.pt/','oai:recipp.ipp.pt:') WHERE entity_id LIKE 'oai:recipp.ipp.pt/%';";
829
        stmt.executeUpdate(sql);
830
        stmt.close();
831
        ConnectDB.getConnection().commit();
832

    
833
        stmt = ConnectDB.getConnection().createStatement();
834
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bdigital.ufp.pt/','oai:bdigital.ufp.pt:') WHERE entity_id LIKE 'oai:bdigital.ufp.pt/%';";
835
        stmt.executeUpdate(sql);
836
        stmt.close();
837
        ConnectDB.getConnection().commit();
838

    
839
        stmt = ConnectDB.getConnection().createStatement();
840
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.lneg.pt/','oai:repositorio.lneg.pt:') WHERE entity_id LIKE 'oai:repositorio.lneg.pt/%';";
841
        stmt.executeUpdate(sql);
842
        stmt.close();
843
        ConnectDB.getConnection().commit();
844

    
845
        stmt = ConnectDB.getConnection().createStatement();
846
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:iconline.ipleiria.pt/','oai:iconline.ipleiria.pt:') WHERE entity_id LIKE 'oai:iconline.ipleiria.pt/%';";
847
        stmt.executeUpdate(sql);
848
        stmt.close();
849
        ConnectDB.getConnection().commit();
850

    
851
        stmt = ConnectDB.getConnection().createStatement();
852
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:comum.rcaap.pt/','oai:comum.rcaap.pt:') WHERE entity_id LIKE 'oai:comum.rcaap.pt/%';";
853
        stmt.executeUpdate(sql);
854
        stmt.close();
855
        ConnectDB.getConnection().commit();
856

    
857
        ConnectDB.getConnection().close();
858
    }
859

    
860
    private String processPortalURL(String url) {
861

    
862
        if (url.indexOf("explore.openaire.eu") > 0) {
863
            try {
864
                url = URLDecoder.decode(url, "UTF-8");
865
            } catch (Exception e) {
866
                log.info(url);
867
            }
868
            if (url.indexOf("datasourceId=") > 0 && url.substring(url.indexOf("datasourceId=") + 13).length() >= 46) {
869
                url = "datasource|" + url.substring(url.indexOf("datasourceId=") + 13, url.indexOf("datasourceId=") + 59);
870
            } else if (url.indexOf("datasource=") > 0 && url.substring(url.indexOf("datasource=") + 11).length() >= 46) {
871
                url = "datasource|" + url.substring(url.indexOf("datasource=") + 11, url.indexOf("datasource=") + 57);
872
            } else if (url.indexOf("datasourceFilter=") > 0 && url.substring(url.indexOf("datasourceFilter=") + 17).length() >= 46) {
873
                url = "datasource|" + url.substring(url.indexOf("datasourceFilter=") + 17, url.indexOf("datasourceFilter=") + 63);
874
            } else if (url.indexOf("articleId=") > 0 && url.substring(url.indexOf("articleId=") + 10).length() >= 46) {
875
                url = "result|" + url.substring(url.indexOf("articleId=") + 10, url.indexOf("articleId=") + 56);
876
            } else if (url.indexOf("datasetId=") > 0 && url.substring(url.indexOf("datasetId=") + 10).length() >= 46) {
877
                url = "result|" + url.substring(url.indexOf("datasetId=") + 10, url.indexOf("datasetId=") + 56);
878
            } else if (url.indexOf("projectId=") > 0 && url.substring(url.indexOf("projectId=") + 10).length() >= 46 && !url.contains("oai:dnet:corda")) {
879
                url = "project|" + url.substring(url.indexOf("projectId=") + 10, url.indexOf("projectId=") + 56);
880
            } else if (url.indexOf("organizationId=") > 0 && url.substring(url.indexOf("organizationId=") + 15).length() >= 46) {
881
                url = "organization|" + url.substring(url.indexOf("organizationId=") + 15, url.indexOf("organizationId=") + 61);
882
            } else {
883
                url = "";
884
            }
885
        } else {
886
            url = "";
887
        }
888

    
889
        return url;
890
    }
891

    
892
    private void updateProdTables() throws SQLException {
893
        Statement stmt = ConnectDB.getConnection().createStatement();
894
        ConnectDB.getConnection().setAutoCommit(false);
895

    
896
       String sql = "insert into piwiklog select * from piwiklogtmp;";
897
       stmt.executeUpdate(sql);
898
        
899
        sql = "insert into views_stats select * from views_stats_tmp;";
900
        stmt.executeUpdate(sql);
901
        
902
        sql = "insert into downloads_stats select * from downloads_stats_tmp;";
903
        stmt.executeUpdate(sql);
904
        
905
        sql = "insert into pageviews_stats select * from pageviews_stats_tmp;";
906
        stmt.executeUpdate(sql);
907

    
908
        stmt.close();
909
        ConnectDB.getConnection().commit();
910
        ConnectDB.getConnection().close();
911

    
912
        log.info("updateProdTables done");
913
    }
914

    
915
    private ArrayList<String> listHdfsDir(String dir) throws Exception {
916

    
917
        FileSystem hdfs = FileSystem.get(new Configuration());
918
        RemoteIterator<LocatedFileStatus> Files;
919
        ArrayList<String> fileNames = new ArrayList<>();
920

    
921
        try {
922
            Path exportPath = new Path(hdfs.getUri() + dir);
923
            Files = hdfs.listFiles(exportPath, false);
924
            while (Files.hasNext()) {
925
                String fileName = Files.next().getPath().toString();
926
                fileNames.add(fileName);
927
            }
928

    
929
            hdfs.close();
930
        } catch (Exception e) {
931
            log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
932
            throw new Exception("HDFS file path with exported data does not exist :   " + logPath, e);
933
        }
934

    
935
        return fileNames;
936
    }
937

    
938
    private String readHDFSFile(String filename) throws Exception {
939
        String result;
940
        try {
941

    
942
            FileSystem fs = FileSystem.get(new Configuration());
943
            //log.info("reading file : " + filename);
944

    
945
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
946

    
947
            StringBuilder sb = new StringBuilder();
948
            String line = br.readLine();
949

    
950
            while (line != null) {
951
                if (!line.equals("[]")) {
952
                    sb.append(line);
953
                }
954
                //sb.append(line);
955
                line = br.readLine();
956
            }
957
            result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
958
            if (result.equals("")) {
959
                result = "[]";
960
            }
961

    
962
            //fs.close();
963
        } catch (Exception e) {
964
            log.error(e);
965
            throw new Exception(e);
966
        }
967

    
968
        return result;
969
    }
970

    
971
    private Connection getConnection() throws SQLException {
972
        return ConnectDB.getConnection();
973
    }
974
}
(5-5/9)