Project

General

Profile

1 45524 tsampikos.
package eu.dnetlib.usagestats.export;
2
3
import java.io.*;
4
import java.net.URLDecoder;
5
import java.sql.Connection;
6
import java.sql.DriverManager;
7
import java.sql.PreparedStatement;
8
import java.sql.Statement;
9
import java.sql.Timestamp;
10
import java.text.SimpleDateFormat;
11
import java.util.*;
12
13
import org.apache.hadoop.conf.Configuration;
14
import org.apache.hadoop.fs.LocatedFileStatus;
15
import org.apache.hadoop.fs.Path;
16
import org.apache.hadoop.fs.FileSystem;
17
import org.apache.hadoop.fs.RemoteIterator;
18
import org.apache.log4j.Logger;
19
import org.json.simple.JSONArray;
20
import org.json.simple.JSONObject;
21
import org.json.simple.parser.JSONParser;
22
23
public class PiwikStatsDB {
24 45950 tsampikos.
    private final String dbUrl;
25
    private final String dbSchema;
26
    private final String dbUserName;
27
    private final String dbPassword;
28
    private final String logPath;
29 45524 tsampikos.
30
    private Connection conn = null;
31
    private Statement stmt = null;
32
33 45950 tsampikos.
    private final Logger log = Logger.getLogger(this.getClass());
34 45524 tsampikos.
35 45950 tsampikos.
    public PiwikStatsDB(String dbUrl, String dbUserName, String dbPassword, String logPath) throws Exception {
36
        this.dbUrl = dbUrl;
37
        this.dbSchema = "shadow";
38
        this.dbUserName = dbUserName;
39
        this.dbPassword = dbPassword;
40
        this.logPath = logPath;
41 45524 tsampikos.
42
        connectDB();
43
        createTables();
44
    }
45
46 45950 tsampikos.
    private void connectDB() throws Exception {
47
        try {
48 45524 tsampikos.
            Class.forName("org.postgresql.Driver");
49 45950 tsampikos.
            conn = DriverManager.getConnection(dbUrl, dbUserName, dbPassword);
50 45524 tsampikos.
51 45950 tsampikos.
            stmt = conn.createStatement();
52
            String sqlSetSearchPath = "SET search_path TO " + dbSchema + ";";
53 45524 tsampikos.
            stmt.executeUpdate(sqlSetSearchPath);
54
55 45950 tsampikos.
            //log.info("Opened database successfully");
56
        } catch (Exception e) {
57
            log.error("Connect to db failed: " + e);
58
            throw new Exception("Failed to connect to db: " + e.toString(), e);
59 45524 tsampikos.
        }
60
    }
61
62 45950 tsampikos.
    private void createTables() throws Exception {
63 45524 tsampikos.
        try {
64 45950 tsampikos.
            stmt = conn.createStatement();
65 47268 tsampikos.
            String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
66 45950 tsampikos.
            String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " +
67 45524 tsampikos.
                    " ON INSERT TO piwiklog " +
68 45950 tsampikos.
                    " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit," +
69 47073 tsampikos.
                    "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id " +
70 45524 tsampikos.
                    "FROM piwiklog " +
71 47073 tsampikos.
                    "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
72 45524 tsampikos.
            stmt.executeUpdate(sqlCreateTablePiwikLog);
73
            stmt.executeUpdate(sqlcreateRulePiwikLog);
74
75 45950 tsampikos.
            String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
76
            stmt.executeUpdate(sqlCopyPublicPiwiklog);
77 45524 tsampikos.
78 47268 tsampikos.
            String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
79 45950 tsampikos.
            String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " +
80
                    " ON INSERT TO process_portal_log " +
81
                    " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit," +
82
                    "process_portal_log.\"timestamp\" " +
83
                    "FROM process_portal_log " +
84
                    "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
85 45524 tsampikos.
            stmt.executeUpdate(sqlCreateTablePortalLog);
86
            stmt.executeUpdate(sqlcreateRulePortalLog);
87
88
            stmt.close();
89
            conn.close();
90
            log.info("Usage Tables Created");
91
92 45950 tsampikos.
        } catch (Exception e) {
93 45524 tsampikos.
            log.error("Failed to create tables: " + e);
94
            throw new Exception("Failed to create tables: " + e.toString(), e);
95
            //System.exit(0);
96
        }
97
    }
98
99 45950 tsampikos.
    protected void processLogs() throws Exception {
100 45524 tsampikos.
        try {
101 45950 tsampikos.
            processRepositoryLog();
102
            log.info("repository process done");
103
            removeDoubleClicks();
104
            log.info("removing double clicks done");
105 47073 tsampikos.
            cleanOAI();
106
            log.info("cleaning oai done");
107 45950 tsampikos.
108 45524 tsampikos.
            processPortalLog();
109 45950 tsampikos.
            log.info("portal process done");
110 45524 tsampikos.
            portalStats();
111 45950 tsampikos.
            log.info("portal stats done");
112
        } catch (Exception e) {
113 45524 tsampikos.
            log.error("Failed to process logs: " + e);
114
            throw new Exception("Failed to process logs: " + e.toString(), e);
115
        }
116
    }
117
118 45950 tsampikos.
    protected void usageStats() throws Exception {
119 45524 tsampikos.
        try {
120 45950 tsampikos.
            //resultStats();
121
            //dataSourceStats();
122
            //organizationsStats();
123
            //projectsStats();
124
            //repositoryViewsStats();
125
            //repositoryDownloadsStats();
126
            viewsStats();
127
            downloadsStats();
128
            log.info("stat tables and views done");
129
        } catch (Exception e) {
130 45524 tsampikos.
            log.error("Failed to create usage stats: " + e);
131
            throw new Exception("Failed to create usage stats: " + e.toString(), e);
132
        }
133
    }
134
135
    // Import repository Logs to DB
136 45950 tsampikos.
    private void processRepositoryLog() throws Exception {
137
        if (conn.isClosed())
138 45524 tsampikos.
            connectDB();
139
140 45950 tsampikos.
        stmt = conn.createStatement();
141 45524 tsampikos.
        conn.setAutoCommit(false);
142
143 45950 tsampikos.
        ArrayList<String> jsonFiles = listHdfsDir(logPath + "repolog");
144 45524 tsampikos.
145 47268 tsampikos.
        PreparedStatement prepStatem = conn.prepareStatement("INSERT INTO piwiklog (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
146 45524 tsampikos.
        int batch_size = 0;
147
        for (String jsonFile : jsonFiles) {
148
            JSONParser parser = new JSONParser();
149 45950 tsampikos.
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
150 45524 tsampikos.
151 45950 tsampikos.
            for (Object aJsonArray : jsonArray) {
152
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
153 45524 tsampikos.
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
154 45950 tsampikos.
                String idVisit = jsonObjectRow.get("idVisit").toString();
155
                String country = jsonObjectRow.get("country").toString();
156
                String referrerName = jsonObjectRow.get("referrerName").toString();
157
                String agent = jsonObjectRow.get("browser").toString();
158 45524 tsampikos.
159 45950 tsampikos.
                String sourceItemType = "repItem";
160 45524 tsampikos.
161
                JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
162 45950 tsampikos.
                for (Object actionDetail : actionDetails) {
163
                    JSONObject actionDetailsObj = (JSONObject) actionDetail;
164 45524 tsampikos.
165 45950 tsampikos.
                    if (actionDetailsObj.get("customVariables") != null) {
166
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
167
                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
168
                        Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
169
                        String url = actionDetailsObj.get("url").toString();
170
                        String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
171
                        String action = actionDetailsObj.get("type").toString();
172 45524 tsampikos.
173 45950 tsampikos.
                        prepStatem.setInt(1, idSite);
174
                        prepStatem.setString(2, idVisit);
175 47268 tsampikos.
                        prepStatem.setString(3, country);
176
                        prepStatem.setString(4, action);
177
                        prepStatem.setString(5, url);
178
                        prepStatem.setString(6, oaipmh);
179
                        prepStatem.setString(7, sourceItemType);
180
                        prepStatem.setString(8, simpleDateFormat.format(timestamp));
181
                        prepStatem.setString(9, referrerName);
182
                        prepStatem.setString(10, agent);
183 45950 tsampikos.
                        prepStatem.addBatch();
184
                        batch_size++;
185
                        if (batch_size == 10000) {
186
                            prepStatem.executeBatch();
187
                            conn.commit();
188
                            batch_size = 0;
189 45524 tsampikos.
                        }
190
                    }
191
                }
192
            }
193
        }
194
        prepStatem.executeBatch();
195
        conn.commit();
196
        stmt.close();
197
        conn.close();
198
    }
199 45950 tsampikos.
200
    private void removeDoubleClicks() throws Exception {
201
        if (conn.isClosed())
202 45524 tsampikos.
            connectDB();
203
204 45950 tsampikos.
        stmt = conn.createStatement();
205 45524 tsampikos.
        conn.setAutoCommit(false);
206
207 45950 tsampikos.
        //clean download double clicks
208 47268 tsampikos.
        String sql = "DELETE FROM piwiklog p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklog p1, piwiklog p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
209 45950 tsampikos.
        stmt.executeUpdate(sql);
210
        stmt.close();
211
        conn.commit();
212 45524 tsampikos.
213 45950 tsampikos.
        stmt = conn.createStatement();
214 45524 tsampikos.
215 45950 tsampikos.
        //clean view double clicks
216 47268 tsampikos.
        sql = "DELETE FROM piwiklog p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from piwiklog p1, piwiklog p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
217 45950 tsampikos.
        stmt.executeUpdate(sql);
218
        stmt.close();
219
        conn.commit();
220
        conn.close();
221
    }
222 45524 tsampikos.
223 47073 tsampikos.
    private void viewsStats() throws Exception {
224
        if (conn.isClosed())
225
            connectDB();
226 45524 tsampikos.
227 47073 tsampikos.
        stmt = conn.createStatement();
228
        conn.setAutoCommit(false);
229
230
        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
231
        String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
232
        stmt.executeUpdate(sql);
233
234 55646 antonis.le
        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
235 56483 antonis.le
        sql = "CREATE TABLE views_stats AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
236 47073 tsampikos.
        stmt.executeUpdate(sql);
237
238 55646 antonis.le
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
239 56483 antonis.le
        sql = "CREATE TABLE pageviews_stats AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
240 47073 tsampikos.
        stmt.executeUpdate(sql);
241
242
        sql = "DROP VIEW IF EXISTS result_views_monthly;";
243
        stmt.executeUpdate(sql);
244
245
        stmt.close();
246
        conn.commit();
247
        conn.close();
248
    }
249
250
    private void downloadsStats() throws Exception {
251
        if (conn.isClosed())
252
            connectDB();
253
254
        stmt = conn.createStatement();
255
        conn.setAutoCommit(false);
256
257
        //String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
258
        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
259
        stmt.executeUpdate(sql);
260
261
        //sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
262 55646 antonis.le
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
263 56483 antonis.le
        sql = "CREATE TABLE downloads_stats AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
264 47073 tsampikos.
        stmt.executeUpdate(sql);
265
266
        sql = "DROP VIEW IF EXISTS result_downloads_monthly;";
267
        stmt.executeUpdate(sql);
268
269
        stmt.close();
270
        conn.commit();
271
        conn.close();
272
    }
273
274
    public void finalizeStats() throws Exception {
275
        if (conn.isClosed())
276
            connectDB();
277
278
        stmt = conn.createStatement();
279
        conn.setAutoCommit(false);
280
281
        Calendar startCalendar = Calendar.getInstance();
282
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
283
        Calendar endCalendar = Calendar.getInstance();
284
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
285
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
286
287 55646 antonis.le
//        String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
288 56483 antonis.le
        String sql = "CREATE TABLE full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
289 47073 tsampikos.
        stmt.executeUpdate(sql);
290
291
        sql = "CREATE INDEX full_dates_full_date ON full_dates USING btree(full_date);";
292
        stmt.executeUpdate(sql);
293
294
295
        sql = "CREATE INDEX views_stats_source ON views_stats USING btree(source);";
296
        stmt.executeUpdate(sql);
297
298
        sql = "CREATE INDEX views_stats_repository_id ON views_stats USING btree(repository_id);";
299
        stmt.executeUpdate(sql);
300
301
        sql = "CREATE INDEX views_stats_result_id ON views_stats USING btree(result_id);";
302
        stmt.executeUpdate(sql);
303
304
        sql = "CREATE INDEX views_stats_date ON views_stats USING btree(date);";
305
        stmt.executeUpdate(sql);
306
307
308
        sql = "CREATE INDEX pageviews_stats_source ON pageviews_stats USING btree(source);";
309
        stmt.executeUpdate(sql);
310
311
        sql = "CREATE INDEX pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
312
        stmt.executeUpdate(sql);
313
314
        sql = "CREATE INDEX pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
315
        stmt.executeUpdate(sql);
316
317
        sql = "CREATE INDEX pageviews_stats_date ON pageviews_stats USING btree(date);";
318
        stmt.executeUpdate(sql);
319
320
321
        sql = "CREATE INDEX downloads_stats_source ON downloads_stats USING btree(source);";
322
        stmt.executeUpdate(sql);
323
324
        sql = "CREATE INDEX downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
325
        stmt.executeUpdate(sql);
326
327
        sql = "CREATE INDEX downloads_stats_result_id ON downloads_stats USING btree(result_id);";
328
        stmt.executeUpdate(sql);
329
330
        sql = "CREATE INDEX downloads_stats_date ON downloads_stats USING btree(date);";
331
        stmt.executeUpdate(sql);
332
333 55646 antonis.le
//        sql = "SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views INTO usage_stats FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
334 56483 antonis.le
        sql = "CREATE TABLE usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
335 53597 tsampikos.
        stmt.executeUpdate(sql);
336 47073 tsampikos.
337 53597 tsampikos.
        sql = "CREATE INDEX usage_stats_source ON usage_stats USING btree(source);";
338
        stmt.executeUpdate(sql);
339
340
        sql = "CREATE INDEX usage_stats_repository_id ON usage_stats USING btree(repository_id);";
341
        stmt.executeUpdate(sql);
342
343
        sql = "CREATE INDEX usage_stats_result_id ON usage_stats USING btree(result_id);";
344
        stmt.executeUpdate(sql);
345
346
        sql = "CREATE INDEX usage_stats_date ON usage_stats USING btree(date);";
347
        stmt.executeUpdate(sql);
348
349 47073 tsampikos.
        stmt.close();
350
        conn.commit();
351
        conn.close();
352
    }
353
354 45950 tsampikos.
    //views stats
355 55646 antonis.le
//    private void viewsStatsOLD() throws Exception {
356
//        if (conn.isClosed())
357
//            connectDB();
358
//
359
//        stmt = conn.createStatement();
360
//        conn.setAutoCommit(false);
361
//
362
//        Calendar startCalendar = Calendar.getInstance();
363
//        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
364
//        Calendar endCalendar = Calendar.getInstance();
365
//        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
366
//        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
367
//
368
//        String sql = "CREATE OR REPLACE VIEW result_views_monthly as select entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'-'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
369
//        stmt.executeUpdate(sql);
370
//
371
//        sql = "SELECT d.id, d.new_date AS month, CASE when rdm.views IS NULL THEN 0 ELSE rdm.views END, d.source INTO result_views_sushi FROM (SELECT distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date, rdsm.source FROM generate_series(0, " + diffMonth + ", 1) AS offs, result_views_monthly rdsm) d LEFT JOIN (SELECT id, month, views, source FROM result_views_monthly) rdm ON d.new_date=rdm.month AND d.id=rdm.id AND d.source=rdm.source ORDER BY d.source, d.id, d.new_date;";
372
//        stmt.executeUpdate(sql);
373
//
374
//        sql = "CREATE INDEX result_views_sushi_id ON result_views_sushi USING btree (id);";
375
//        stmt.executeUpdate(sql);
376
//
377
//        sql = "CREATE INDEX result_views_sushi_month ON result_views_sushi USING btree (month);";
378
//        stmt.executeUpdate(sql);
379
//
380
//        sql = "CREATE INDEX result_views_sushi_source ON result_views_sushi USING btree (source);";
381
//        stmt.executeUpdate(sql);
382
//
383
//        sql = "SELECT roid.id, extract('year' from month::date) ||'/'|| LPAD(CAST(extract('month' from month::date) AS VARCHAR), 2, '0') as month, max(views) as views, source INTO result_views FROM result_views_sushi rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY source, roid.id, month ORDER BY source, roid.id, month;";
384
//        stmt.executeUpdate(sql);
385
//
386
//        sql = "CREATE INDEX result_views_id ON result_views USING btree (id);";
387
//        stmt.executeUpdate(sql);
388
//
389
//        sql = "CREATE INDEX result_views_month ON result_views USING btree (month);";
390
//        stmt.executeUpdate(sql);
391
//
392
//        sql = "CREATE INDEX result_views_source ON result_views USING btree (source);";
393
//        stmt.executeUpdate(sql);
394
//
395
//
396
//        sql = "DROP VIEW IF EXISTS result_views_monthly;";
397
//        stmt.executeUpdate(sql);
398
//
399
//        stmt.close();
400
//        conn.commit();
401
//        conn.close();
402
//    }
403 45524 tsampikos.
404 45950 tsampikos.
    //downloads stats
405 55646 antonis.le
//    private void downloadsStatsOLD() throws Exception {
406
//        if (conn.isClosed())
407
//            connectDB();
408
//
409
//        stmt = conn.createStatement();
410
//        conn.setAutoCommit(false);
411
//
412
//        Calendar startCalendar = Calendar.getInstance();
413
//        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
414
//        Calendar endCalendar = Calendar.getInstance();
415
//        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
416
//        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
417
//
418
//        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'-'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
419
//        stmt.executeUpdate(sql);
420
//
421
//        sql = "SELECT d.id, d.new_date AS month, CASE when rdm.downloads IS NULL THEN 0 ELSE rdm.downloads END, d.source INTO result_downloads_sushi FROM (SELECT distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date, rdsm.source FROM generate_series(0, " + diffMonth + ", 1) AS offs, result_downloads_monthly rdsm) d LEFT JOIN (SELECT id, month, downloads, source FROM result_downloads_monthly) rdm ON d.new_date=rdm.month AND d.id=rdm.id AND d.source=rdm.source ORDER BY d.source, d.id, d.new_date;";
422
//        stmt.executeUpdate(sql);
423
//
424
//        sql = "CREATE INDEX result_downloads_sushi_id ON result_downloads_sushi USING btree (id);";
425
//        stmt.executeUpdate(sql);
426
//
427
//        sql = "CREATE INDEX result_downloads_sushi_month ON result_downloads_sushi USING btree (month);";
428
//        stmt.executeUpdate(sql);
429
//
430
//        sql = "CREATE INDEX result_downloads_sushi_source ON result_downloads_sushi USING btree (source);";
431
//        stmt.executeUpdate(sql);
432
//
433
//        sql = "SELECT roid.id, extract('year' from month::date) ||'/'|| LPAD(CAST(extract('month' from month::date) AS VARCHAR), 2, '0') as month, downloads, source INTO result_downloads FROM result_downloads_sushi rvs, result_oids roid WHERE rvs.id=roid.orid ORDER BY source, roid.id, month;";
434
//        stmt.executeUpdate(sql);
435
//
436
//        sql = "CREATE INDEX result_downloads_id ON result_downloads USING btree (id);";
437
//        stmt.executeUpdate(sql);
438
//
439
//        sql = "CREATE INDEX result_downloads_month ON result_downloads USING btree (month);";
440
//        stmt.executeUpdate(sql);
441
//
442
//        sql = "CREATE INDEX result_downloads_source ON result_downloads USING btree (source);";
443
//        stmt.executeUpdate(sql);
444
//
445
//
446
//        sql = "DROP VIEW IF EXISTS result_downloads_monthly;";
447
//        stmt.executeUpdate(sql);
448
//
449
//        stmt.close();
450
//        conn.commit();
451
//        conn.close();
452
//    }
453 45524 tsampikos.
454
    //Create repository Views statistics
455 45950 tsampikos.
    private void repositoryViewsStats() throws Exception {
456
        if (conn.isClosed())
457 45524 tsampikos.
            connectDB();
458
459 45950 tsampikos.
        stmt = conn.createStatement();
460 45524 tsampikos.
        conn.setAutoCommit(false);
461
462 55646 antonis.le
//        String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
463 56483 antonis.le
        String sql = "CREATE TABLE repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
464 45524 tsampikos.
        stmt.executeUpdate(sql);
465
466 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
467 45524 tsampikos.
        stmt.executeUpdate(sql);
468
469 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
470 45524 tsampikos.
        stmt.executeUpdate(sql);
471
472 55646 antonis.le
//        sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
473 56483 antonis.le
        sql = "CREATE TABLE repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
474 45524 tsampikos.
        stmt.executeUpdate(sql);
475
476 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
477 45524 tsampikos.
        stmt.executeUpdate(sql);
478
479 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
480 45524 tsampikos.
        stmt.executeUpdate(sql);
481
482 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
483 45524 tsampikos.
        stmt.executeUpdate(sql);
484
485
        Calendar startCalendar = Calendar.getInstance();
486
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
487
        Calendar endCalendar = Calendar.getInstance();
488
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
489
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
490
491
        //sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
492 55646 antonis.le
//        sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
493 56483 antonis.le
        sql = "CREATE TABLE repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
494 45524 tsampikos.
        stmt.executeUpdate(sql);
495
496 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
497 45524 tsampikos.
        stmt.executeUpdate(sql);
498
499 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
500 45524 tsampikos.
        stmt.executeUpdate(sql);
501
502 45950 tsampikos.
        sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
503 45524 tsampikos.
        stmt.executeUpdate(sql);
504
505 45950 tsampikos.
        sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
506
        stmt.executeUpdate(sql);
507
508 45524 tsampikos.
        stmt.close();
509
        conn.commit();
510
        conn.close();
511
    }
512
513 45950 tsampikos.
    //Create repository downloads statistics
514
    private void repositoryDownloadsStats() throws Exception {
515
        if (conn.isClosed())
516 45524 tsampikos.
            connectDB();
517
518 45950 tsampikos.
        stmt = conn.createStatement();
519 45524 tsampikos.
        conn.setAutoCommit(false);
520
521 55646 antonis.le
//        String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
522 56483 antonis.le
        String sql = "CREATE TABLE repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
523 45524 tsampikos.
        stmt.executeUpdate(sql);
524
525 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
526 45524 tsampikos.
        stmt.executeUpdate(sql);
527
528 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
529 45524 tsampikos.
        stmt.executeUpdate(sql);
530
531 55646 antonis.le
//        sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
532 56483 antonis.le
        sql = "CREATE TABLE repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
533 45524 tsampikos.
        stmt.executeUpdate(sql);
534
535 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
536 45524 tsampikos.
        stmt.executeUpdate(sql);
537
538 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
539 45524 tsampikos.
        stmt.executeUpdate(sql);
540
541 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
542 45524 tsampikos.
        stmt.executeUpdate(sql);
543
544
        Calendar startCalendar = Calendar.getInstance();
545
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
546
        Calendar endCalendar = Calendar.getInstance();
547
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
548
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
549
550
        //sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
551 55646 antonis.le
        // sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
552 56483 antonis.le
        sql = "CREATE TABLE repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
553 45524 tsampikos.
        stmt.executeUpdate(sql);
554
555 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
556 45524 tsampikos.
        stmt.executeUpdate(sql);
557
558 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
559 45524 tsampikos.
        stmt.executeUpdate(sql);
560
561 45950 tsampikos.
        sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
562 45524 tsampikos.
        stmt.executeUpdate(sql);
563
564 45950 tsampikos.
565
        sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
566
        stmt.executeUpdate(sql);
567
568 45524 tsampikos.
        stmt.close();
569
        conn.commit();
570
        conn.close();
571
    }
572
573
    // Import OPENAIRE Logs to DB
574 45950 tsampikos.
    private void processPortalLog() throws Exception {
575 45524 tsampikos.
576 45950 tsampikos.
        if (conn.isClosed())
577 45524 tsampikos.
            connectDB();
578
579 45950 tsampikos.
        stmt = conn.createStatement();
580 45524 tsampikos.
        conn.setAutoCommit(false);
581
582 45950 tsampikos.
        ArrayList<String> jsonFiles = listHdfsDir(logPath + "portallog");
583 47268 tsampikos.
        PreparedStatement prepStatem = conn.prepareStatement("INSERT INTO process_portal_log (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
584 45524 tsampikos.
        int batch_size = 0;
585 45950 tsampikos.
        JSONParser parser = new JSONParser();
586 45524 tsampikos.
        for (String jsonFile : jsonFiles) {
587 45950 tsampikos.
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
588 45524 tsampikos.
589 45950 tsampikos.
            for (Object aJsonArray : jsonArray) {
590
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
591 45524 tsampikos.
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
592 45950 tsampikos.
                String idVisit = jsonObjectRow.get("idVisit").toString();
593
                String country = jsonObjectRow.get("country").toString();
594
                String referrerName = jsonObjectRow.get("referrerName").toString();
595
                String agent = jsonObjectRow.get("browser").toString();
596 45524 tsampikos.
597
                JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
598 45950 tsampikos.
                for (Object actionDetail : actionDetails) {
599
                    JSONObject actionDetailsObj = (JSONObject) actionDetail;
600 45524 tsampikos.
601
                    SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
602
                    simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
603 45950 tsampikos.
                    Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
604 45524 tsampikos.
605
                    String action = actionDetailsObj.get("type").toString();
606
                    String url = actionDetailsObj.get("url").toString();
607
608 45950 tsampikos.
                    String entityID = processPortalURL(url);
609
                    String sourceItemType = "";
610 45524 tsampikos.
611 45950 tsampikos.
                    if (entityID.indexOf("|") > 0) {
612
                        sourceItemType = entityID.substring(0, entityID.indexOf("|"));
613
                        entityID = entityID.substring(entityID.indexOf("|") + 1);
614 45524 tsampikos.
                    }
615
616
                    prepStatem.setInt(1, idSite);
617
                    prepStatem.setString(2, idVisit);
618 47268 tsampikos.
                    prepStatem.setString(3, country);
619
                    prepStatem.setString(4, action);
620
                    prepStatem.setString(5, url);
621
                    prepStatem.setString(6, entityID);
622
                    prepStatem.setString(7, sourceItemType);
623
                    prepStatem.setString(8, simpleDateFormat.format(timestamp));
624
                    prepStatem.setString(9, referrerName);
625
                    prepStatem.setString(10, agent);
626 45524 tsampikos.
627
                    prepStatem.addBatch();
628
                    batch_size++;
629 45950 tsampikos.
                    if (batch_size == 10000) {
630 45524 tsampikos.
                        prepStatem.executeBatch();
631
                        conn.commit();
632
                        batch_size = 0;
633
                    }
634
                }
635
            }
636
        }
637
        prepStatem.executeBatch();
638
        conn.commit();
639
640
        stmt.close();
641
        conn.close();
642
    }
643
644 45950 tsampikos.
    private void portalStats() throws Exception {
645
        if (conn.isClosed())
646 45524 tsampikos.
            connectDB();
647
648 45950 tsampikos.
        stmt = conn.createStatement();
649 45524 tsampikos.
        conn.setAutoCommit(false);
650
651 55646 antonis.le
        String sql = "INSERT INTO piwiklog SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'oaItem\', timestamp, referrer_name, agent FROM process_portal_log, result_oids roid WHERE entity_id IS NOT null AND entity_id=roid.id AND roid.orid IS NOT null;";
652 45524 tsampikos.
        stmt.executeUpdate(sql);
653
        stmt.close();
654
        conn.commit();
655
656 45950 tsampikos.
        stmt = conn.createStatement();
657 55646 antonis.le
        sql = "INSERT INTO piwiklog SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'datasource\', timestamp, referrer_name, agent FROM process_portal_log, datasource_oids roid WHERE entity_id IS NOT null AND entity_id=roid.id AND roid.orid IS NOT null;";
658 45524 tsampikos.
        stmt.executeUpdate(sql);
659
        stmt.close();
660
        conn.commit();
661
662 45950 tsampikos.
        stmt = conn.createStatement();
663 55646 antonis.le
        sql = "INSERT INTO piwiklog SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'organization\', timestamp, referrer_name, agent FROM process_portal_log, organization_oids roid WHERE entity_id IS NOT null AND entity_id=roid.id AND roid.orid IS NOT null;";
664 45524 tsampikos.
        stmt.executeUpdate(sql);
665
        stmt.close();
666
        conn.commit();
667
668 45950 tsampikos.
        stmt = conn.createStatement();
669 55646 antonis.le
        sql = "INSERT INTO piwiklog SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'project\', timestamp, referrer_name, agent FROM process_portal_log, project_oids roid WHERE entity_id IS NOT null AND entity_id=roid.id AND roid.orid IS NOT null;";
670 45524 tsampikos.
        stmt.executeUpdate(sql);
671
        stmt.close();
672
        conn.commit();
673
674 45950 tsampikos.
        stmt = conn.createStatement();
675
        sql = "DROP TABLE process_portal_log;";
676 45524 tsampikos.
        stmt.executeUpdate(sql);
677
        stmt.close();
678
        conn.commit();
679
680
        conn.close();
681
    }
682
683 45950 tsampikos.
    private void cleanOAI() throws Exception {
684
        if (conn.isClosed())
685 45524 tsampikos.
            connectDB();
686
687
        conn.setAutoCommit(false);
688
689 45950 tsampikos.
        stmt = conn.createStatement();
690
        String sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chlc.min-saude.pt/','oai:repositorio.chlc.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.chlc.min-saude.pt/%';";
691 45524 tsampikos.
        stmt.executeUpdate(sql);
692 45950 tsampikos.
        stmt.close();
693
        conn.commit();
694
695
        stmt = conn.createStatement();
696
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hospitaldebraga.pt/','oai:repositorio.hospitaldebraga.pt:') WHERE entity_id LIKE 'oai:repositorio.hospitaldebraga.pt/%';";
697 45524 tsampikos.
        stmt.executeUpdate(sql);
698 45950 tsampikos.
        stmt.close();
699
        conn.commit();
700
701
        stmt = conn.createStatement();
702
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipl.pt/','oai:repositorio.ipl.pt:') WHERE entity_id LIKE 'oai:repositorio.ipl.pt/%';";
703 45524 tsampikos.
        stmt.executeUpdate(sql);
704 45950 tsampikos.
        stmt.close();
705
        conn.commit();
706
707
        stmt = conn.createStatement();
708
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:bibliotecadigital.ipb.pt/','oai:bibliotecadigital.ipb.pt:') WHERE entity_id LIKE 'oai:bibliotecadigital.ipb.pt/%';";
709 45524 tsampikos.
        stmt.executeUpdate(sql);
710 45950 tsampikos.
        stmt.close();
711
        conn.commit();
712
713
        stmt = conn.createStatement();
714
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ismai.pt/','oai:repositorio.ismai.pt:') WHERE entity_id LIKE 'oai:repositorio.ismai.pt/%';";
715 45524 tsampikos.
        stmt.executeUpdate(sql);
716 45950 tsampikos.
        stmt.close();
717
        conn.commit();
718
719
        stmt = conn.createStatement();
720
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorioaberto.uab.pt/','oai:repositorioaberto.uab.pt:') WHERE entity_id LIKE 'oai:repositorioaberto.uab.pt/%';";
721 45524 tsampikos.
        stmt.executeUpdate(sql);
722 45950 tsampikos.
        stmt.close();
723
        conn.commit();
724
725
        stmt = conn.createStatement();
726
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.uac.pt/','oai:repositorio.uac.pt:') WHERE entity_id LIKE 'oai:repositorio.uac.pt/%';";
727 45524 tsampikos.
        stmt.executeUpdate(sql);
728 45950 tsampikos.
        stmt.close();
729
        conn.commit();
730
731
        stmt = conn.createStatement();
732
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.insa.pt/','oai:repositorio.insa.pt:') WHERE entity_id LIKE 'oai:repositorio.insa.pt/%';";
733 45524 tsampikos.
        stmt.executeUpdate(sql);
734 45950 tsampikos.
        stmt.close();
735
        conn.commit();
736
737
        stmt = conn.createStatement();
738
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipcb.pt/','oai:repositorio.ipcb.pt:') WHERE entity_id LIKE 'oai:repositorio.ipcb.pt/%';";
739 45524 tsampikos.
        stmt.executeUpdate(sql);
740 45950 tsampikos.
        stmt.close();
741
        conn.commit();
742
743
        stmt = conn.createStatement();
744
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ispa.pt/','oai:repositorio.ispa.pt:') WHERE entity_id LIKE 'oai:repositorio.ispa.pt/%';";
745 45524 tsampikos.
        stmt.executeUpdate(sql);
746 45950 tsampikos.
        stmt.close();
747
        conn.commit();
748
749
        stmt = conn.createStatement();
750
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chporto.pt/','oai:repositorio.chporto.pt:') WHERE entity_id LIKE 'oai:repositorio.chporto.pt/%';";
751 45524 tsampikos.
        stmt.executeUpdate(sql);
752 45950 tsampikos.
        stmt.close();
753
        conn.commit();
754
755
        stmt = conn.createStatement();
756
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ucp.pt/','oai:repositorio.ucp.pt:') WHERE entity_id LIKE 'oai:repositorio.ucp.pt/%';";
757 45524 tsampikos.
        stmt.executeUpdate(sql);
758 45950 tsampikos.
        stmt.close();
759
        conn.commit();
760
761
        stmt = conn.createStatement();
762
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:rihuc.huc.min-saude.pt/','oai:rihuc.huc.min-saude.pt:') WHERE entity_id LIKE 'oai:rihuc.huc.min-saude.pt/%';";
763 45524 tsampikos.
        stmt.executeUpdate(sql);
764 45950 tsampikos.
        stmt.close();
765
        conn.commit();
766
767
        stmt = conn.createStatement();
768
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipv.pt/','oai:repositorio.ipv.pt:') WHERE entity_id LIKE 'oai:repositorio.ipv.pt/%';";
769 45524 tsampikos.
        stmt.executeUpdate(sql);
770 45950 tsampikos.
        stmt.close();
771
        conn.commit();
772
773
        stmt = conn.createStatement();
774
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:www.repository.utl.pt/','oai:www.repository.utl.pt:') WHERE entity_id LIKE 'oai:www.repository.utl.pt/%';";
775 45524 tsampikos.
        stmt.executeUpdate(sql);
776 45950 tsampikos.
        stmt.close();
777
        conn.commit();
778
779
        stmt = conn.createStatement();
780
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:run.unl.pt/','oai:run.unl.pt:') WHERE entity_id LIKE 'oai:run.unl.pt/%';";
781 45524 tsampikos.
        stmt.executeUpdate(sql);
782 45950 tsampikos.
        stmt.close();
783
        conn.commit();
784
785
        stmt = conn.createStatement();
786
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:sapientia.ualg.pt/','oai:sapientia.ualg.pt:') WHERE entity_id LIKE 'oai:sapientia.ualg.pt/%';";
787 45524 tsampikos.
        stmt.executeUpdate(sql);
788 45950 tsampikos.
        stmt.close();
789
        conn.commit();
790
791
        stmt = conn.createStatement();
792
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipsantarem.pt/','oai:repositorio.ipsantarem.pt:') WHERE entity_id LIKE 'oai:repositorio.ipsantarem.pt/%';";
793 45524 tsampikos.
        stmt.executeUpdate(sql);
794 45950 tsampikos.
        stmt.close();
795
        conn.commit();
796
797
        stmt = conn.createStatement();
798
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:arca.igc.gulbenkian.pt/','oai:arca.igc.gulbenkian.pt:') WHERE entity_id LIKE 'oai:arca.igc.gulbenkian.pt/%';";
799 45524 tsampikos.
        stmt.executeUpdate(sql);
800 45950 tsampikos.
        stmt.close();
801
        conn.commit();
802
803
        stmt = conn.createStatement();
804
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:ubibliorum.ubi.pt/','oai:ubibliorum.ubi.pt:') WHERE entity_id LIKE 'oai:ubibliorum.ubi.pt/%';";
805 45524 tsampikos.
        stmt.executeUpdate(sql);
806 45950 tsampikos.
        stmt.close();
807
        conn.commit();
808
809
        stmt = conn.createStatement();
810
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:digituma.uma.pt/','oai:digituma.uma.pt:') WHERE entity_id LIKE 'oai:digituma.uma.pt/%';";
811 45524 tsampikos.
        stmt.executeUpdate(sql);
812 45950 tsampikos.
        stmt.close();
813
        conn.commit();
814
815
        stmt = conn.createStatement();
816
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ul.pt/','oai:repositorio.ul.pt:') WHERE entity_id LIKE 'oai:repositorio.ul.pt/%';";
817 45524 tsampikos.
        stmt.executeUpdate(sql);
818 45950 tsampikos.
        stmt.close();
819
        conn.commit();
820
821
        stmt = conn.createStatement();
822
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hff.min-saude.pt/','oai:repositorio.hff.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.hff.min-saude.pt/%';";
823 45524 tsampikos.
        stmt.executeUpdate(sql);
824 45950 tsampikos.
        stmt.close();
825
        conn.commit();
826
827
        stmt = conn.createStatement();
828
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorium.sdum.uminho.pt/','oai:repositorium.sdum.uminho.pt:') WHERE entity_id LIKE 'oai:repositorium.sdum.uminho.pt/%';";
829 45524 tsampikos.
        stmt.executeUpdate(sql);
830 45950 tsampikos.
        stmt.close();
831
        conn.commit();
832
833
        stmt = conn.createStatement();
834
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:recipp.ipp.pt/','oai:recipp.ipp.pt:') WHERE entity_id LIKE 'oai:recipp.ipp.pt/%';";
835 45524 tsampikos.
        stmt.executeUpdate(sql);
836 45950 tsampikos.
        stmt.close();
837
        conn.commit();
838
839
        stmt = conn.createStatement();
840
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:bdigital.ufp.pt/','oai:bdigital.ufp.pt:') WHERE entity_id LIKE 'oai:bdigital.ufp.pt/%';";
841 45524 tsampikos.
        stmt.executeUpdate(sql);
842 45950 tsampikos.
        stmt.close();
843
        conn.commit();
844
845
        stmt = conn.createStatement();
846
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.lneg.pt/','oai:repositorio.lneg.pt:') WHERE entity_id LIKE 'oai:repositorio.lneg.pt/%';";
847 45524 tsampikos.
        stmt.executeUpdate(sql);
848 45950 tsampikos.
        stmt.close();
849
        conn.commit();
850
851
        stmt = conn.createStatement();
852
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:iconline.ipleiria.pt/','oai:iconline.ipleiria.pt:') WHERE entity_id LIKE 'oai:iconline.ipleiria.pt/%';";
853 45524 tsampikos.
        stmt.executeUpdate(sql);
854 45950 tsampikos.
        stmt.close();
855
        conn.commit();
856 45524 tsampikos.
857 45950 tsampikos.
        stmt = conn.createStatement();
858
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:comum.rcaap.pt/','oai:comum.rcaap.pt:') WHERE entity_id LIKE 'oai:comum.rcaap.pt/%';";
859
        stmt.executeUpdate(sql);
860 45524 tsampikos.
        stmt.close();
861
        conn.commit();
862 45950 tsampikos.
863 45524 tsampikos.
        conn.close();
864
    }
865
866
    //Create OpenAIRE's portal datasource statistics
867 45950 tsampikos.
    private void dataSourceStats() throws Exception {
868
        if (conn.isClosed())
869 45524 tsampikos.
            connectDB();
870
871 45950 tsampikos.
        stmt = conn.createStatement();
872 45524 tsampikos.
        conn.setAutoCommit(false);
873
874 55646 antonis.le
        // String sql = "SELECT  orgid AS id, max(viewcount) AS number_of_views, date INTO datasource_stats FROM (select entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, datasource_oids ooid WHERE entity_id=ooid.orid AND source_item_type='datasource' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
875 56483 antonis.le
        String sql = "CREATE TABLE datasource_stats AS SELECT  orgid AS id, max(viewcount) AS number_of_views, date FROM (select entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, datasource_oids ooid WHERE entity_id=ooid.orid AND source_item_type='datasource' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
876 45524 tsampikos.
        stmt.executeUpdate(sql);
877
878 45950 tsampikos.
        sql = "CREATE INDEX datasource_stats_id ON datasource_stats USING btree (id)";
879 45524 tsampikos.
        stmt.executeUpdate(sql);
880
881 45950 tsampikos.
        sql = "CREATE INDEX datasource_stats_date ON datasource_stats USING btree(date)";
882 45524 tsampikos.
        stmt.executeUpdate(sql);
883
884 55646 antonis.le
        // sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO datasource_stats_monthly_clean FROM datasource_stats GROUP BY id, month;";
885 56483 antonis.le
        sql = "CREATE TABLE datasource_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM datasource_stats GROUP BY id, month;";
886 45524 tsampikos.
        stmt.executeUpdate(sql);
887
888 45950 tsampikos.
        sql = "CREATE INDEX datasource_stats_monthly_clean_id ON datasource_stats_monthly_clean USING btree (id)";
889 45524 tsampikos.
        stmt.executeUpdate(sql);
890
891 45950 tsampikos.
        sql = "CREATE INDEX datasource_stats_monthly_clean_month ON datasource_stats_monthly_clean USING btree(month)";
892 45524 tsampikos.
        stmt.executeUpdate(sql);
893
894
        Calendar startCalendar = Calendar.getInstance();
895
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
896
        Calendar endCalendar = Calendar.getInstance();
897
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
898
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
899
900
        //sql="CREATE OR REPLACE view datasource_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, datasource_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from datasource_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
901 55646 antonis.le
        //sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end INTO datasource_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, datasource_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from datasource_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
902 56483 antonis.le
        sql = "CREATE TABLE datasource_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, datasource_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from datasource_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
903 45524 tsampikos.
        stmt.executeUpdate(sql);
904
905 45950 tsampikos.
        sql = "CREATE INDEX datasource_stats_monthly_id ON datasource_stats_monthly USING btree (id)";
906 45524 tsampikos.
        stmt.executeUpdate(sql);
907
908 45950 tsampikos.
        sql = "CREATE INDEX datasource_stats_monthly_month ON datasource_stats_monthly USING btree(month)";
909 45524 tsampikos.
        stmt.executeUpdate(sql);
910
911
        stmt.close();
912
        conn.commit();
913
        conn.close();
914
    }
915
916
    //Create OpenAIRE's portal results statistics
917 45950 tsampikos.
    private void resultStats() throws Exception {
918
        if (conn.isClosed())
919 45524 tsampikos.
            connectDB();
920
921 45950 tsampikos.
        stmt = conn.createStatement();
922 45524 tsampikos.
        conn.setAutoCommit(false);
923
924 55646 antonis.le
        // String sql = "SELECT orgid AS id, max(viewcount) AS number_of_views, date INTO result_stats FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, result_oids ooid WHERE entity_id=ooid.orid AND source_item_type='oaItem' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
925 56483 antonis.le
        String sql = "CREATE TABLE result_stats AS SELECT orgid AS id, max(viewcount) AS number_of_views, date  FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, result_oids ooid WHERE entity_id=ooid.orid AND source_item_type='oaItem' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
926 45524 tsampikos.
        stmt.executeUpdate(sql);
927
928 45950 tsampikos.
        sql = "CREATE INDEX result_stats_id ON result_stats USING btree (id)";
929 45524 tsampikos.
        stmt.executeUpdate(sql);
930
931 45950 tsampikos.
        sql = "CREATE INDEX result_stats_date ON result_stats USING btree(date)";
932 45524 tsampikos.
        stmt.executeUpdate(sql);
933
934 55646 antonis.le
//        sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO result_stats_monthly_clean FROM result_stats group by id, month;";
935
        sql = "CREATE TABLE IF NOT EXISTS result_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM result_stats group by id, month;";
936 45524 tsampikos.
        stmt.executeUpdate(sql);
937
938 45950 tsampikos.
        sql = "CREATE INDEX result_stats_monthly_clean_id ON result_stats_monthly_clean USING btree (id)";
939 45524 tsampikos.
        stmt.executeUpdate(sql);
940
941 45950 tsampikos.
        sql = "CREATE INDEX result_stats_monthly_clean_month ON result_stats_monthly_clean USING btree(month)";
942 45524 tsampikos.
        stmt.executeUpdate(sql);
943
944
        Calendar startCalendar = Calendar.getInstance();
945
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
946
        Calendar endCalendar = Calendar.getInstance();
947
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
948
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
949
950
        //sql="CREATE OR REPLACE view result_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, result_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from result_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
951 55646 antonis.le
//        sql = "select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end INTO result_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, result_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from result_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
952
        sql = "CREATE TABLE IF NOT EXISTS result_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, result_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from result_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
953 45524 tsampikos.
        stmt.executeUpdate(sql);
954
955 45950 tsampikos.
        sql = "CREATE INDEX result_stats_monthly_id ON result_stats_monthly USING btree (id)";
956 45524 tsampikos.
        stmt.executeUpdate(sql);
957
958 45950 tsampikos.
        sql = "CREATE INDEX result_stats_monthly_month ON result_stats_monthly USING btree(month)";
959 45524 tsampikos.
        stmt.executeUpdate(sql);
960
961
        stmt.close();
962
        conn.commit();
963
        conn.close();
964
    }
965
966
    //Create OpenAIRE's portal organization statistics
967 45950 tsampikos.
    private void organizationsStats() throws Exception {
968
        if (conn.isClosed())
969 45524 tsampikos.
            connectDB();
970
971 45950 tsampikos.
        stmt = conn.createStatement();
972 45524 tsampikos.
        conn.setAutoCommit(false);
973
974 55646 antonis.le
        // String sql = "SELECT orgid AS id, max(viewcount) AS number_of_views, date INTO organization_stats FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, organization_oids ooid WHERE entity_id=ooid.orid AND source_item_type='organization' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
975 56483 antonis.le
        String sql = "CREATE TABLE  organization_stats AS SELECT orgid AS id, max(viewcount) AS number_of_views, date FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, organization_oids ooid WHERE entity_id=ooid.orid AND source_item_type='organization' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
976 45524 tsampikos.
        stmt.executeUpdate(sql);
977
978 45950 tsampikos.
        sql = "CREATE INDEX organization_stats_id ON organization_stats USING btree (id)";
979 45524 tsampikos.
        stmt.executeUpdate(sql);
980
981 45950 tsampikos.
        sql = "CREATE INDEX organization_stats_date ON organization_stats USING btree(date)";
982 45524 tsampikos.
        stmt.executeUpdate(sql);
983
984 55646 antonis.le
        // sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO organization_stats_monthly_clean FROM organization_stats group by id, month;";
985 56483 antonis.le
        sql = "CREATE TABLE  organization_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM organization_stats group by id, month;";
986 45524 tsampikos.
        stmt.executeUpdate(sql);
987
988 45950 tsampikos.
        sql = "CREATE INDEX organization_stats_monthly_clean_id ON organization_stats_monthly_clean USING btree (id)";
989 45524 tsampikos.
        stmt.executeUpdate(sql);
990
991 45950 tsampikos.
        sql = "CREATE INDEX organization_stats_monthly_clean_month ON organization_stats_monthly_clean USING btree(month)";
992 45524 tsampikos.
        stmt.executeUpdate(sql);
993
994
        Calendar startCalendar = Calendar.getInstance();
995
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
996
        Calendar endCalendar = Calendar.getInstance();
997
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
998
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
999
1000
        //sql="CREATE OR REPLACE view organization_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, organization_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from organization_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1001 55646 antonis.le
        // sql = "select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end INTO organization_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, organization_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from organization_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1002 56483 antonis.le
        sql = "CREATE TABLE  organization_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, organization_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from organization_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1003 45524 tsampikos.
        stmt.executeUpdate(sql);
1004
1005 45950 tsampikos.
        sql = "CREATE INDEX organization_stats_monthly_id ON organization_stats_monthly USING btree (id)";
1006 45524 tsampikos.
        stmt.executeUpdate(sql);
1007
1008 45950 tsampikos.
        sql = "CREATE INDEX organization_stats_monthly_month ON organization_stats_monthly USING btree(month)";
1009 45524 tsampikos.
        stmt.executeUpdate(sql);
1010
1011
        stmt.close();
1012
        conn.commit();
1013
        conn.close();
1014
    }
1015
1016
    //Create OpenAIRE's portal projects statistics
1017 45950 tsampikos.
    private void projectsStats() throws Exception {
1018
        if (conn.isClosed())
1019 45524 tsampikos.
            connectDB();
1020
1021 45950 tsampikos.
        stmt = conn.createStatement();
1022 45524 tsampikos.
        conn.setAutoCommit(false);
1023
1024 55646 antonis.le
        // String sql = "SELECT orgid AS id, max(viewcount) AS number_of_views, date INTO project_stats FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, project_oids ooid WHERE entity_id=ooid.orid AND source_item_type='project' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1025 56483 antonis.le
        String sql = "CREATE TABLE  project_stats AS SELECT orgid AS id, max(viewcount) AS number_of_views, date FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, project_oids ooid WHERE entity_id=ooid.orid AND source_item_type='project' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1026 45524 tsampikos.
        stmt.executeUpdate(sql);
1027
1028 45950 tsampikos.
        sql = "CREATE INDEX project_stats_id ON project_stats USING btree (id)";
1029 45524 tsampikos.
        stmt.executeUpdate(sql);
1030
1031 45950 tsampikos.
        sql = "CREATE INDEX project_stats_date ON project_stats USING btree(date)";
1032 45524 tsampikos.
        stmt.executeUpdate(sql);
1033
1034 55646 antonis.le
        // sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO project_stats_monthly_clean FROM project_stats group by id, month;";
1035 56483 antonis.le
        sql = "CREATE TABLE  project_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM project_stats group by id, month;";
1036 45524 tsampikos.
        stmt.executeUpdate(sql);
1037
1038 45950 tsampikos.
        sql = "CREATE INDEX project_stats_monthly_clean_id ON project_stats_monthly_clean USING btree (id)";
1039 45524 tsampikos.
        stmt.executeUpdate(sql);
1040
1041 45950 tsampikos.
        sql = "CREATE INDEX project_stats_monthly_clean_month ON project_stats_monthly_clean USING btree(month)";
1042 45524 tsampikos.
        stmt.executeUpdate(sql);
1043
1044
        Calendar startCalendar = Calendar.getInstance();
1045
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
1046
        Calendar endCalendar = Calendar.getInstance();
1047
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
1048
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
1049
1050 55646 antonis.le
        // sql="CREATE OR REPLACE view project_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, project_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from project_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1051
        // sql = "select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end INTO project_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, project_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from project_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1052 56483 antonis.le
        sql = "CREATE TABLE  project_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, project_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from project_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1053 45524 tsampikos.
        stmt.executeUpdate(sql);
1054
1055 45950 tsampikos.
        sql = "CREATE INDEX project_stats_monthly_id ON project_stats_monthly USING btree (id)";
1056 45524 tsampikos.
        stmt.executeUpdate(sql);
1057
1058 45950 tsampikos.
        sql = "CREATE INDEX project_stats_monthly_month ON project_stats_monthly USING btree(month)";
1059 45524 tsampikos.
        stmt.executeUpdate(sql);
1060
1061
        stmt.close();
1062
        conn.commit();
1063
        conn.close();
1064
    }
1065
1066
    private String processPortalURL(String url) {
1067
1068 45950 tsampikos.
        if (url.indexOf("openaire.eu") > 0) {
1069 45524 tsampikos.
            try {
1070
                url = URLDecoder.decode(url, "UTF-8");
1071
            } catch (Exception e) {
1072
                log.info(url);
1073
            }
1074
            if (url.indexOf("datasourceId=") > 0 && url.substring(url.indexOf("datasourceId=") + 13).length() >= 46) {
1075
                url = "datasource|" + url.substring(url.indexOf("datasourceId=") + 13, url.indexOf("datasourceId=") + 59);
1076 45950 tsampikos.
            } else if (url.indexOf("datasource=") > 0 && url.substring(url.indexOf("datasource=") + 11).length() >= 46) {
1077 45524 tsampikos.
                url = "datasource|" + url.substring(url.indexOf("datasource=") + 11, url.indexOf("datasource=") + 57);
1078 45950 tsampikos.
            } else if (url.indexOf("datasourceFilter=") > 0 && url.substring(url.indexOf("datasourceFilter=") + 17).length() >= 46) {
1079 45524 tsampikos.
                url = "datasource|" + url.substring(url.indexOf("datasourceFilter=") + 17, url.indexOf("datasourceFilter=") + 63);
1080 45950 tsampikos.
            } else if (url.indexOf("articleId=") > 0 && url.substring(url.indexOf("articleId=") + 10).length() >= 46) {
1081 45524 tsampikos.
                url = "result|" + url.substring(url.indexOf("articleId=") + 10, url.indexOf("articleId=") + 56);
1082 45950 tsampikos.
            } else if (url.indexOf("datasetId=") > 0 && url.substring(url.indexOf("datasetId=") + 10).length() >= 46) {
1083 45524 tsampikos.
                url = "result|" + url.substring(url.indexOf("datasetId=") + 10, url.indexOf("datasetId=") + 56);
1084 45950 tsampikos.
            } else if (url.indexOf("projectId=") > 0 && url.substring(url.indexOf("projectId=") + 10).length() >= 46 && !url.contains("oai:dnet:corda")) {
1085 45524 tsampikos.
                url = "project|" + url.substring(url.indexOf("projectId=") + 10, url.indexOf("projectId=") + 56);
1086 45950 tsampikos.
            } else if (url.indexOf("organizationId=") > 0 && url.substring(url.indexOf("organizationId=") + 15).length() >= 46) {
1087 45524 tsampikos.
                url = "organization|" + url.substring(url.indexOf("organizationId=") + 15, url.indexOf("organizationId=") + 61);
1088 45950 tsampikos.
            } else {
1089 45524 tsampikos.
                url = "";
1090
            }
1091
        } else {
1092
            url = "";
1093
        }
1094
1095
        return url;
1096
    }
1097
1098 45950 tsampikos.
    private ArrayList<String> listHdfsDir(String dir) throws Exception {
1099 45524 tsampikos.
1100
        FileSystem hdfs = FileSystem.get(new Configuration());
1101
        RemoteIterator<LocatedFileStatus> Files;
1102 45950 tsampikos.
        ArrayList<String> fileNames = new ArrayList<>();
1103 45524 tsampikos.
1104
        try {
1105
            Path exportPath = new Path(hdfs.getUri() + dir);
1106
            Files = hdfs.listFiles(exportPath, false);
1107 45950 tsampikos.
            while (Files.hasNext()) {
1108
                String fileName = Files.next().getPath().toString();
1109
                //log.info("Found hdfs file " + fileName);
1110
                fileNames.add(fileName);
1111
            }
1112
            //hdfs.close();
1113 45524 tsampikos.
        } catch (Exception e) {
1114
            log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
1115
            throw new Exception("HDFS file path with exported data does not exist :   " + logPath, e);
1116
        }
1117
1118
        return fileNames;
1119
    }
1120
1121 45950 tsampikos.
    private String readHDFSFile(String filename) throws Exception {
1122 47073 tsampikos.
        String result;
1123 45524 tsampikos.
        try {
1124
1125
            FileSystem fs = FileSystem.get(new Configuration());
1126 45950 tsampikos.
            //log.info("reading file : " + filename);
1127 45524 tsampikos.
1128
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
1129
1130
            StringBuilder sb = new StringBuilder();
1131
            String line = br.readLine();
1132
1133
            while (line != null) {
1134 45950 tsampikos.
                if (!line.equals("[]")) {
1135 45524 tsampikos.
                    sb.append(line);
1136
                }
1137
                //sb.append(line);
1138
                line = br.readLine();
1139
            }
1140
            result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
1141 45950 tsampikos.
            if (result.equals("")) {
1142
                result = "[]";
1143 45524 tsampikos.
            }
1144
1145
            //fs.close();
1146
        } catch (Exception e) {
1147
            log.error(e);
1148
            throw new Exception(e);
1149
        }
1150
1151
        return result;
1152
    }
1153
}