Project

General

Profile

1
package eu.dnetlib.usagestats.export;
2

    
3
import java.io.*;
4
import java.net.URLDecoder;
5
import java.sql.Connection;
6
import java.sql.PreparedStatement;
7
import java.sql.SQLException;
8
import java.sql.Statement;
9
import java.sql.Timestamp;
10
import java.text.SimpleDateFormat;
11
import java.util.*;
12
import java.util.regex.Matcher;
13
import java.util.regex.Pattern;
14

    
15
import org.apache.hadoop.conf.Configuration;
16
import org.apache.hadoop.fs.LocatedFileStatus;
17
import org.apache.hadoop.fs.Path;
18
import org.apache.hadoop.fs.FileSystem;
19
import org.apache.hadoop.fs.RemoteIterator;
20
import org.apache.log4j.Logger;
21
import org.json.simple.JSONArray;
22
import org.json.simple.JSONObject;
23
import org.json.simple.parser.JSONParser;
24

    
25
public class PiwikStatsDB {

    // Base directories of the downloaded Piwik log dumps.
    // NOTE(review): logPath is never assigned in this chunk — possibly set
    // elsewhere in the file, or dead; confirm before removing.
    private String logPath;
    // Repository log files (JSON), consumed by processRepositoryLog().
    private String logRepoPath;
    // Portal log files (JSON), consumed by the portal-processing methods.
    private String logPortalPath;

    // Shared JDBC statement reused by several of the stats-building methods.
    private Statement stmt = null;

    private final Logger log = Logger.getLogger(this.getClass());

    // URL of the COUNTER robots list used to filter out crawler traffic.
    private String CounterRobotsURL;
    // Robot user-agent patterns; elements are presumably regex strings
    // (they are passed to Pattern.compile in processRepositoryLog) — raw
    // ArrayList kept for interface compatibility.
    private ArrayList robotsList;
36

    
37

    
38
    /**
     * Builds the exporter for the given repository and portal log directories
     * and eagerly creates both the permanent and the tmp usage tables.
     *
     * @param logRepoPath   directory holding the repository Piwik JSON logs
     * @param logPortalPath directory holding the portal Piwik JSON logs
     * @throws Exception if table creation fails
     */
    public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
        this.logRepoPath = logRepoPath;
        this.logPortalPath = logPortalPath;
        this.createTables();
        // NOTE(review): createTables() closes ConnectDB.DB_CONNECTION before
        // returning, yet createTmpTables() uses the connection without an
        // isClosed() check — confirm ConnectDB re-opens it on access.
        this.createTmpTables();
    }
44

    
45
    public ArrayList getRobotsList() {
46
        return robotsList;
47
    }
48

    
49
    public void setRobotsList(ArrayList robotsList) {
50
        this.robotsList = robotsList;
51
    }
52

    
53
    public String getCounterRobotsURL() {
54
        return CounterRobotsURL;
55
    }
56

    
57
    public void setCounterRobotsURL(String CounterRobotsURL) {
58
        this.CounterRobotsURL = CounterRobotsURL;
59
    }
60

    
61
    private void connectDB() throws Exception {
62
        try {
63
            ConnectDB connectDB = new ConnectDB();
64
        } catch (Exception e) {
65
            log.error("Connect to db failed: " + e);
66
            throw new Exception("Failed to connect to db: " + e.toString(), e);
67
        }
68
    }
69

    
70
    /**
     * Creates the permanent usage tables (piwiklog, process_portal_log) plus
     * PostgreSQL rules that silently drop duplicate inserts, then seeds
     * piwiklog from the copy kept in the public schema.
     *
     * The CREATE RULE statements are PostgreSQL-specific: inserts that would
     * violate the composite primary key become no-ops instead of errors.
     *
     * @throws Exception if any DDL/DML statement fails (original cause preserved)
     */
    private void createTables() throws Exception {
        try {
            stmt = ConnectDB.DB_CONNECTION.createStatement();
            // Action log; PK (source, id_visit, action, timestamp, entity_id) identifies one event.
            String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
            String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO piwiklog "
                    + " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
                    + "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
                    + "FROM piwiklog "
                    + "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlCreateTablePiwikLog);
            stmt.executeUpdate(sqlcreateRulePiwikLog);

            // Seed the table from the existing copy in the public schema.
            String sqlCopyPublicPiwiklog = "insert into piwiklog select * from public.piwiklog;";
            stmt.executeUpdate(sqlCopyPublicPiwiklog);

            // Raw portal log awaiting processing; PK (source, id_visit, timestamp).
            String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
            String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO process_portal_log "
                    + " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit,"
                    + "process_portal_log.\"timestamp\" "
                    + "FROM process_portal_log "
                    + "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlCreateTablePortalLog);
            stmt.executeUpdate(sqlcreateRulePortalLog);

            stmt.close();
            // NOTE(review): the shared connection is closed here although the
            // constructor calls createTmpTables() right afterwards without an
            // isClosed() check — confirm ConnectDB re-opens DB_CONNECTION.
            ConnectDB.DB_CONNECTION.close();
            log.info("Usage Tables Created");

        } catch (Exception e) {
            log.error("Failed to create tables: " + e);
            throw new Exception("Failed to create tables: " + e.toString(), e);
            //System.exit(0);
        }
    }
106

    
107
    /**
     * Creates the working (tmp) tables — piwiklogtmp and process_portal_log_tmp —
     * mirroring the permanent tables created by createTables(), each with a
     * PostgreSQL rule that silently drops duplicate inserts.
     *
     * Unlike createTables(), this method does not close the shared connection.
     *
     * @throws Exception if any DDL statement fails (original cause preserved)
     */
    private void createTmpTables() throws Exception {
        try {
            // Local statement shadows the class field on purpose.
            Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
            String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
            // Duplicate inserts become no-ops instead of PK violations.
            String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO piwiklogtmp "
                    + " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit,"
                    + "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id "
                    + "FROM piwiklogtmp "
                    + "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
            stmt.executeUpdate(sqlcreateTmpRulePiwikLog);

            //String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
            //stmt.executeUpdate(sqlCopyPublicPiwiklog);
            String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
            String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO process_portal_log_tmp "
                    + " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit,"
                    + "process_portal_log_tmp.\"timestamp\" "
                    + "FROM process_portal_log_tmp "
                    + "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlCreateTmpTablePortalLog);
            stmt.executeUpdate(sqlcreateTmpRulePortalLog);

            stmt.close();
            log.info("Usage Tmp Tables Created");

        } catch (Exception e) {
            log.error("Failed to create tmptables: " + e);
            throw new Exception("Failed to create tmp tables: " + e.toString(), e);
            //System.exit(0);
        }
    }
141

    
142
    /**
     * Full processing pipeline: load the COUNTER robots list, import and clean
     * the repository logs, build view/download stats, import the portal logs,
     * compute portal stats, and finally merge the tmp tables into the
     * production tables.
     *
     * The stages are strictly ordered — each one reads what the previous
     * stage wrote — so do not reorder these calls.
     *
     * @throws Exception if any stage fails (original cause preserved)
     */
    public void processLogs() throws Exception {
        try {
            // Robot filter patterns consumed by processRepositoryLog().
            ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
            this.robotsList = counterRobots.getRobotsPatterns();

            processRepositoryLog();
            log.info("repository process done");
            removeDoubleClicks();
            log.info("removing double clicks done");
            cleanOAI();
            log.info("cleaning oai done");

            viewsStats();
            downloadsStats();

            processPortalLog();
            log.info("portal process done");

            portalStats();
            log.info("portal stats done");

            updateProdTables();

        } catch (Exception e) {
            log.error("Failed to process logs: " + e);
            throw new Exception("Failed to process logs: " + e.toString(), e);
        }
    }
170

    
171
    
172

    
173
    /**
     * Rebuilds only the views/downloads stat tables — a subset of the full
     * processLogs() pipeline. The commented-out stages are kept for reference.
     *
     * @throws Exception if stats creation fails (original cause preserved)
     */
    public void usageStats() throws Exception {
        try {
            //resultStats();
            //dataSourceStats();
            //organizationsStats();
            //projectsStats();
            //repositoryViewsStats();
            //repositoryDownloadsStats();
            viewsStats();
            downloadsStats();
            log.info("stat tables and views done");
        } catch (Exception e) {
            log.error("Failed to create usage stats: " + e);
            throw new Exception("Failed to create usage stats: " + e.toString(), e);
        }
    }
189

    
190
    public void processRepositoryLog() throws Exception {
191
        if (ConnectDB.DB_CONNECTION.isClosed()) {
192
            connectDB();
193
        }
194

    
195
        Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
196
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
197

    
198
        //ArrayList<String> jsonFiles = listHdfsDir(logPath + "repolog");
199
        File dir = new File(this.logRepoPath);
200
        File[] jsonFiles = dir.listFiles();
201

    
202
        PreparedStatement prepStatem = ConnectDB.DB_CONNECTION.prepareStatement("INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
203
        int batch_size = 0;
204
        JSONParser parser = new JSONParser();
205
        for (File jsonFile : jsonFiles) {
206
            System.out.println(jsonFile);
207
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile.getAbsolutePath()));
208
            for (Object aJsonArray : jsonArray) {
209
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
210
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
211
                String idVisit = jsonObjectRow.get("idVisit").toString();
212
                String country = jsonObjectRow.get("country").toString();
213
                String referrerName = jsonObjectRow.get("referrerName").toString();
214
                String agent = jsonObjectRow.get("browser").toString();
215
                boolean botFound = false;
216
                Iterator it = robotsList.iterator();
217
                while (it.hasNext()) {
218
                    // Create a Pattern object
219
                    Pattern r = Pattern.compile(it.next().toString());
220
                    // Now create matcher object.
221
                    Matcher m = r.matcher(agent);
222
                    if (m.find()) {
223
                        //System.out.println("Found value: " + m.group(0));
224
                        botFound = true;
225
                        break;
226
                    }
227
                }
228
                if (botFound == false) {
229
                    String sourceItemType = "repItem";
230

    
231
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
232
                    for (Object actionDetail : actionDetails) {
233
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
234

    
235
                        if (actionDetailsObj.get("customVariables") != null) {
236
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
237
                            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
238
                            Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
239
                            String url = actionDetailsObj.get("url").toString();
240
                            String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
241
                            String action = actionDetailsObj.get("type").toString();
242

    
243
                            prepStatem.setInt(1, idSite);
244
                            prepStatem.setString(2, idVisit);
245
                            prepStatem.setString(3, country);
246
                            prepStatem.setString(4, action);
247
                            prepStatem.setString(5, url);
248
                            prepStatem.setString(6, oaipmh);
249
                            prepStatem.setString(7, sourceItemType);
250
                            prepStatem.setString(8, simpleDateFormat.format(timestamp));
251
                            prepStatem.setString(9, referrerName);
252
                            prepStatem.setString(10, agent);
253
                            prepStatem.addBatch();
254
                            batch_size++;
255
                            if (batch_size == 10000) {
256
                                prepStatem.executeBatch();
257
                                ConnectDB.DB_CONNECTION.commit();
258
                                batch_size = 0;
259
                            }
260
                        }
261
                    }
262
                }
263
            }
264
        }
265
        prepStatem.executeBatch();
266
        ConnectDB.DB_CONNECTION.commit();
267
        stmt.close();
268
        //conn.close();
269
    }
270

    
271
       public void removeDoubleClicks() throws Exception {
272
        if (ConnectDB.DB_CONNECTION.isClosed()) {
273
            connectDB();
274
        }
275

    
276
        Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
277
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
278

    
279
        //clean download double clicks
280
        String sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
281
        stmt.executeUpdate(sql);
282
        stmt.close();
283
        ConnectDB.DB_CONNECTION.commit();
284

    
285
        stmt = ConnectDB.DB_CONNECTION.createStatement();
286

    
287
        //clean view double clicks
288
        sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
289
        stmt.executeUpdate(sql);
290
        stmt.close();
291
        ConnectDB.DB_CONNECTION.commit();
292
        //conn.close();
293
    }
294

    
295
         public void viewsStats() throws Exception {
296
        if (ConnectDB.DB_CONNECTION.isClosed()) {
297
            connectDB();
298
        }
299

    
300
        Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
301
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
302

    
303
        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
304
        String sql = "CREATE OR REPLACE VIEW result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
305
        stmt.executeUpdate(sql);
306

    
307
        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
308
        sql = "CREATE TABLE IF NOT EXISTS views_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly_tmp p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
309
        stmt.executeUpdate(sql);
310

    
311
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
312
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly_tmp p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
313
        stmt.executeUpdate(sql);
314

    
315
        sql = "DROP VIEW IF EXISTS result_views_monthly_tmp;";
316
        stmt.executeUpdate(sql);
317

    
318
        stmt.close();
319
        ConnectDB.DB_CONNECTION.commit();
320
        ConnectDB.DB_CONNECTION.close();
321
    }
322

    
323
    public void viewsStats(String piwikid) throws Exception {
324
        if (ConnectDB.DB_CONNECTION.isClosed()) {
325
            connectDB();
326
        }
327

    
328
        stmt = ConnectDB.DB_CONNECTION.createStatement();
329
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
330

    
331
        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
332
        String sql = "CREATE OR REPLACE VIEW result_views_monthly" + piwikid + " AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog" + piwikid + " where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
333
        stmt.executeUpdate(sql);
334

    
335
        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
336
        sql = "CREATE TABLE IF NOT EXISTS views_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source!='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
337
        stmt.executeUpdate(sql);
338

    
339
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
340
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
341
        stmt.executeUpdate(sql);
342

    
343
        sql = "DROP VIEW IF EXISTS result_views_monthly" + piwikid + ";";
344
        stmt.executeUpdate(sql);
345

    
346
        stmt.close();
347
        ConnectDB.DB_CONNECTION.commit();
348
        ConnectDB.DB_CONNECTION.close();
349
    }
350

    
351
     private void downloadsStats() throws Exception {
352
        if (ConnectDB.DB_CONNECTION.isClosed()) {
353
            connectDB();
354
        }
355

    
356
        Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
357
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
358

    
359
        //String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
360
        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
361
        stmt.executeUpdate(sql);
362

    
363
        //sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
364
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
365
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly_tmp p, datasource d, result_oids ro where p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
366
        stmt.executeUpdate(sql);
367

    
368
        sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;";
369
        stmt.executeUpdate(sql);
370

    
371
        stmt.close();
372
        ConnectDB.DB_CONNECTION.commit();
373
        ConnectDB.DB_CONNECTION.close();
374
    }
375

    
376
    /**
     * Final post-processing: materialises full_dates (one 'YYYY/MM' row per
     * month since 2016-01), indexes the stats tables, and merges views_stats
     * and downloads_stats into the combined usage_stats table.
     *
     * NOTE(review): the CREATE INDEX statements are not IF NOT EXISTS, so
     * re-running this against an already-finalised schema will fail —
     * confirm this is only ever run once per fresh schema.
     *
     * @throws Exception if any statement fails
     */
    public void finalizeStats() throws Exception {
        if (ConnectDB.DB_CONNECTION.isClosed()) {
            connectDB();
        }

        stmt = ConnectDB.DB_CONNECTION.createStatement();
        ConnectDB.DB_CONNECTION.setAutoCommit(false);

        // Number of whole months between 2016-01-01 and now; drives the
        // generate_series() range below.
        Calendar startCalendar = Calendar.getInstance();
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
        Calendar endCalendar = Calendar.getInstance();
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);

//        String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
        // One 'YYYY/MM' row per month in the covered range.
        String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX full_dates_full_date ON full_dates USING btree(full_date);";
        stmt.executeUpdate(sql);

        // Indexes on views_stats.
        sql = "CREATE INDEX views_stats_source ON views_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX views_stats_repository_id ON views_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX views_stats_result_id ON views_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX views_stats_date ON views_stats USING btree(date);";
        stmt.executeUpdate(sql);

        // Indexes on pageviews_stats.
        sql = "CREATE INDEX pageviews_stats_source ON pageviews_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX pageviews_stats_date ON pageviews_stats USING btree(date);";
        stmt.executeUpdate(sql);

        // Indexes on downloads_stats.
        sql = "CREATE INDEX downloads_stats_source ON downloads_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX downloads_stats_result_id ON downloads_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX downloads_stats_date ON downloads_stats USING btree(date);";
        stmt.executeUpdate(sql);

//        sql = "SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views INTO usage_stats FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
        // FULL OUTER JOIN so results with only views or only downloads still appear.
        sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
        stmt.executeUpdate(sql);

        // Indexes on the merged usage_stats table.
        sql = "CREATE INDEX usage_stats_source ON usage_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX usage_stats_repository_id ON usage_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX usage_stats_result_id ON usage_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX usage_stats_date ON usage_stats USING btree(date);";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.DB_CONNECTION.commit();
        ConnectDB.DB_CONNECTION.close();
    }
453

    
454
    //views stats
455
//    private void viewsStatsOLD() throws Exception {
456
//        if (conn.isClosed())
457
//            connectDB();
458
//
459
//        stmt = conn.createStatement();
460
//        conn.setAutoCommit(false);
461
//
462
//        Calendar startCalendar = Calendar.getInstance();
463
//        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
464
//        Calendar endCalendar = Calendar.getInstance();
465
//        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
466
//        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
467
//
468
//        String sql = "CREATE OR REPLACE VIEW result_views_monthly as select entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'-'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
469
//        stmt.executeUpdate(sql);
470
//
471
//        sql = "SELECT d.id, d.new_date AS month, CASE when rdm.views IS NULL THEN 0 ELSE rdm.views END, d.source INTO result_views_sushi FROM (SELECT distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date, rdsm.source FROM generate_series(0, " + diffMonth + ", 1) AS offs, result_views_monthly rdsm) d LEFT JOIN (SELECT id, month, views, source FROM result_views_monthly) rdm ON d.new_date=rdm.month AND d.id=rdm.id AND d.source=rdm.source ORDER BY d.source, d.id, d.new_date;";
472
//        stmt.executeUpdate(sql);
473
//
474
//        sql = "CREATE INDEX result_views_sushi_id ON result_views_sushi USING btree (id);";
475
//        stmt.executeUpdate(sql);
476
//
477
//        sql = "CREATE INDEX result_views_sushi_month ON result_views_sushi USING btree (month);";
478
//        stmt.executeUpdate(sql);
479
//
480
//        sql = "CREATE INDEX result_views_sushi_source ON result_views_sushi USING btree (source);";
481
//        stmt.executeUpdate(sql);
482
//
483
//        sql = "SELECT roid.id, extract('year' from month::date) ||'/'|| LPAD(CAST(extract('month' from month::date) AS VARCHAR), 2, '0') as month, max(views) as views, source INTO result_views FROM result_views_sushi rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY source, roid.id, month ORDER BY source, roid.id, month;";
484
//        stmt.executeUpdate(sql);
485
//
486
//        sql = "CREATE INDEX result_views_id ON result_views USING btree (id);";
487
//        stmt.executeUpdate(sql);
488
//
489
//        sql = "CREATE INDEX result_views_month ON result_views USING btree (month);";
490
//        stmt.executeUpdate(sql);
491
//
492
//        sql = "CREATE INDEX result_views_source ON result_views USING btree (source);";
493
//        stmt.executeUpdate(sql);
494
//
495
//
496
//        sql = "DROP VIEW IF EXISTS result_views_monthly;";
497
//        stmt.executeUpdate(sql);
498
//
499
//        stmt.close();
500
//        conn.commit();
501
//        conn.close();
502
//    }
503
    //downloads stats
504
//    private void downloadsStatsOLD() throws Exception {
505
//        if (conn.isClosed())
506
//            connectDB();
507
//
508
//        stmt = conn.createStatement();
509
//        conn.setAutoCommit(false);
510
//
511
//        Calendar startCalendar = Calendar.getInstance();
512
//        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
513
//        Calendar endCalendar = Calendar.getInstance();
514
//        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
515
//        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
516
//
517
//        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'-'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
518
//        stmt.executeUpdate(sql);
519
//
520
//        sql = "SELECT d.id, d.new_date AS month, CASE when rdm.downloads IS NULL THEN 0 ELSE rdm.downloads END, d.source INTO result_downloads_sushi FROM (SELECT distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date, rdsm.source FROM generate_series(0, " + diffMonth + ", 1) AS offs, result_downloads_monthly rdsm) d LEFT JOIN (SELECT id, month, downloads, source FROM result_downloads_monthly) rdm ON d.new_date=rdm.month AND d.id=rdm.id AND d.source=rdm.source ORDER BY d.source, d.id, d.new_date;";
521
//        stmt.executeUpdate(sql);
522
//
523
//        sql = "CREATE INDEX result_downloads_sushi_id ON result_downloads_sushi USING btree (id);";
524
//        stmt.executeUpdate(sql);
525
//
526
//        sql = "CREATE INDEX result_downloads_sushi_month ON result_downloads_sushi USING btree (month);";
527
//        stmt.executeUpdate(sql);
528
//
529
//        sql = "CREATE INDEX result_downloads_sushi_source ON result_downloads_sushi USING btree (source);";
530
//        stmt.executeUpdate(sql);
531
//
532
//        sql = "SELECT roid.id, extract('year' from month::date) ||'/'|| LPAD(CAST(extract('month' from month::date) AS VARCHAR), 2, '0') as month, downloads, source INTO result_downloads FROM result_downloads_sushi rvs, result_oids roid WHERE rvs.id=roid.orid ORDER BY source, roid.id, month;";
533
//        stmt.executeUpdate(sql);
534
//
535
//        sql = "CREATE INDEX result_downloads_id ON result_downloads USING btree (id);";
536
//        stmt.executeUpdate(sql);
537
//
538
//        sql = "CREATE INDEX result_downloads_month ON result_downloads USING btree (month);";
539
//        stmt.executeUpdate(sql);
540
//
541
//        sql = "CREATE INDEX result_downloads_source ON result_downloads USING btree (source);";
542
//        stmt.executeUpdate(sql);
543
//
544
//
545
//        sql = "DROP VIEW IF EXISTS result_downloads_monthly;";
546
//        stmt.executeUpdate(sql);
547
//
548
//        stmt.close();
549
//        conn.commit();
550
//        conn.close();
551
//    }
552
    //Create repository Views statistics
553
    private void repositoryViewsStats() throws Exception {
554
        if (ConnectDB.DB_CONNECTION.isClosed()) {
555
            connectDB();
556
        }
557

    
558
        stmt = ConnectDB.DB_CONNECTION.createStatement();
559
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
560

    
561
//        String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
562
        String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
563
        stmt.executeUpdate(sql);
564

    
565
        sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
566
        stmt.executeUpdate(sql);
567

    
568
        sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
569
        stmt.executeUpdate(sql);
570

    
571
//        sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
572
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
573
        stmt.executeUpdate(sql);
574

    
575
        sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
576
        stmt.executeUpdate(sql);
577

    
578
        sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
579
        stmt.executeUpdate(sql);
580

    
581
        sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
582
        stmt.executeUpdate(sql);
583

    
584
        Calendar startCalendar = Calendar.getInstance();
585
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
586
        Calendar endCalendar = Calendar.getInstance();
587
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
588
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
589

    
590
        //sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
591
//        sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
592
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
593
        stmt.executeUpdate(sql);
594

    
595
        sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
596
        stmt.executeUpdate(sql);
597

    
598
        sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
599
        stmt.executeUpdate(sql);
600

    
601
        sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
602
        stmt.executeUpdate(sql);
603

    
604
        sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
605
        stmt.executeUpdate(sql);
606

    
607
        stmt.close();
608
        ConnectDB.DB_CONNECTION.commit();
609
        ConnectDB.DB_CONNECTION.close();
610
    }
611

    
612
    //Create repository downloads statistics
613
    private void repositoryDownloadsStats() throws Exception {
614
        if (ConnectDB.DB_CONNECTION.isClosed()) {
615
            connectDB();
616
        }
617

    
618
        stmt = ConnectDB.DB_CONNECTION.createStatement();
619
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
620

    
621
//        String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
622
        String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
623
        stmt.executeUpdate(sql);
624

    
625
        sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
626
        stmt.executeUpdate(sql);
627

    
628
        sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
629
        stmt.executeUpdate(sql);
630

    
631
//        sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
632
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
633
        stmt.executeUpdate(sql);
634

    
635
        sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
636
        stmt.executeUpdate(sql);
637

    
638
        sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
639
        stmt.executeUpdate(sql);
640

    
641
        sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
642
        stmt.executeUpdate(sql);
643

    
644
        Calendar startCalendar = Calendar.getInstance();
645
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
646
        Calendar endCalendar = Calendar.getInstance();
647
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
648
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
649

    
650
        //sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
651
        // sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
652
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
653
        stmt.executeUpdate(sql);
654

    
655
        sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
656
        stmt.executeUpdate(sql);
657

    
658
        sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
659
        stmt.executeUpdate(sql);
660

    
661
        sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
662
        stmt.executeUpdate(sql);
663

    
664
        sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
665
        stmt.executeUpdate(sql);
666

    
667
        stmt.close();
668
        ConnectDB.DB_CONNECTION.commit();
669
        ConnectDB.DB_CONNECTION.close();
670
    }
671
// Import OPENAIRE Logs to DB
672
    public void processPortalLog() throws Exception {
673

    
674
        if (ConnectDB.DB_CONNECTION.isClosed()) {
675
            connectDB();
676
        }
677

    
678
        Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
679
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
680

    
681
        //ArrayList<String> jsonFiles = listHdfsDir(logPath + "portallog");
682
        File folder = new File(this.logPortalPath);
683
        File[] jsonFiles = folder.listFiles();
684

    
685
        PreparedStatement prepStatem = ConnectDB.DB_CONNECTION.prepareStatement("INSERT INTO process_portal_log_tmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
686
        int batch_size = 0;
687
        JSONParser parser = new JSONParser();
688
        for (File jsonFile : jsonFiles) {
689
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile.getAbsolutePath()));
690

    
691
            for (Object aJsonArray : jsonArray) {
692
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
693
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
694
                String idVisit = jsonObjectRow.get("idVisit").toString();
695
                String country = jsonObjectRow.get("country").toString();
696
                String referrerName = jsonObjectRow.get("referrerName").toString();
697
                String agent = jsonObjectRow.get("browser").toString();
698
                boolean botFound = false;
699
                Iterator it = robotsList.iterator();
700
                while (it.hasNext()) {
701
                    // Create a Pattern object
702
                    Pattern r = Pattern.compile(it.next().toString());
703
                    // Now create matcher object.
704
                    Matcher m = r.matcher(agent);
705
                    if (m.find()) {
706
                        System.out.println("Found value: " + m.group(0));
707
                        botFound = true;
708
                        break;
709
                    }
710
                }
711
                if (botFound == false) {
712
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
713
                    for (Object actionDetail : actionDetails) {
714
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;
715

    
716
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
717
                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
718
                        Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
719

    
720
                        String action = actionDetailsObj.get("type").toString();
721
                        String url = actionDetailsObj.get("url").toString();
722

    
723
                        String entityID = processPortalURL(url);
724
                        String sourceItemType = "";
725

    
726
                        if (entityID.indexOf("|") > 0) {
727
                            sourceItemType = entityID.substring(0, entityID.indexOf("|"));
728
                            entityID = entityID.substring(entityID.indexOf("|") + 1);
729
                        }
730

    
731
                        prepStatem.setInt(1, idSite);
732
                        prepStatem.setString(2, idVisit);
733
                        prepStatem.setString(3, country);
734
                        prepStatem.setString(4, action);
735
                        prepStatem.setString(5, url);
736
                        prepStatem.setString(6, entityID);
737
                        prepStatem.setString(7, sourceItemType);
738
                        prepStatem.setString(8, simpleDateFormat.format(timestamp));
739
                        prepStatem.setString(9, referrerName);
740
                        prepStatem.setString(10, agent);
741

    
742
                        prepStatem.addBatch();
743
                        batch_size++;
744
                        if (batch_size == 10000) {
745
                            prepStatem.executeBatch();
746
                            ConnectDB.DB_CONNECTION.commit();
747
                            batch_size = 0;
748
                        }
749
                    }
750
                }
751
            }
752
        }
753
        prepStatem.executeBatch();
754
        ConnectDB.DB_CONNECTION.commit();
755

    
756
        stmt.close();
757
        ConnectDB.DB_CONNECTION.close();
758
    }
759

    
760
 
761
    public void portalStats() throws Exception {
762
        if (ConnectDB.DB_CONNECTION.isClosed()) {
763
            connectDB();
764
        }
765

    
766
        Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
767
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
768

    
769
        String sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'oaItem\', timestamp, referrer_name, agent FROM process_portal_log_tmp, result_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
770
        //SELECT DISTINCT process_portal_log.source, id_visit, country, action, url, roid.orid, 'oaItem', timestamp, referrer_name, agent FROM process_portal_log, result r, result_oids roid WHERE entity_id IS NOT null AND entity_id=r.original_id AND r.id = roid.id AND roid.orid IS NOT null;
771
        stmt.executeUpdate(sql);
772
        stmt.close();
773
        ConnectDB.DB_CONNECTION.commit();
774

    
775
        stmt = ConnectDB.DB_CONNECTION.createStatement();
776
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'datasource\', timestamp, referrer_name, agent FROM process_portal_log_tmp, datasource_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
777
        stmt.executeUpdate(sql);
778
        stmt.close();
779
        ConnectDB.DB_CONNECTION.commit();
780

    
781
        stmt = ConnectDB.DB_CONNECTION.createStatement();
782
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'organization\', timestamp, referrer_name, agent FROM process_portal_log_tmp, organization_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
783
        stmt.executeUpdate(sql);
784
        stmt.close();
785
        ConnectDB.DB_CONNECTION.commit();
786

    
787
        stmt = ConnectDB.DB_CONNECTION.createStatement();
788
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'project\', timestamp, referrer_name, agent FROM process_portal_log_tmp, project_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
789
        stmt.executeUpdate(sql);
790
        stmt.close();
791
        ConnectDB.DB_CONNECTION.commit();
792
        /*
793
        stmt = conn.createStatement();
794
        sql = "DROP TABLE process_portal_log;";
795
        stmt.executeUpdate(sql);
796
        stmt.close();
797
        conn.commit();
798
         */
799

    
800
        ConnectDB.DB_CONNECTION.close();
801
    }
802

    
803
   
804

    
805
    private void cleanOAI() throws Exception {
806
        if (ConnectDB.DB_CONNECTION.isClosed()) {
807
            connectDB();
808
        }
809

    
810
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
811

    
812
        stmt = ConnectDB.DB_CONNECTION.createStatement();
813
        String sql = "UPDATE piwiklogmtp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chlc.min-saude.pt/','oai:repositorio.chlc.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.chlc.min-saude.pt/%';";
814
        stmt.executeUpdate(sql);
815
        stmt.close();
816
        ConnectDB.DB_CONNECTION.commit();
817

    
818
        stmt = ConnectDB.DB_CONNECTION.createStatement();
819
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hospitaldebraga.pt/','oai:repositorio.hospitaldebraga.pt:') WHERE entity_id LIKE 'oai:repositorio.hospitaldebraga.pt/%';";
820
        stmt.executeUpdate(sql);
821
        stmt.close();
822
        ConnectDB.DB_CONNECTION.commit();
823

    
824
        stmt = ConnectDB.DB_CONNECTION.createStatement();
825
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipl.pt/','oai:repositorio.ipl.pt:') WHERE entity_id LIKE 'oai:repositorio.ipl.pt/%';";
826
        stmt.executeUpdate(sql);
827
        stmt.close();
828
        ConnectDB.DB_CONNECTION.commit();
829

    
830
        stmt = ConnectDB.DB_CONNECTION.createStatement();
831
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bibliotecadigital.ipb.pt/','oai:bibliotecadigital.ipb.pt:') WHERE entity_id LIKE 'oai:bibliotecadigital.ipb.pt/%';";
832
        stmt.executeUpdate(sql);
833
        stmt.close();
834
        ConnectDB.DB_CONNECTION.commit();
835

    
836
        stmt = ConnectDB.DB_CONNECTION.createStatement();
837
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ismai.pt/','oai:repositorio.ismai.pt:') WHERE entity_id LIKE 'oai:repositorio.ismai.pt/%';";
838
        stmt.executeUpdate(sql);
839
        stmt.close();
840
        ConnectDB.DB_CONNECTION.commit();
841

    
842
        stmt = ConnectDB.DB_CONNECTION.createStatement();
843
        sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorioaberto.uab.pt/','oai:repositorioaberto.uab.pt:') WHERE entity_id LIKE 'oai:repositorioaberto.uab.pt/%';";
844
        stmt.executeUpdate(sql);
845
        stmt.close();
846
        ConnectDB.DB_CONNECTION.commit();
847

    
848
        stmt = ConnectDB.DB_CONNECTION.createStatement();
849
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.uac.pt/','oai:repositorio.uac.pt:') WHERE entity_id LIKE 'oai:repositorio.uac.pt/%';";
850
        stmt.executeUpdate(sql);
851
        stmt.close();
852
        ConnectDB.DB_CONNECTION.commit();
853

    
854
        stmt = ConnectDB.DB_CONNECTION.createStatement();
855
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.insa.pt/','oai:repositorio.insa.pt:') WHERE entity_id LIKE 'oai:repositorio.insa.pt/%';";
856
        stmt.executeUpdate(sql);
857
        stmt.close();
858
        ConnectDB.DB_CONNECTION.commit();
859

    
860
        stmt = ConnectDB.DB_CONNECTION.createStatement();
861
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipcb.pt/','oai:repositorio.ipcb.pt:') WHERE entity_id LIKE 'oai:repositorio.ipcb.pt/%';";
862
        stmt.executeUpdate(sql);
863
        stmt.close();
864
        ConnectDB.DB_CONNECTION.commit();
865

    
866
        stmt = ConnectDB.DB_CONNECTION.createStatement();
867
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ispa.pt/','oai:repositorio.ispa.pt:') WHERE entity_id LIKE 'oai:repositorio.ispa.pt/%';";
868
        stmt.executeUpdate(sql);
869
        stmt.close();
870
        ConnectDB.DB_CONNECTION.commit();
871

    
872
        stmt = ConnectDB.DB_CONNECTION.createStatement();
873
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chporto.pt/','oai:repositorio.chporto.pt:') WHERE entity_id LIKE 'oai:repositorio.chporto.pt/%';";
874
        stmt.executeUpdate(sql);
875
        stmt.close();
876
        ConnectDB.DB_CONNECTION.commit();
877

    
878
        stmt = ConnectDB.DB_CONNECTION.createStatement();
879
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ucp.pt/','oai:repositorio.ucp.pt:') WHERE entity_id LIKE 'oai:repositorio.ucp.pt/%';";
880
        stmt.executeUpdate(sql);
881
        stmt.close();
882
        ConnectDB.DB_CONNECTION.commit();
883

    
884
        stmt = ConnectDB.DB_CONNECTION.createStatement();
885
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:rihuc.huc.min-saude.pt/','oai:rihuc.huc.min-saude.pt:') WHERE entity_id LIKE 'oai:rihuc.huc.min-saude.pt/%';";
886
        stmt.executeUpdate(sql);
887
        stmt.close();
888
        ConnectDB.DB_CONNECTION.commit();
889

    
890
        stmt = ConnectDB.DB_CONNECTION.createStatement();
891
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipv.pt/','oai:repositorio.ipv.pt:') WHERE entity_id LIKE 'oai:repositorio.ipv.pt/%';";
892
        stmt.executeUpdate(sql);
893
        stmt.close();
894
        ConnectDB.DB_CONNECTION.commit();
895

    
896
        stmt = ConnectDB.DB_CONNECTION.createStatement();
897
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:www.repository.utl.pt/','oai:www.repository.utl.pt:') WHERE entity_id LIKE 'oai:www.repository.utl.pt/%';";
898
        stmt.executeUpdate(sql);
899
        stmt.close();
900
        ConnectDB.DB_CONNECTION.commit();
901

    
902
        stmt = ConnectDB.DB_CONNECTION.createStatement();
903
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:run.unl.pt/','oai:run.unl.pt:') WHERE entity_id LIKE 'oai:run.unl.pt/%';";
904
        stmt.executeUpdate(sql);
905
        stmt.close();
906
        ConnectDB.DB_CONNECTION.commit();
907

    
908
        stmt = ConnectDB.DB_CONNECTION.createStatement();
909
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:sapientia.ualg.pt/','oai:sapientia.ualg.pt:') WHERE entity_id LIKE 'oai:sapientia.ualg.pt/%';";
910
        stmt.executeUpdate(sql);
911
        stmt.close();
912
        ConnectDB.DB_CONNECTION.commit();
913

    
914
        stmt = ConnectDB.DB_CONNECTION.createStatement();
915
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipsantarem.pt/','oai:repositorio.ipsantarem.pt:') WHERE entity_id LIKE 'oai:repositorio.ipsantarem.pt/%';";
916
        stmt.executeUpdate(sql);
917
        stmt.close();
918
        ConnectDB.DB_CONNECTION.commit();
919

    
920
        stmt = ConnectDB.DB_CONNECTION.createStatement();
921
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:arca.igc.gulbenkian.pt/','oai:arca.igc.gulbenkian.pt:') WHERE entity_id LIKE 'oai:arca.igc.gulbenkian.pt/%';";
922
        stmt.executeUpdate(sql);
923
        stmt.close();
924
        ConnectDB.DB_CONNECTION.commit();
925

    
926
        stmt = ConnectDB.DB_CONNECTION.createStatement();
927
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:ubibliorum.ubi.pt/','oai:ubibliorum.ubi.pt:') WHERE entity_id LIKE 'oai:ubibliorum.ubi.pt/%';";
928
        stmt.executeUpdate(sql);
929
        stmt.close();
930
        ConnectDB.DB_CONNECTION.commit();
931

    
932
        stmt = ConnectDB.DB_CONNECTION.createStatement();
933
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:digituma.uma.pt/','oai:digituma.uma.pt:') WHERE entity_id LIKE 'oai:digituma.uma.pt/%';";
934
        stmt.executeUpdate(sql);
935
        stmt.close();
936
        ConnectDB.DB_CONNECTION.commit();
937

    
938
        stmt = ConnectDB.DB_CONNECTION.createStatement();
939
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ul.pt/','oai:repositorio.ul.pt:') WHERE entity_id LIKE 'oai:repositorio.ul.pt/%';";
940
        stmt.executeUpdate(sql);
941
        stmt.close();
942
        ConnectDB.DB_CONNECTION.commit();
943

    
944
        stmt = ConnectDB.DB_CONNECTION.createStatement();
945
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hff.min-saude.pt/','oai:repositorio.hff.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.hff.min-saude.pt/%';";
946
        stmt.executeUpdate(sql);
947
        stmt.close();
948
        ConnectDB.DB_CONNECTION.commit();
949

    
950
        stmt = ConnectDB.DB_CONNECTION.createStatement();
951
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorium.sdum.uminho.pt/','oai:repositorium.sdum.uminho.pt:') WHERE entity_id LIKE 'oai:repositorium.sdum.uminho.pt/%';";
952
        stmt.executeUpdate(sql);
953
        stmt.close();
954
        ConnectDB.DB_CONNECTION.commit();
955

    
956
        stmt = ConnectDB.DB_CONNECTION.createStatement();
957
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:recipp.ipp.pt/','oai:recipp.ipp.pt:') WHERE entity_id LIKE 'oai:recipp.ipp.pt/%';";
958
        stmt.executeUpdate(sql);
959
        stmt.close();
960
        ConnectDB.DB_CONNECTION.commit();
961

    
962
        stmt = ConnectDB.DB_CONNECTION.createStatement();
963
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bdigital.ufp.pt/','oai:bdigital.ufp.pt:') WHERE entity_id LIKE 'oai:bdigital.ufp.pt/%';";
964
        stmt.executeUpdate(sql);
965
        stmt.close();
966
        ConnectDB.DB_CONNECTION.commit();
967

    
968
        stmt = ConnectDB.DB_CONNECTION.createStatement();
969
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.lneg.pt/','oai:repositorio.lneg.pt:') WHERE entity_id LIKE 'oai:repositorio.lneg.pt/%';";
970
        stmt.executeUpdate(sql);
971
        stmt.close();
972
        ConnectDB.DB_CONNECTION.commit();
973

    
974
        stmt = ConnectDB.DB_CONNECTION.createStatement();
975
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:iconline.ipleiria.pt/','oai:iconline.ipleiria.pt:') WHERE entity_id LIKE 'oai:iconline.ipleiria.pt/%';";
976
        stmt.executeUpdate(sql);
977
        stmt.close();
978
        ConnectDB.DB_CONNECTION.commit();
979

    
980
        stmt = ConnectDB.DB_CONNECTION.createStatement();
981
        sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:comum.rcaap.pt/','oai:comum.rcaap.pt:') WHERE entity_id LIKE 'oai:comum.rcaap.pt/%';";
982
        stmt.executeUpdate(sql);
983
        stmt.close();
984
        ConnectDB.DB_CONNECTION.commit();
985

    
986
        ConnectDB.DB_CONNECTION.close();
987
    }
988

    
989
    //Create OpenAIRE's portal datasource statistics
990
    private void dataSourceStats() throws Exception {
991
        if (ConnectDB.DB_CONNECTION.isClosed()) {
992
            connectDB();
993
        }
994

    
995
        stmt = ConnectDB.DB_CONNECTION.createStatement();
996
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
997

    
998
        // String sql = "SELECT  orgid AS id, max(viewcount) AS number_of_views, date INTO datasource_stats FROM (select entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, datasource_oids ooid WHERE entity_id=ooid.orid AND source_item_type='datasource' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
999
        String sql = "CREATE TABLE IF NOT EXISTS datasource_stats AS SELECT  orgid AS id, max(viewcount) AS number_of_views, date FROM (select entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, datasource_oids ooid WHERE entity_id=ooid.orid AND source_item_type='datasource' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1000
        stmt.executeUpdate(sql);
1001

    
1002
        sql = "CREATE INDEX datasource_stats_id ON datasource_stats USING btree (id)";
1003
        stmt.executeUpdate(sql);
1004

    
1005
        sql = "CREATE INDEX datasource_stats_date ON datasource_stats USING btree(date)";
1006
        stmt.executeUpdate(sql);
1007

    
1008
        // sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO datasource_stats_monthly_clean FROM datasource_stats GROUP BY id, month;";
1009
        sql = "CREATE TABLE IF NOT EXISIS datasource_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM datasource_stats GROUP BY id, month;";
1010
        stmt.executeUpdate(sql);
1011

    
1012
        sql = "CREATE INDEX datasource_stats_monthly_clean_id ON datasource_stats_monthly_clean USING btree (id)";
1013
        stmt.executeUpdate(sql);
1014

    
1015
        sql = "CREATE INDEX datasource_stats_monthly_clean_month ON datasource_stats_monthly_clean USING btree(month)";
1016
        stmt.executeUpdate(sql);
1017

    
1018
        Calendar startCalendar = Calendar.getInstance();
1019
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
1020
        Calendar endCalendar = Calendar.getInstance();
1021
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
1022
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
1023

    
1024
        //sql="CREATE OR REPLACE view datasource_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, datasource_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from datasource_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1025
        //sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end INTO datasource_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, datasource_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from datasource_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1026
        sql = "CREATE TABLE IF NOT EXISTS datasource_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, datasource_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from datasource_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1027
        stmt.executeUpdate(sql);
1028

    
1029
        sql = "CREATE INDEX datasource_stats_monthly_id ON datasource_stats_monthly USING btree (id)";
1030
        stmt.executeUpdate(sql);
1031

    
1032
        sql = "CREATE INDEX datasource_stats_monthly_month ON datasource_stats_monthly USING btree(month)";
1033
        stmt.executeUpdate(sql);
1034

    
1035
        stmt.close();
1036
        ConnectDB.DB_CONNECTION.commit();
1037
        ConnectDB.DB_CONNECTION.close();
1038
    }
1039

    
1040
    //Create OpenAIRE's portal results statistics
1041
    private void resultStats() throws Exception {
1042
        if (ConnectDB.DB_CONNECTION.isClosed()) {
1043
            connectDB();
1044
        }
1045

    
1046
        stmt = ConnectDB.DB_CONNECTION.createStatement();
1047
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
1048

    
1049
        // String sql = "SELECT orgid AS id, max(viewcount) AS number_of_views, date INTO result_stats FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, result_oids ooid WHERE entity_id=ooid.orid AND source_item_type='oaItem' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1050
        String sql = "CREATE TABLE IF NOT EXISIS result_stats AS SELECT orgid AS id, max(viewcount) AS number_of_views, date  FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date as date FROM piwiklog, result_oids ooid WHERE entity_id=ooid.orid AND source_item_type='oaItem' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1051
        stmt.executeUpdate(sql);
1052

    
1053
        sql = "CREATE INDEX result_stats_id ON result_stats USING btree (id)";
1054
        stmt.executeUpdate(sql);
1055

    
1056
        sql = "CREATE INDEX result_stats_date ON result_stats USING btree(date)";
1057
        stmt.executeUpdate(sql);
1058

    
1059
//        sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO result_stats_monthly_clean FROM result_stats group by id, month;";
1060
        sql = "CREATE TABLE IF NOT EXISTS result_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM result_stats group by id, month;";
1061
        stmt.executeUpdate(sql);
1062

    
1063
        sql = "CREATE INDEX result_stats_monthly_clean_id ON result_stats_monthly_clean USING btree (id)";
1064
        stmt.executeUpdate(sql);
1065

    
1066
        sql = "CREATE INDEX result_stats_monthly_clean_month ON result_stats_monthly_clean USING btree(month)";
1067
        stmt.executeUpdate(sql);
1068

    
1069
        Calendar startCalendar = Calendar.getInstance();
1070
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
1071
        Calendar endCalendar = Calendar.getInstance();
1072
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
1073
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
1074

    
1075
        //sql="CREATE OR REPLACE view result_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, result_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from result_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1076
//        sql = "select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end INTO result_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, result_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from result_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1077
        sql = "CREATE TABLE IF NOT EXISTS result_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, result_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from result_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1078
        stmt.executeUpdate(sql);
1079

    
1080
        sql = "CREATE INDEX result_stats_monthly_id ON result_stats_monthly USING btree (id)";
1081
        stmt.executeUpdate(sql);
1082

    
1083
        sql = "CREATE INDEX result_stats_monthly_month ON result_stats_monthly USING btree(month)";
1084
        stmt.executeUpdate(sql);
1085

    
1086
        stmt.close();
1087
        ConnectDB.DB_CONNECTION.commit();
1088
        ConnectDB.DB_CONNECTION.close();
1089
    }
1090

    
1091
    //Create OpenAIRE's portal organization statistics
1092
    private void organizationsStats() throws Exception {
1093
        if (ConnectDB.DB_CONNECTION.isClosed()) {
1094
            connectDB();
1095
        }
1096

    
1097
        stmt = ConnectDB.DB_CONNECTION.createStatement();
1098
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
1099

    
1100
        // String sql = "SELECT orgid AS id, max(viewcount) AS number_of_views, date INTO organization_stats FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, organization_oids ooid WHERE entity_id=ooid.orid AND source_item_type='organization' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1101
        String sql = "CREATE TABLE IF NOT EXISTS organization_stats AS SELECT orgid AS id, max(viewcount) AS number_of_views, date FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, organization_oids ooid WHERE entity_id=ooid.orid AND source_item_type='organization' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1102
        stmt.executeUpdate(sql);
1103

    
1104
        sql = "CREATE INDEX organization_stats_id ON organization_stats USING btree (id)";
1105
        stmt.executeUpdate(sql);
1106

    
1107
        sql = "CREATE INDEX organization_stats_date ON organization_stats USING btree(date)";
1108
        stmt.executeUpdate(sql);
1109

    
1110
        // sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO organization_stats_monthly_clean FROM organization_stats group by id, month;";
1111
        sql = "CREATE TABLE IF NOT EXISTS organization_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM organization_stats group by id, month;";
1112
        stmt.executeUpdate(sql);
1113

    
1114
        sql = "CREATE INDEX organization_stats_monthly_clean_id ON organization_stats_monthly_clean USING btree (id)";
1115
        stmt.executeUpdate(sql);
1116

    
1117
        sql = "CREATE INDEX organization_stats_monthly_clean_month ON organization_stats_monthly_clean USING btree(month)";
1118
        stmt.executeUpdate(sql);
1119

    
1120
        Calendar startCalendar = Calendar.getInstance();
1121
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
1122
        Calendar endCalendar = Calendar.getInstance();
1123
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
1124
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
1125

    
1126
        //sql="CREATE OR REPLACE view organization_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, organization_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from organization_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1127
        // sql = "select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end INTO organization_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, organization_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from organization_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1128
        sql = "CREATE TABLE IF NOT EXISTS organization_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY-MM-DD') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, organization_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from organization_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1129
        stmt.executeUpdate(sql);
1130

    
1131
        sql = "CREATE INDEX organization_stats_monthly_id ON organization_stats_monthly USING btree (id)";
1132
        stmt.executeUpdate(sql);
1133

    
1134
        sql = "CREATE INDEX organization_stats_monthly_month ON organization_stats_monthly USING btree(month)";
1135
        stmt.executeUpdate(sql);
1136

    
1137
        stmt.close();
1138
        ConnectDB.DB_CONNECTION.commit();
1139
        ConnectDB.DB_CONNECTION.close();
1140
    }
1141

    
1142
    //Create OpenAIRE's portal projects statistics
1143
    private void projectsStats() throws Exception {
1144
        if (ConnectDB.DB_CONNECTION.isClosed()) {
1145
            connectDB();
1146
        }
1147

    
1148
        stmt = ConnectDB.DB_CONNECTION.createStatement();
1149
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
1150

    
1151
        // String sql = "SELECT orgid AS id, max(viewcount) AS number_of_views, date INTO project_stats FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, project_oids ooid WHERE entity_id=ooid.orid AND source_item_type='project' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1152
        String sql = "CREATE TABLE IF NOT EXISTS project_stats AS SELECT orgid AS id, max(viewcount) AS number_of_views, date FROM (SELECT entity_id, ooid.id AS orgid, count(entity_id) viewcount, timestamp::date AS date FROM piwiklog, project_oids ooid WHERE entity_id=ooid.orid AND source_item_type='project' GROUP BY entity_id, ooid.id, date) AS temp GROUP BY orgid, date ORDER BY orgid, date ASC, max(viewcount) DESC;";
1153
        stmt.executeUpdate(sql);
1154

    
1155
        sql = "CREATE INDEX project_stats_id ON project_stats USING btree (id)";
1156
        stmt.executeUpdate(sql);
1157

    
1158
        sql = "CREATE INDEX project_stats_date ON project_stats USING btree(date)";
1159
        stmt.executeUpdate(sql);
1160

    
1161
        // sql = "SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month INTO project_stats_monthly_clean FROM project_stats group by id, month;";
1162
        sql = "CREATE TABLE IF NOT EXISTS project_stats_monthly_clean AS SELECT id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month FROM project_stats group by id, month;";
1163
        stmt.executeUpdate(sql);
1164

    
1165
        sql = "CREATE INDEX project_stats_monthly_clean_id ON project_stats_monthly_clean USING btree (id)";
1166
        stmt.executeUpdate(sql);
1167

    
1168
        sql = "CREATE INDEX project_stats_monthly_clean_month ON project_stats_monthly_clean USING btree(month)";
1169
        stmt.executeUpdate(sql);
1170

    
1171
        Calendar startCalendar = Calendar.getInstance();
1172
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
1173
        Calendar endCalendar = Calendar.getInstance();
1174
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
1175
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
1176

    
1177
        // sql="CREATE OR REPLACE view project_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth +", 1) AS offs, project_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from project_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1178
        // sql = "select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end INTO project_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, project_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from project_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1179
        sql = "CREATE TABLE IF NOT EXISTS project_stats_monthly AS select d.id, d.new_date as month, case when rdm.sum is null then 0 else rdm.sum end from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date from generate_series(0, " + diffMonth + ", 1) AS offs, project_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum from project_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id order by d.id, d.new_date";
1180
        stmt.executeUpdate(sql);
1181

    
1182
        sql = "CREATE INDEX project_stats_monthly_id ON project_stats_monthly USING btree (id)";
1183
        stmt.executeUpdate(sql);
1184

    
1185
        sql = "CREATE INDEX project_stats_monthly_month ON project_stats_monthly USING btree(month)";
1186
        stmt.executeUpdate(sql);
1187

    
1188
        stmt.close();
1189
        ConnectDB.DB_CONNECTION.commit();
1190
        ConnectDB.DB_CONNECTION.close();
1191
    }
1192

    
1193
    private String processPortalURL(String url) {
1194

    
1195
        if (url.indexOf("explore.openaire.eu") > 0) {
1196
            try {
1197
                url = URLDecoder.decode(url, "UTF-8");
1198
            } catch (Exception e) {
1199
                log.info(url);
1200
            }
1201
            if (url.indexOf("datasourceId=") > 0 && url.substring(url.indexOf("datasourceId=") + 13).length() >= 46) {
1202
                url = "datasource|" + url.substring(url.indexOf("datasourceId=") + 13, url.indexOf("datasourceId=") + 59);
1203
            } else if (url.indexOf("datasource=") > 0 && url.substring(url.indexOf("datasource=") + 11).length() >= 46) {
1204
                url = "datasource|" + url.substring(url.indexOf("datasource=") + 11, url.indexOf("datasource=") + 57);
1205
            } else if (url.indexOf("datasourceFilter=") > 0 && url.substring(url.indexOf("datasourceFilter=") + 17).length() >= 46) {
1206
                url = "datasource|" + url.substring(url.indexOf("datasourceFilter=") + 17, url.indexOf("datasourceFilter=") + 63);
1207
            } else if (url.indexOf("articleId=") > 0 && url.substring(url.indexOf("articleId=") + 10).length() >= 46) {
1208
                url = "result|" + url.substring(url.indexOf("articleId=") + 10, url.indexOf("articleId=") + 56);
1209
            } else if (url.indexOf("datasetId=") > 0 && url.substring(url.indexOf("datasetId=") + 10).length() >= 46) {
1210
                url = "result|" + url.substring(url.indexOf("datasetId=") + 10, url.indexOf("datasetId=") + 56);
1211
            } else if (url.indexOf("projectId=") > 0 && url.substring(url.indexOf("projectId=") + 10).length() >= 46 && !url.contains("oai:dnet:corda")) {
1212
                url = "project|" + url.substring(url.indexOf("projectId=") + 10, url.indexOf("projectId=") + 56);
1213
            } else if (url.indexOf("organizationId=") > 0 && url.substring(url.indexOf("organizationId=") + 15).length() >= 46) {
1214
                url = "organization|" + url.substring(url.indexOf("organizationId=") + 15, url.indexOf("organizationId=") + 61);
1215
            } else {
1216
                url = "";
1217
            }
1218
        } else {
1219
            url = "";
1220
        }
1221

    
1222
        return url;
1223
    }
1224

    
1225
    /**
     * Promotes the freshly computed *_tmp staging tables into the production
     * tables (append via INSERT ... SELECT), then drops the staging tables.
     *
     * @throws SQLException on statement failure
     * @throws Exception if the DB connection cannot be (re)established
     */
    private void updateProdTables() throws SQLException, Exception {
        if (ConnectDB.DB_CONNECTION.isClosed()) {
            connectDB();
        }

        // Local statement intentionally shadows the instance field here.
        Statement stmt = ConnectDB.DB_CONNECTION.createStatement();
        ConnectDB.DB_CONNECTION.setAutoCommit(false);
        String sql = "insert into piwiklog select * from piwiklogtmp;";
        stmt.executeUpdate(sql);
        
        sql = "insert into views_stats select * from views_stats_tmp;";
        stmt.executeUpdate(sql);
        
        // NOTE(review): singular "download_stats" vs plural "downloads_stats_tmp" —
        // verify these table names against the schema-creation code; looks inconsistent.
        sql = "insert into download_stats select * from downloads_stats_tmp;";
        stmt.executeUpdate(sql);
        
        // NOTE(review): "pageviews_stats_stats_tmp" (doubled "stats") is used both here
        // and in the DROP below, so it is internally consistent — but confirm the
        // staging table is really created under this name and not "pageviews_stats_tmp".
        sql = "insert into pageviews_stats select * from pageviews_stats_stats_tmp;";
        stmt.executeUpdate(sql);

        // Drop the staging tables now that their rows have been promoted.
        sql = "DROP TABLE IF EXISTS piwiklogtmp;";
        stmt.executeUpdate(sql);

        sql = "DROP TABLE IF EXISTS views_stats_tmp;";
        stmt.executeUpdate(sql);
        
        sql = "DROP TABLE IF EXISTS downloads_stats_tmp;";
        stmt.executeUpdate(sql);
        
        sql = "DROP TABLE IF EXISTS pageviews_stats_stats_tmp;";
        stmt.executeUpdate(sql);
        
        
        stmt.close();
        ConnectDB.DB_CONNECTION.commit();
        ConnectDB.DB_CONNECTION.close();


    }
1263

    
1264
    private ArrayList<String> listHdfsDir(String dir) throws Exception {
1265

    
1266
        FileSystem hdfs = FileSystem.get(new Configuration());
1267
        RemoteIterator<LocatedFileStatus> Files;
1268
        ArrayList<String> fileNames = new ArrayList<>();
1269

    
1270
        try {
1271
            Path exportPath = new Path(hdfs.getUri() + dir);
1272
            Files = hdfs.listFiles(exportPath, false);
1273
            while (Files.hasNext()) {
1274
                String fileName = Files.next().getPath().toString();
1275
                //log.info("Found hdfs file " + fileName);
1276
                fileNames.add(fileName);
1277
            }
1278
            //hdfs.close();
1279
        } catch (Exception e) {
1280
            log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
1281
            throw new Exception("HDFS file path with exported data does not exist :   " + logPath, e);
1282
        }
1283

    
1284
        return fileNames;
1285
    }
1286

    
1287
    private String readHDFSFile(String filename) throws Exception {
1288
        String result;
1289
        try {
1290

    
1291
            FileSystem fs = FileSystem.get(new Configuration());
1292
            //log.info("reading file : " + filename);
1293

    
1294
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
1295

    
1296
            StringBuilder sb = new StringBuilder();
1297
            String line = br.readLine();
1298

    
1299
            while (line != null) {
1300
                if (!line.equals("[]")) {
1301
                    sb.append(line);
1302
                }
1303
                //sb.append(line);
1304
                line = br.readLine();
1305
            }
1306
            result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
1307
            if (result.equals("")) {
1308
                result = "[]";
1309
            }
1310

    
1311
            //fs.close();
1312
        } catch (Exception e) {
1313
            log.error(e);
1314
            throw new Exception(e);
1315
        }
1316

    
1317
        return result;
1318
    }
1319

    
1320
}
(3-3/5)