// eu.dnetlib.usagestats.export.PiwikStatsDB
1
package eu.dnetlib.usagestats.export;
2

    
3
import java.io.*;
4
import java.net.URLDecoder;
5
import java.sql.Connection;
6
import java.sql.PreparedStatement;
7
import java.sql.SQLException;
8
import java.sql.Statement;
9
import java.sql.Timestamp;
10
import java.text.SimpleDateFormat;
11
import java.util.*;
12
import java.util.regex.Matcher;
13
import java.util.regex.Pattern;
14
import java.util.stream.Stream;
15

    
16
import org.apache.hadoop.conf.Configuration;
17
import org.apache.hadoop.fs.LocatedFileStatus;
18
import org.apache.hadoop.fs.Path;
19
import org.apache.hadoop.fs.FileSystem;
20
import org.apache.hadoop.fs.RemoteIterator;
21
import org.apache.log4j.Logger;
22
import org.json.simple.JSONArray;
23
import org.json.simple.JSONObject;
24
import org.json.simple.parser.JSONParser;
25

    
26
public class PiwikStatsDB {
27

    
28
    // HDFS paths to the Piwik JSON log dumps.
    // NOTE(review): logPath is never assigned in the visible code — confirm it is still used.
    private String logPath;
    private String logRepoPath;
    private String logPortalPath;

    // Shared statement reused by the DDL helpers (createTables/finalizeStats).
    private Statement stmt = null;

    private final Logger log = Logger.getLogger(this.getClass());
    // URL of the COUNTER robots list; fetched in processLogs().
    private String CounterRobotsURL;
    // Raw list of robot user-agent regex strings (compiled in processRepositoryLog).
    private ArrayList robotsList;
37

    
38

    
39
    /**
     * Builds the exporter for the given HDFS log locations and eagerly creates
     * both the permanent and the staging usage-stats tables.
     *
     * @param logRepoPath   HDFS directory holding per-repository Piwik log dumps
     * @param logPortalPath HDFS directory holding portal Piwik log dumps
     * @throws Exception if either table-creation step fails
     */
    public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
        this.logRepoPath = logRepoPath;
        this.logPortalPath = logPortalPath;
        // Permanent tables first, then the per-run staging (tmp) tables.
        this.createTables();
        this.createTmpTables();
    }
45

    
46
    public void foo() {
47
        Stream<String> s = Arrays.stream(new String[] {"a", "b", "c", "d"});
48

    
49
        System.out.println(s.parallel().count());
50
    }
51

    
52
    /**
     * Returns the COUNTER robot user-agent regex strings; may be null until
     * processLogs() loads them. (Raw type preserved for caller compatibility.)
     */
    public ArrayList getRobotsList() {
        return robotsList;
    }
55

    
56
    /** Replaces the robot-pattern list used to filter out crawler traffic. */
    public void setRobotsList(ArrayList robotsList) {
        this.robotsList = robotsList;
    }
59

    
60
    /** Returns the URL the COUNTER robots list is fetched from (see processLogs). */
    public String getCounterRobotsURL() {
        return CounterRobotsURL;
    }
63

    
64
    /** Sets the URL the COUNTER robots list will be fetched from. */
    public void setCounterRobotsURL(String CounterRobotsURL) {
        this.CounterRobotsURL = CounterRobotsURL;
    }
67

    
68
    private void createTables() throws Exception {
69
        try {
70
            stmt = ConnectDB.getConnection().createStatement();
71
            String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
72
            String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
73
                    + " ON INSERT TO piwiklog "
74
                    + " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
75
                    + "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
76
                    + "FROM piwiklog "
77
                    + "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
78
            String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on piwiklog(source, id_visit, action, entity_id, \"timestamp\");";
79
            stmt.executeUpdate(sqlCreateTablePiwikLog);
80
            stmt.executeUpdate(sqlcreateRulePiwikLog);
81
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
82

    
83
            String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
84
            String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
85
                    + " ON INSERT TO process_portal_log "
86
                    + " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit,"
87
                    + "process_portal_log.\"timestamp\" "
88
                    + "FROM process_portal_log "
89
                    + "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
90
            String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on process_portal_log(source, id_visit, \"timestamp\");";
91
            stmt.executeUpdate(sqlCreateTablePortalLog);
92
            stmt.executeUpdate(sqlcreateRulePortalLog);
93
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
94

    
95
            stmt.close();
96
            ConnectDB.getConnection().close();
97
            log.info("Usage Tables Created");
98

    
99
        } catch (Exception e) {
100
            log.error("Failed to create tables: " + e);
101
            throw new Exception("Failed to create tables: " + e.toString(), e);
102
        }
103
    }
104

    
105
    /**
     * Creates the per-run staging tables (piwiklogtmp, process_portal_log_tmp),
     * each with the same duplicate-ignoring insert rule as its permanent
     * counterpart. Staging rows are merged into the permanent tables later
     * (updateProdTables, invoked from processLogs).
     *
     * @throws Exception wrapping any SQL failure, after logging it
     */
    private void createTmpTables() throws Exception {
        try {
            // Local statement deliberately shadows the field; closed below.
            Statement stmt = ConnectDB.getConnection().createStatement();
            String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
            // Rule: INSERTs whose primary key already exists become no-ops.
            String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO piwiklogtmp "
                    + " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit,"
                    + "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id "
                    + "FROM piwiklogtmp "
                    + "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
            stmt.executeUpdate(sqlcreateTmpRulePiwikLog);

            //String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
            //stmt.executeUpdate(sqlCopyPublicPiwiklog);
            String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
            String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO process_portal_log_tmp "
                    + " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit,"
                    + "process_portal_log_tmp.\"timestamp\" "
                    + "FROM process_portal_log_tmp "
                    + "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlCreateTmpTablePortalLog);
            stmt.executeUpdate(sqlcreateTmpRulePortalLog);

            // Unlike createTables, the shared connection is left open here.
            stmt.close();
            log.info("Usage Tmp Tables Created");

        } catch (Exception e) {
            log.error("Failed to create tmptables: " + e);
            throw new Exception("Failed to create tmp tables: " + e.toString(), e);
            //System.exit(0);
        }
    }
139

    
140
    /**
     * End-to-end pipeline for one export run. Loads the COUNTER robots list,
     * ingests and cleans repository logs, builds view/download statistics,
     * processes the portal logs, and finally merges the staging tables into
     * the production tables. The step order is significant: cleaning must
     * precede the stats queries, and updateProdTables must run last.
     *
     * @throws Exception wrapping the first failing step, after logging it
     */
    public void processLogs() throws Exception {
        try {
            // Fetch and cache the robot user-agent patterns used for filtering.
            ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
            this.robotsList = counterRobots.getRobotsPatterns();

            processRepositoryLog();
            log.info("repository process done");
            removeDoubleClicks();
            log.info("removing double clicks done");
            cleanOAI();
            log.info("cleaning oai done");

            viewsStats();
            downloadsStats();

            processPortalLog();
            log.info("portal process done");

            portalStats();
            log.info("portal usagestats done");

            updateProdTables();
            log.info("updateProdTables done");

        } catch (Exception e) {
            log.error("Failed to process logs: " + e);
            throw new Exception("Failed to process logs: " + e.toString(), e);
        }
    }
169

    
170
//    public void usageStats() throws Exception {
171
//        try {
172
//            viewsStats();
173
//            downloadsStats();
174
//            log.info("stat tables and views done");
175
//        } catch (Exception e) {
176
//            log.error("Failed to create usage usagestats: " + e);
177
//            throw new Exception("Failed to create usage usagestats: " + e.toString(), e);
178
//        }
179
//    }
180

    
181
    public void processRepositoryLog() throws Exception {
182
        Statement stmt = ConnectDB.getConnection().createStatement();
183
        ConnectDB.getConnection().setAutoCommit(false);
184

    
185
        ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
186
//        File dir = new File(this.logRepoPath);
187
//        File[] jsonFiles = dir.listFiles();
188

    
189
        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
190
        int batch_size = 0;
191
        JSONParser parser = new JSONParser();
192
        for (String jsonFile : jsonFiles) {
193
            System.out.println(jsonFile);
194
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
195
            for (Object aJsonArray : jsonArray) {
196
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
197
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
198
                String idVisit = jsonObjectRow.get("idVisit").toString();
199
                String country = jsonObjectRow.get("country").toString();
200
                String referrerName = jsonObjectRow.get("referrerName").toString();
201
                String agent = jsonObjectRow.get("browser").toString();
202
                boolean botFound = false;
203
                Iterator it = robotsList.iterator();
204
                while (it.hasNext()) {
205
                    // Create a Pattern object
206
                    Pattern r = Pattern.compile(it.next().toString());
207
                    // Now create matcher object.
208
                    Matcher m = r.matcher(agent);
209
                    if (m.find()) {
210
                        //System.out.println("Found value: " + m.group(0));
211
                        botFound = true;
212
                        break;
213
                    }
214
                }
215
                if (botFound == false) {
216
                    String sourceItemType = "repItem";
217

    
218
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
219
                    for (Object actionDetail : actionDetails) {
220
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
221

    
222
                        if (actionDetailsObj.get("customVariables") != null) {
223
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
224
                            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
225
                            Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
226
                            String url = actionDetailsObj.get("url").toString();
227
                            String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
228
                            String action = actionDetailsObj.get("type").toString();
229

    
230
                            prepStatem.setInt(1, idSite);
231
                            prepStatem.setString(2, idVisit);
232
                            prepStatem.setString(3, country);
233
                            prepStatem.setString(4, action);
234
                            prepStatem.setString(5, url);
235
                            prepStatem.setString(6, oaipmh);
236
                            prepStatem.setString(7, sourceItemType);
237
                            prepStatem.setString(8, simpleDateFormat.format(timestamp));
238
                            prepStatem.setString(9, referrerName);
239
                            prepStatem.setString(10, agent);
240
                            prepStatem.addBatch();
241
                            batch_size++;
242
                            if (batch_size == 10000) {
243
                                prepStatem.executeBatch();
244
                                ConnectDB.getConnection().commit();
245
                                batch_size = 0;
246
                            }
247
                        }
248
                    }
249
                }
250
            }
251
        }
252
        prepStatem.executeBatch();
253
        ConnectDB.getConnection().commit();
254
        stmt.close();
255
    }
256

    
257
    /**
     * De-duplicates rapid repeat events in piwiklogtmp: for the same
     * (source, visit, entity, action) pair, the earlier of two 'download' rows
     * within 30 seconds is deleted, and the earlier of two 'action' (view)
     * rows within 10 seconds is deleted. Source '5' is excluded from both
     * passes. Each pass is committed separately.
     *
     * @throws Exception on SQL failure
     */
    public void removeDoubleClicks() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        //clean download double clicks
        String sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
        stmt.executeUpdate(sql);
        stmt.close();
        ConnectDB.getConnection().commit();

        stmt = ConnectDB.getConnection().createStatement();

        //clean view double clicks
        sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
        stmt.executeUpdate(sql);
        stmt.close();
        ConnectDB.getConnection().commit();
    }
275

    
276
    /**
     * Builds monthly view statistics from piwiklogtmp: a per-(result, month,
     * source) view, then views_stats_tmp (all sources except '5') and
     * pageviews_stats_tmp (portal only), plus empty permanent tables with the
     * same shape if they do not exist yet. Closes the connection when done.
     *
     * NOTE(review): the portal source id is hard-coded as '109' here while the
     * commented-out predecessor used '5' — confirm '109' is the intended
     * portal piwik_id for this deployment.
     *
     * @throws Exception on SQL failure
     */
    public void viewsStats() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        // Monthly per-result view counts, plus how many came via an openaire referrer.
        String sql = "CREATE OR REPLACE VIEW result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        stmt.executeUpdate(sql);

        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        // Join against datasource/result_oids to map piwik ids to OpenAIRE ids.
        sql = "CREATE TABLE IF NOT EXISTS views_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        stmt.executeUpdate(sql);

        // Ensure the permanent table exists with an identical structure.
        sql = "CREATE TABLE IF NOT EXISTS views_stats (like views_stats_tmp including all)";
        stmt.executeUpdate(sql);

//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        stmt.executeUpdate(sql);

        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats (like pageviews_stats_tmp including all)";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }
302

    
303
//    public void viewsStats(String piwikid) throws Exception {
304
//        stmt = ConnectDB.getConnection().createStatement();
305
//        ConnectDB.getConnection().setAutoCommit(false);
306
//
307
//        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
308
//        String sql = "CREATE OR REPLACE VIEW result_views_monthly" + piwikid + " AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog" + piwikid + " where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
309
//        stmt.executeUpdate(sql);
310
//
311
//        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
312
//        sql = "CREATE TABLE IF NOT EXISTS views_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source!='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
313
//        stmt.executeUpdate(sql);
314
//
315
////        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
316
//        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
317
//        stmt.executeUpdate(sql);
318
//
319
//        sql = "DROP VIEW IF EXISTS result_views_monthly" + piwikid + ";";
320
//        stmt.executeUpdate(sql);
321
//
322
//        stmt.close();
323
//        ConnectDB.getConnection().commit();
324
//        ConnectDB.getConnection().close();
325
//    }
326

    
327
    /**
     * Builds monthly download statistics from piwiklogtmp: a per-(result,
     * month, source) view, then downloads_stats_tmp joined against the public
     * datasource/result_oids mappings, and an empty permanent downloads_stats
     * table with the same shape if absent. The helper view is dropped before
     * returning, and the connection is closed.
     *
     * @throws Exception on SQL failure
     */
    private void downloadsStats() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        //String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        // Monthly per-result download counts, plus openaire-referrer counts.
        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        stmt.executeUpdate(sql);

        //sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly_tmp p, public.datasource d, public.result_oids ro where p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        stmt.executeUpdate(sql);

        // Ensure the permanent table exists with an identical structure.
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats (like downloads_stats_tmp including all)";
        stmt.executeUpdate(sql);

        sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }
350

    
351
    /**
     * Final pass over the produced statistics tables: builds the full_dates
     * month dimension (2016-01 through the current month), creates btree
     * indexes on views/pageviews/downloads stats, materialises the combined
     * usage_stats table (full outer join of downloads and views), indexes it,
     * and drops the per-run staging tables. Closes the connection when done.
     *
     * NOTE(review): views_stats_tmp and downloads_stats_tmp are not dropped
     * here — presumably handled by updateProdTables; confirm.
     *
     * @throws Exception on SQL failure
     */
    public void finalizeStats() throws Exception {
        stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        // Number of whole months from 2016-01-01 to now, for the date series.
        Calendar startCalendar = Calendar.getInstance();
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
        Calendar endCalendar = Calendar.getInstance();
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);

//        String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
        String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS full_dates_full_date ON full_dates USING btree(full_date);";
        stmt.executeUpdate(sql);

        // Indexes on views_stats.
        sql = "CREATE INDEX IF NOT EXISTS views_stats_source ON views_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS views_stats_repository_id ON views_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS views_stats_result_id ON views_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS views_stats_date ON views_stats USING btree(date);";
        stmt.executeUpdate(sql);

        // Indexes on pageviews_stats.
        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_source ON pageviews_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_date ON pageviews_stats USING btree(date);";
        stmt.executeUpdate(sql);

        // Indexes on downloads_stats.
        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_source ON downloads_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_result_id ON downloads_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_date ON downloads_stats USING btree(date);";
        stmt.executeUpdate(sql);

//        sql = "SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views INTO usage_stats FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
        // usage_stats: one row per (source, repository, result, month) with both counters.
        sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_source ON usage_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_repository_id ON usage_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_result_id ON usage_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_date ON usage_stats USING btree(date);";
        stmt.executeUpdate(sql);

        // Drop the per-run staging artifacts.
        sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
        stmt.executeUpdate(sql);

        sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
        stmt.executeUpdate(sql);

        sql = "DROP VIEW IF EXISTS  result_views_monthly_tmp";
        stmt.executeUpdate(sql);

        sql = "DROP TABLE IF EXISTS piwiklogtmp;";
        stmt.executeUpdate(sql);

        sql = "DROP TABLE IF EXISTS sushilogtmp;";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }
439

    
440
    //Create repository Views statistics
441
    private void repositoryViewsStats() throws Exception {
442
        stmt = ConnectDB.getConnection().createStatement();
443
        ConnectDB.getConnection().setAutoCommit(false);
444

    
445
//        String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
446
        String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
447
        stmt.executeUpdate(sql);
448

    
449
        sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
450
        stmt.executeUpdate(sql);
451

    
452
        sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
453
        stmt.executeUpdate(sql);
454

    
455
//        sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
456
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
457
        stmt.executeUpdate(sql);
458

    
459
        sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
460
        stmt.executeUpdate(sql);
461

    
462
        sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
463
        stmt.executeUpdate(sql);
464

    
465
        sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
466
        stmt.executeUpdate(sql);
467

    
468
        Calendar startCalendar = Calendar.getInstance();
469
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
470
        Calendar endCalendar = Calendar.getInstance();
471
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
472
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
473

    
474
        //sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
475
//        sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
476
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
477
        stmt.executeUpdate(sql);
478

    
479
        sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
480
        stmt.executeUpdate(sql);
481

    
482
        sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
483
        stmt.executeUpdate(sql);
484

    
485
        sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
486
        stmt.executeUpdate(sql);
487

    
488
        sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
489
        stmt.executeUpdate(sql);
490

    
491
        stmt.close();
492
        ConnectDB.getConnection().commit();
493
        ConnectDB.getConnection().close();
494
    }
495

    
496
    //Create repository downloads statistics
497
    private void repositoryDownloadsStats() throws Exception {
498
        stmt = ConnectDB.getConnection().createStatement();
499
        ConnectDB.getConnection().setAutoCommit(false);
500

    
501
//        String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
502
        String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
503
        stmt.executeUpdate(sql);
504

    
505
        sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
506
        stmt.executeUpdate(sql);
507

    
508
        sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
509
        stmt.executeUpdate(sql);
510

    
511
//        sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
512
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
513
        stmt.executeUpdate(sql);
514

    
515
        sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
516
        stmt.executeUpdate(sql);
517

    
518
        sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
519
        stmt.executeUpdate(sql);
520

    
521
        sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
522
        stmt.executeUpdate(sql);
523

    
524
        Calendar startCalendar = Calendar.getInstance();
525
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
526
        Calendar endCalendar = Calendar.getInstance();
527
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
528
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
529

    
530
        //sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
531
        // sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
532
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
533
        stmt.executeUpdate(sql);
534

    
535
        sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
536
        stmt.executeUpdate(sql);
537

    
538
        sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
539
        stmt.executeUpdate(sql);
540

    
541
        sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
542
        stmt.executeUpdate(sql);
543

    
544
        sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
545
        stmt.executeUpdate(sql);
546

    
547
        stmt.close();
548
        ConnectDB.getConnection().commit();
549
        ConnectDB.getConnection().close();
550
    }
551

    
552
    // Import OPENAIRE Logs to DB
553
    public void processPortalLog() throws Exception {
554
        Statement stmt = ConnectDB.getConnection().createStatement();
555
        ConnectDB.getConnection().setAutoCommit(false);
556

    
557
        ArrayList<String> jsonFiles = listHdfsDir(this.logPortalPath);
558
//        File folder = new File(this.logPortalPath);
559
//        File[] jsonFiles = folder.listFiles();
560

    
561
        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO process_portal_log_tmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
562
        int batch_size = 0;
563
        JSONParser parser = new JSONParser();
564
        for (String jsonFile : jsonFiles) {
565
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
566

    
567
            for (Object aJsonArray : jsonArray) {
568
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
569
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
570
                String idVisit = jsonObjectRow.get("idVisit").toString();
571
                String country = jsonObjectRow.get("country").toString();
572
                String referrerName = jsonObjectRow.get("referrerName").toString();
573
                String agent = jsonObjectRow.get("browser").toString();
574
                boolean botFound = false;
575
                Iterator it = robotsList.iterator();
576
                while (it.hasNext()) {
577
                    // Create a Pattern object
578
                    Pattern r = Pattern.compile(it.next().toString());
579
                    // Now create matcher object.
580
                    Matcher m = r.matcher(agent);
581
                    if (m.find()) {
582
                        botFound = true;
583
                        break;
584
                    }
585
                }
586
                if (botFound == false) {
587
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
588
                    for (Object actionDetail : actionDetails) {
589
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
590

    
591
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
592
                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
593
                        Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
594

    
595
                        String action = actionDetailsObj.get("type").toString();
596
                        String url = actionDetailsObj.get("url").toString();
597

    
598
                        String entityID = processPortalURL(url);
599
                        String sourceItemType = "";
600

    
601
                        if (entityID.indexOf("|") > 0) {
602
                            sourceItemType = entityID.substring(0, entityID.indexOf("|"));
603
                            entityID = entityID.substring(entityID.indexOf("|") + 1);
604
                        }
605

    
606
                        prepStatem.setInt(1, idSite);
607
                        prepStatem.setString(2, idVisit);
608
                        prepStatem.setString(3, country);
609
                        prepStatem.setString(4, action);
610
                        prepStatem.setString(5, url);
611
                        prepStatem.setString(6, entityID);
612
                        prepStatem.setString(7, sourceItemType);
613
                        prepStatem.setString(8, simpleDateFormat.format(timestamp));
614
                        prepStatem.setString(9, referrerName);
615
                        prepStatem.setString(10, agent);
616

    
617
                        prepStatem.addBatch();
618
                        batch_size++;
619
                        if (batch_size == 10000) {
620
                            prepStatem.executeBatch();
621
                            ConnectDB.getConnection().commit();
622
                            batch_size = 0;
623
                        }
624
                    }
625
                }
626
            }
627
        }
628
        prepStatem.executeBatch();
629
        ConnectDB.getConnection().commit();
630

    
631
        stmt.close();
632
        ConnectDB.getConnection().close();
633
    }
634
 
635
    public void portalStats() throws SQLException {
636
        Connection con = ConnectDB.getConnection();
637
        Statement stmt = con.createStatement();
638
        con.setAutoCommit(false);
639

    
640
        String sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'oaItem\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.result_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
641
        stmt.executeUpdate(sql);
642
        stmt.close();
643
//        con.commit();
644

    
645
        stmt = con.createStatement();
646
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'datasource\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.datasource_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
647
        stmt.executeUpdate(sql);
648
        stmt.close();
649
//        con.commit();
650

    
651
        stmt = con.createStatement();
652
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'organization\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.organization_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
653
        stmt.executeUpdate(sql);
654
        stmt.close();
655
//        con.commit();
656

    
657
        stmt = con.createStatement();
658
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'project\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.project_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
659
        stmt.executeUpdate(sql);
660
        stmt.close();
661
//        con.commit();
662

    
663
        con.close();
664
    }
665

    
666
    private void cleanOAI() throws Exception {
667
        ConnectDB.getConnection().setAutoCommit(false);
668

    
669
        stmt = ConnectDB.getConnection().createStatement();
670
        String sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chlc.min-saude.pt/','oai:repositorio.chlc.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.chlc.min-saude.pt/%';";
671
        stmt.executeUpdate(sql);
672
        stmt.close();
673
        ConnectDB.getConnection().commit();
674

    
675
        stmt = ConnectDB.getConnection().createStatement();
676
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hospitaldebraga.pt/','oai:repositorio.hospitaldebraga.pt:') WHERE entity_id LIKE 'oai:repositorio.hospitaldebraga.pt/%';";
677
        stmt.executeUpdate(sql);
678
        stmt.close();
679
        ConnectDB.getConnection().commit();
680

    
681
        stmt = ConnectDB.getConnection().createStatement();
682
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipl.pt/','oai:repositorio.ipl.pt:') WHERE entity_id LIKE 'oai:repositorio.ipl.pt/%';";
683
        stmt.executeUpdate(sql);
684
        stmt.close();
685
        ConnectDB.getConnection().commit();
686

    
687
        stmt = ConnectDB.getConnection().createStatement();
688
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bibliotecadigital.ipb.pt/','oai:bibliotecadigital.ipb.pt:') WHERE entity_id LIKE 'oai:bibliotecadigital.ipb.pt/%';";
689
        stmt.executeUpdate(sql);
690
        stmt.close();
691
        ConnectDB.getConnection().commit();
692

    
693
        stmt = ConnectDB.getConnection().createStatement();
694
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ismai.pt/','oai:repositorio.ismai.pt:') WHERE entity_id LIKE 'oai:repositorio.ismai.pt/%';";
695
        stmt.executeUpdate(sql);
696
        stmt.close();
697
        ConnectDB.getConnection().commit();
698

    
699
        stmt = ConnectDB.getConnection().createStatement();
700
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorioaberto.uab.pt/','oai:repositorioaberto.uab.pt:') WHERE entity_id LIKE 'oai:repositorioaberto.uab.pt/%';";
701
        stmt.executeUpdate(sql);
702
        stmt.close();
703
        ConnectDB.getConnection().commit();
704

    
705
        stmt = ConnectDB.getConnection().createStatement();
706
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.uac.pt/','oai:repositorio.uac.pt:') WHERE entity_id LIKE 'oai:repositorio.uac.pt/%';";
707
        stmt.executeUpdate(sql);
708
        stmt.close();
709
        ConnectDB.getConnection().commit();
710

    
711
        stmt = ConnectDB.getConnection().createStatement();
712
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.insa.pt/','oai:repositorio.insa.pt:') WHERE entity_id LIKE 'oai:repositorio.insa.pt/%';";
713
        stmt.executeUpdate(sql);
714
        stmt.close();
715
        ConnectDB.getConnection().commit();
716

    
717
        stmt = ConnectDB.getConnection().createStatement();
718
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipcb.pt/','oai:repositorio.ipcb.pt:') WHERE entity_id LIKE 'oai:repositorio.ipcb.pt/%';";
719
        stmt.executeUpdate(sql);
720
        stmt.close();
721
        ConnectDB.getConnection().commit();
722

    
723
        stmt = ConnectDB.getConnection().createStatement();
724
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ispa.pt/','oai:repositorio.ispa.pt:') WHERE entity_id LIKE 'oai:repositorio.ispa.pt/%';";
725
        stmt.executeUpdate(sql);
726
        stmt.close();
727
        ConnectDB.getConnection().commit();
728

    
729
        stmt = ConnectDB.getConnection().createStatement();
730
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chporto.pt/','oai:repositorio.chporto.pt:') WHERE entity_id LIKE 'oai:repositorio.chporto.pt/%';";
731
        stmt.executeUpdate(sql);
732
        stmt.close();
733
        ConnectDB.getConnection().commit();
734

    
735
        stmt = ConnectDB.getConnection().createStatement();
736
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ucp.pt/','oai:repositorio.ucp.pt:') WHERE entity_id LIKE 'oai:repositorio.ucp.pt/%';";
737
        stmt.executeUpdate(sql);
738
        stmt.close();
739
        ConnectDB.getConnection().commit();
740

    
741
        stmt = ConnectDB.getConnection().createStatement();
742
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:rihuc.huc.min-saude.pt/','oai:rihuc.huc.min-saude.pt:') WHERE entity_id LIKE 'oai:rihuc.huc.min-saude.pt/%';";
743
        stmt.executeUpdate(sql);
744
        stmt.close();
745
        ConnectDB.getConnection().commit();
746

    
747
        stmt = ConnectDB.getConnection().createStatement();
748
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipv.pt/','oai:repositorio.ipv.pt:') WHERE entity_id LIKE 'oai:repositorio.ipv.pt/%';";
749
        stmt.executeUpdate(sql);
750
        stmt.close();
751
        ConnectDB.getConnection().commit();
752

    
753
        stmt = ConnectDB.getConnection().createStatement();
754
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:www.repository.utl.pt/','oai:www.repository.utl.pt:') WHERE entity_id LIKE 'oai:www.repository.utl.pt/%';";
755
        stmt.executeUpdate(sql);
756
        stmt.close();
757
        ConnectDB.getConnection().commit();
758

    
759
        stmt = ConnectDB.getConnection().createStatement();
760
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:run.unl.pt/','oai:run.unl.pt:') WHERE entity_id LIKE 'oai:run.unl.pt/%';";
761
        stmt.executeUpdate(sql);
762
        stmt.close();
763
        ConnectDB.getConnection().commit();
764

    
765
        stmt = ConnectDB.getConnection().createStatement();
766
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:sapientia.ualg.pt/','oai:sapientia.ualg.pt:') WHERE entity_id LIKE 'oai:sapientia.ualg.pt/%';";
767
        stmt.executeUpdate(sql);
768
        stmt.close();
769
        ConnectDB.getConnection().commit();
770

    
771
        stmt = ConnectDB.getConnection().createStatement();
772
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipsantarem.pt/','oai:repositorio.ipsantarem.pt:') WHERE entity_id LIKE 'oai:repositorio.ipsantarem.pt/%';";
773
        stmt.executeUpdate(sql);
774
        stmt.close();
775
        ConnectDB.getConnection().commit();
776

    
777
        stmt = ConnectDB.getConnection().createStatement();
778
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:arca.igc.gulbenkian.pt/','oai:arca.igc.gulbenkian.pt:') WHERE entity_id LIKE 'oai:arca.igc.gulbenkian.pt/%';";
779
        stmt.executeUpdate(sql);
780
        stmt.close();
781
        ConnectDB.getConnection().commit();
782

    
783
        stmt = ConnectDB.getConnection().createStatement();
784
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:ubibliorum.ubi.pt/','oai:ubibliorum.ubi.pt:') WHERE entity_id LIKE 'oai:ubibliorum.ubi.pt/%';";
785
        stmt.executeUpdate(sql);
786
        stmt.close();
787
        ConnectDB.getConnection().commit();
788

    
789
        stmt = ConnectDB.getConnection().createStatement();
790
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:digituma.uma.pt/','oai:digituma.uma.pt:') WHERE entity_id LIKE 'oai:digituma.uma.pt/%';";
791
        stmt.executeUpdate(sql);
792
        stmt.close();
793
        ConnectDB.getConnection().commit();
794

    
795
        stmt = ConnectDB.getConnection().createStatement();
796
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ul.pt/','oai:repositorio.ul.pt:') WHERE entity_id LIKE 'oai:repositorio.ul.pt/%';";
797
        stmt.executeUpdate(sql);
798
        stmt.close();
799
        ConnectDB.getConnection().commit();
800

    
801
        stmt = ConnectDB.getConnection().createStatement();
802
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hff.min-saude.pt/','oai:repositorio.hff.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.hff.min-saude.pt/%';";
803
        stmt.executeUpdate(sql);
804
        stmt.close();
805
        ConnectDB.getConnection().commit();
806

    
807
        stmt = ConnectDB.getConnection().createStatement();
808
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorium.sdum.uminho.pt/','oai:repositorium.sdum.uminho.pt:') WHERE entity_id LIKE 'oai:repositorium.sdum.uminho.pt/%';";
809
        stmt.executeUpdate(sql);
810
        stmt.close();
811
        ConnectDB.getConnection().commit();
812

    
813
        stmt = ConnectDB.getConnection().createStatement();
814
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:recipp.ipp.pt/','oai:recipp.ipp.pt:') WHERE entity_id LIKE 'oai:recipp.ipp.pt/%';";
815
        stmt.executeUpdate(sql);
816
        stmt.close();
817
        ConnectDB.getConnection().commit();
818

    
819
        stmt = ConnectDB.getConnection().createStatement();
820
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bdigital.ufp.pt/','oai:bdigital.ufp.pt:') WHERE entity_id LIKE 'oai:bdigital.ufp.pt/%';";
821
        stmt.executeUpdate(sql);
822
        stmt.close();
823
        ConnectDB.getConnection().commit();
824

    
825
        stmt = ConnectDB.getConnection().createStatement();
826
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.lneg.pt/','oai:repositorio.lneg.pt:') WHERE entity_id LIKE 'oai:repositorio.lneg.pt/%';";
827
        stmt.executeUpdate(sql);
828
        stmt.close();
829
        ConnectDB.getConnection().commit();
830

    
831
        stmt = ConnectDB.getConnection().createStatement();
832
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:iconline.ipleiria.pt/','oai:iconline.ipleiria.pt:') WHERE entity_id LIKE 'oai:iconline.ipleiria.pt/%';";
833
        stmt.executeUpdate(sql);
834
        stmt.close();
835
        ConnectDB.getConnection().commit();
836

    
837
        stmt = ConnectDB.getConnection().createStatement();
838
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:comum.rcaap.pt/','oai:comum.rcaap.pt:') WHERE entity_id LIKE 'oai:comum.rcaap.pt/%';";
839
        stmt.executeUpdate(sql);
840
        stmt.close();
841
        ConnectDB.getConnection().commit();
842

    
843
        ConnectDB.getConnection().close();
844
    }
845

    
846
    private String processPortalURL(String url) {
847

    
848
        if (url.indexOf("explore.openaire.eu") > 0) {
849
            try {
850
                url = URLDecoder.decode(url, "UTF-8");
851
            } catch (Exception e) {
852
                log.info(url);
853
            }
854
            if (url.indexOf("datasourceId=") > 0 && url.substring(url.indexOf("datasourceId=") + 13).length() >= 46) {
855
                url = "datasource|" + url.substring(url.indexOf("datasourceId=") + 13, url.indexOf("datasourceId=") + 59);
856
            } else if (url.indexOf("datasource=") > 0 && url.substring(url.indexOf("datasource=") + 11).length() >= 46) {
857
                url = "datasource|" + url.substring(url.indexOf("datasource=") + 11, url.indexOf("datasource=") + 57);
858
            } else if (url.indexOf("datasourceFilter=") > 0 && url.substring(url.indexOf("datasourceFilter=") + 17).length() >= 46) {
859
                url = "datasource|" + url.substring(url.indexOf("datasourceFilter=") + 17, url.indexOf("datasourceFilter=") + 63);
860
            } else if (url.indexOf("articleId=") > 0 && url.substring(url.indexOf("articleId=") + 10).length() >= 46) {
861
                url = "result|" + url.substring(url.indexOf("articleId=") + 10, url.indexOf("articleId=") + 56);
862
            } else if (url.indexOf("datasetId=") > 0 && url.substring(url.indexOf("datasetId=") + 10).length() >= 46) {
863
                url = "result|" + url.substring(url.indexOf("datasetId=") + 10, url.indexOf("datasetId=") + 56);
864
            } else if (url.indexOf("projectId=") > 0 && url.substring(url.indexOf("projectId=") + 10).length() >= 46 && !url.contains("oai:dnet:corda")) {
865
                url = "project|" + url.substring(url.indexOf("projectId=") + 10, url.indexOf("projectId=") + 56);
866
            } else if (url.indexOf("organizationId=") > 0 && url.substring(url.indexOf("organizationId=") + 15).length() >= 46) {
867
                url = "organization|" + url.substring(url.indexOf("organizationId=") + 15, url.indexOf("organizationId=") + 61);
868
            } else {
869
                url = "";
870
            }
871
        } else {
872
            url = "";
873
        }
874

    
875
        return url;
876
    }
877

    
878
    private void updateProdTables() throws SQLException {
879
        Statement stmt = ConnectDB.getConnection().createStatement();
880
        ConnectDB.getConnection().setAutoCommit(false);
881

    
882
       String sql = "insert into piwiklog select * from piwiklogtmp;";
883
       stmt.executeUpdate(sql);
884
        
885
        sql = "insert into views_stats select * from views_stats_tmp;";
886
        stmt.executeUpdate(sql);
887
        
888
        sql = "insert into downloads_stats select * from downloads_stats_tmp;";
889
        stmt.executeUpdate(sql);
890
        
891
        sql = "insert into pageviews_stats select * from pageviews_stats_tmp;";
892
        stmt.executeUpdate(sql);
893

    
894
        sql = "DROP TABLE IF EXISTS views_stats_tmp;";
895
        stmt.executeUpdate(sql);
896
        
897
        sql = "DROP TABLE IF EXISTS downloads_stats_tmp;";
898
        stmt.executeUpdate(sql);
899
        
900
        sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
901
        stmt.executeUpdate(sql);
902

    
903
        sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
904
        stmt.executeUpdate(sql);
905
       
906
        
907
        stmt.close();
908
        ConnectDB.getConnection().commit();
909
        ConnectDB.getConnection().close();
910

    
911
        log.info("updateProdTables done");
912
    }
913

    
914
    private ArrayList<String> listHdfsDir(String dir) throws Exception {
915

    
916
        FileSystem hdfs = FileSystem.get(new Configuration());
917
        RemoteIterator<LocatedFileStatus> Files;
918
        ArrayList<String> fileNames = new ArrayList<>();
919

    
920
        try {
921
            Path exportPath = new Path(hdfs.getUri() + dir);
922
            Files = hdfs.listFiles(exportPath, false);
923
            while (Files.hasNext()) {
924
                String fileName = Files.next().getPath().toString();
925
                fileNames.add(fileName);
926
            }
927

    
928
            hdfs.close();
929
        } catch (Exception e) {
930
            log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
931
            throw new Exception("HDFS file path with exported data does not exist :   " + logPath, e);
932
        }
933

    
934
        return fileNames;
935
    }
936

    
937
    private String readHDFSFile(String filename) throws Exception {
938
        String result;
939
        try {
940

    
941
            FileSystem fs = FileSystem.get(new Configuration());
942
            //log.info("reading file : " + filename);
943

    
944
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
945

    
946
            StringBuilder sb = new StringBuilder();
947
            String line = br.readLine();
948

    
949
            while (line != null) {
950
                if (!line.equals("[]")) {
951
                    sb.append(line);
952
                }
953
                //sb.append(line);
954
                line = br.readLine();
955
            }
956
            result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
957
            if (result.equals("")) {
958
                result = "[]";
959
            }
960

    
961
            //fs.close();
962
        } catch (Exception e) {
963
            log.error(e);
964
            throw new Exception(e);
965
        }
966

    
967
        return result;
968
    }
969

    
970
    /**
     * Convenience accessor for the database connection managed by {@code ConnectDB}.
     *
     * @return the connection supplied by {@code ConnectDB.getConnection()}
     * @throws SQLException if obtaining the connection fails
     */
    private Connection getConnection() throws SQLException {
        return ConnectDB.getConnection();
    }
973
}
(5-5/9)