
Revision 58415

A ton of fixes. Close to running smoothly in all cases.

View differences:

modules/dnet-openaire-usage-stats-export-wf/trunk/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/PiwikDownloadLogs.java
package eu.dnetlib.usagestats.export;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.log4j.Logger;

import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;

public class PiwikDownloadLogs {

    private final String piwikUrl;
    private Date startDate;
    private final String tokenAuth;

    /*
       The Piwik API method
     */
    private final String APImethod = "?module=API&method=Live.getLastVisitsDetails";
    private final String format = "&format=json";

    private final Logger log = Logger.getLogger(this.getClass());

    public PiwikDownloadLogs(String piwikUrl, String tokenAuth) {
        this.piwikUrl = piwikUrl;
        this.tokenAuth = tokenAuth;
    }

    private String getPiwikLogUrl() {
        return "https://" + piwikUrl + "/";
    }

    private String getJson(String url) throws Exception {
        try {
            URL website = new URL(url);
            URLConnection connection = website.openConnection();

            //connection.setRequestProperty ("Authorization", "Basic "+encoded);
            StringBuilder response;
            try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
                response = new StringBuilder();
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                    response.append("\n");
                }
            }
            return response.toString();
        } catch (Exception e) {
            log.error("Failed to get URL: " + e);
            throw new Exception("Failed to get URL: " + e.toString(), e);
        }
    }

    public void GetOpenAIRELogs(String repoLogsPath, String portalLogPath, String portalMatomoID) throws Exception {

        Statement statement = ConnectDB.getConnection().createStatement();
        ResultSet rs = statement.executeQuery("SELECT distinct piwik_id from public.datasource order by piwik_id;");
        PreparedStatement st = ConnectDB.getConnection().prepareStatement("SELECT max(timestamp) FROM piwiklog WHERE source=? HAVING max(timestamp) is not null;");

        while (rs.next()) {
            int siteId = rs.getInt(1);
            st.setInt(1, siteId);
            ResultSet rs_date = st.executeQuery();

            while (rs_date.next()) {
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
                Date dateMax = sdf.parse(rs_date.getString(1));
                //Date dateMax = sdf.parse("2020-02-01");
                Calendar start = Calendar.getInstance();
                start.setTime(dateMax);
                Calendar end = Calendar.getInstance();
                end.add(Calendar.DAY_OF_MONTH, -1);

                for (Date date = start.getTime(); start.before(end); start.add(Calendar.DATE, 1), date = start.getTime()) {
                    log.info("Downloading logs for repoid " + siteId + " and for " + sdf.format(date));

                    String period = "&period=day&date=" + sdf.format(date);
                    String outFolder = "";
                    //portal siteId = 109;
                    if (siteId == Integer.parseInt(portalMatomoID)) {
                        outFolder = portalLogPath;
                    } else {
                        outFolder = repoLogsPath;
                    }
                    FileSystem fs = FileSystem.get(new Configuration());
                    FSDataOutputStream fin = fs.create(new Path(outFolder + "/" + siteId + "_Piwiklog" + sdf.format((date)) + ".json"), true);

                    String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + siteId + period + format + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth;
                    String content = "";

                    int i = 0;

                    while (!content.equals("[]\n")) {
                        String apiUrl = baseApiUrl;

                        if (i > 0) {
                            apiUrl += "&filter_offset=" + (i * 1000);
                        }

                        content = getJson(apiUrl);

                        fin.write(content.getBytes());

                        i++;
                    }
                    fin.close();
                }
            }
        }
    }
}
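
A note on the paging loop in GetOpenAIRELogs: it terminates only when the Live.getLastVisitsDetails response is exactly "[]" plus the trailing newline that getJson appends, so any change to how the response is read would turn it into an endless loop. A more defensive check is sketched below; the isEmptyResponse helper is hypothetical and not part of this revision.

    // Hypothetical helper (not in this revision): treat null, blank, and "[]"
    // with any surrounding whitespace as the end of the paged results, instead
    // of comparing against the exact "[]\n" string that getJson happens to build.
    final class ResponseChecks {
        static boolean isEmptyResponse(String content) {
            return content == null || content.trim().isEmpty() || "[]".equals(content.trim());
        }
    }

The loop condition would then read while (!ResponseChecks.isEmptyResponse(content)).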
modules/dnet-openaire-usage-stats-export-wf/trunk/dnet-openaire-usage-stats-workflow/dnet-openaire-usage-stats-workflow.iml
<?xml version="1.0" encoding="UTF-8"?>
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
  <component name="NewModuleRootManager" LANGUAGE_LEVEL="JDK_1_8">
    <output url="file://$MODULE_DIR$/target/classes" />
    <output-test url="file://$MODULE_DIR$/target/test-classes" />
    <content url="file://$MODULE_DIR$">
      <sourceFolder url="file://$MODULE_DIR$/src/main/java" isTestSource="false" />
      <sourceFolder url="file://$MODULE_DIR$/src/main/resources" type="java-resource" />
      <excludeFolder url="file://$MODULE_DIR$/target" />
    </content>
    <orderEntry type="inheritedJdk" />
    <orderEntry type="sourceFolder" forTests="false" />
    <orderEntry type="library" name="Maven: eu.dnetlib:icm-iis-core:1.0.2" level="project" />
    <orderEntry type="library" name="Maven: eu.dnetlib:icm-iis-3rdparty-avrojsoncoders:1.0.3" level="project" />
    <orderEntry type="library" name="Maven: org.apache.commons:commons-lang3:3.1" level="project" />
    <orderEntry type="library" name="Maven: commons-cli:commons-cli:1.2" level="project" />
    <orderEntry type="library" name="Maven: org.apache.avro:avro:1.7.4" level="project" />
    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-core-asl:1.8.8" level="project" />
    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-mapper-asl:1.8.8" level="project" />
    <orderEntry type="library" name="Maven: com.thoughtworks.paranamer:paranamer:2.3" level="project" />
    <orderEntry type="library" name="Maven: org.xerial.snappy:snappy-java:1.0.4.1" level="project" />
    <orderEntry type="library" name="Maven: org.apache.commons:commons-compress:1.4.1" level="project" />
    <orderEntry type="library" name="Maven: org.tukaani:xz:1.0" level="project" />
    <orderEntry type="library" name="Maven: org.slf4j:slf4j-api:1.6.4" level="project" />
    <orderEntry type="library" name="Maven: org.apache.avro:avro-mapred:hadoop2:1.7.4" level="project" />
    <orderEntry type="library" name="Maven: org.apache.avro:avro-ipc:1.7.4" level="project" />
    <orderEntry type="library" name="Maven: io.netty:netty:3.4.0.Final" level="project" />
    <orderEntry type="library" name="Maven: org.apache.velocity:velocity:1.7" level="project" />
    <orderEntry type="library" name="Maven: org.mortbay.jetty:servlet-api:2.5-20081211" level="project" />
    <orderEntry type="library" name="Maven: org.apache.avro:avro-ipc:tests:1.7.4" level="project" />
    <orderEntry type="library" name="Maven: com.google.code.gson:gson:2.2.4" level="project" />
    <orderEntry type="module" module-name="dnet-openaire-usage-stats" />
    <orderEntry type="library" name="Maven: com.googlecode.json-simple:json-simple:1.1.1" level="project" />
    <orderEntry type="library" name="Maven: junit:junit:4.9" level="project" />
    <orderEntry type="library" name="Maven: org.hamcrest:hamcrest-core:1.1" level="project" />
    <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-common:2.0.0-cdh4.7.0" level="project" />
    <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-annotations:2.0.0-cdh4.7.0" level="project" />
    <orderEntry type="library" name="Maven: com.google.guava:guava:11.0.2" level="project" />
    <orderEntry type="library" name="Maven: org.apache.commons:commons-math:2.1" level="project" />
    <orderEntry type="library" name="Maven: xmlenc:xmlenc:0.52" level="project" />
    <orderEntry type="library" name="Maven: commons-httpclient:commons-httpclient:3.1" level="project" />
    <orderEntry type="library" name="Maven: commons-codec:commons-codec:1.4" level="project" />
    <orderEntry type="library" name="Maven: commons-io:commons-io:2.1" level="project" />
    <orderEntry type="library" name="Maven: commons-net:commons-net:3.1" level="project" />
    <orderEntry type="library" name="Maven: javax.servlet:servlet-api:2.5" level="project" />
    <orderEntry type="library" name="Maven: org.mortbay.jetty:jetty:6.1.26.cloudera.2" level="project" />
    <orderEntry type="library" name="Maven: org.mortbay.jetty:jetty-util:6.1.26.cloudera.2" level="project" />
    <orderEntry type="library" name="Maven: com.sun.jersey:jersey-core:1.8" level="project" />
    <orderEntry type="library" name="Maven: com.sun.jersey:jersey-json:1.8" level="project" />
    <orderEntry type="library" name="Maven: org.codehaus.jettison:jettison:1.1" level="project" />
    <orderEntry type="library" name="Maven: stax:stax-api:1.0.1" level="project" />
    <orderEntry type="library" name="Maven: com.sun.xml.bind:jaxb-impl:2.2.3-1" level="project" />
    <orderEntry type="library" name="Maven: javax.xml.bind:jaxb-api:2.2.2" level="project" />
    <orderEntry type="library" name="Maven: javax.activation:activation:1.1" level="project" />
    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-jaxrs:1.7.1" level="project" />
    <orderEntry type="library" name="Maven: org.codehaus.jackson:jackson-xc:1.7.1" level="project" />
    <orderEntry type="library" name="Maven: com.sun.jersey:jersey-server:1.8" level="project" />
    <orderEntry type="library" name="Maven: asm:asm:3.1" level="project" />
    <orderEntry type="library" scope="RUNTIME" name="Maven: tomcat:jasper-compiler:5.5.23" level="project" />
    <orderEntry type="library" scope="RUNTIME" name="Maven: tomcat:jasper-runtime:5.5.23" level="project" />
    <orderEntry type="library" scope="RUNTIME" name="Maven: javax.servlet.jsp:jsp-api:2.1" level="project" />
    <orderEntry type="library" scope="RUNTIME" name="Maven: commons-el:commons-el:1.0" level="project" />
    <orderEntry type="library" name="Maven: commons-logging:commons-logging:1.1.1" level="project" />
    <orderEntry type="library" name="Maven: org.apache.hadoop:cloudera-jets3t:2.0.0-cdh4.7.0" level="project" />
    <orderEntry type="library" name="Maven: net.java.dev.jets3t:jets3t:0.6.1" level="project" />
    <orderEntry type="library" name="Maven: commons-lang:commons-lang:2.5" level="project" />
    <orderEntry type="library" name="Maven: commons-configuration:commons-configuration:1.6" level="project" />
    <orderEntry type="library" name="Maven: commons-collections:commons-collections:3.2.1" level="project" />
    <orderEntry type="library" name="Maven: commons-digester:commons-digester:1.8" level="project" />
    <orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils:1.7.0" level="project" />
    <orderEntry type="library" name="Maven: commons-beanutils:commons-beanutils-core:1.8.0" level="project" />
    <orderEntry type="library" scope="RUNTIME" name="Maven: org.slf4j:slf4j-log4j12:1.6.1" level="project" />
    <orderEntry type="library" name="Maven: org.mockito:mockito-all:1.8.5" level="project" />
    <orderEntry type="library" name="Maven: net.sf.kosmosfs:kfs:0.3" level="project" />
    <orderEntry type="library" name="Maven: com.google.protobuf:protobuf-java:2.4.0a" level="project" />
    <orderEntry type="library" name="Maven: org.apache.hadoop:hadoop-auth:2.0.0-cdh4.7.0" level="project" />
    <orderEntry type="library" name="Maven: com.jcraft:jsch:0.1.42" level="project" />
    <orderEntry type="library" name="Maven: org.apache.zookeeper:zookeeper:3.4.5-cdh4.7.0" level="project" />
    <orderEntry type="library" name="Maven: jline:jline:0.9.94" level="project" />
    <orderEntry type="library" name="Maven: org.json:json:20180130" level="project" />
    <orderEntry type="library" name="Maven: eu.dnetlib:icm-iis-assembly-resources:1.0.1" level="project" />
    <orderEntry type="library" name="Maven: log4j:log4j:1.2.17" level="project" />
  </component>
</module>
modules/dnet-openaire-usage-stats-export-wf/trunk/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/IrusStats.java
package eu.dnetlib.usagestats.export;
/**
 *
 * @author dpie
 */
import java.io.*;
//import java.io.BufferedReader;
//import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.sql.ResultSet;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Calendar;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Statement;

import org.json.simple.JSONArray;
import org.json.simple.JSONObject;
import org.json.simple.parser.JSONParser;

import org.apache.log4j.Logger;

/**
 * Created by dpie on 20/01/2020.
 */
public class IrusStats {

    private String irusUKURL;

//    private Connection conn = null;
//    private Statement stmt = null;

    private final Logger log = Logger.getLogger(this.getClass());

    public IrusStats(String irusUKURL) throws Exception {
        this.irusUKURL = irusUKURL;
        createTables();
        createTmpTables();
    }

    private void createTables() throws Exception {
        try {

            Statement stmt = ConnectDB.getConnection().createStatement();
            String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilog(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
            stmt.executeUpdate(sqlCreateTableSushiLog);
            String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO sushilog "
                    + " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
                    + "sushilog.rid, sushilog.date "
                    + "FROM sushilog "
                    + "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlcreateRuleSushiLog);
            String createSushiIndex = "create index if not exists sushilog_duplicates on sushilog(source, repository, rid, date, metric_type);";
            stmt.executeUpdate(createSushiIndex);

            stmt.close();
            ConnectDB.getConnection().close();
            log.info("Sushi Tables Created");
        } catch (Exception e) {
            log.error("Failed to create tables: " + e);
            throw new Exception("Failed to create tables: " + e.toString(), e);
        }
    }

    private void createTmpTables() throws Exception {
        try {

            Statement stmt = ConnectDB.getConnection().createStatement();
            String sqlCreateTableSushiLog = "CREATE TABLE IF NOT EXISTS sushilogtmp(source TEXT, repository TEXT, rid TEXT, date TEXT, metric_type TEXT, count INT, PRIMARY KEY(source, repository, rid, date, metric_type));";
            stmt.executeUpdate(sqlCreateTableSushiLog);

            //stmt.executeUpdate("CREATE TABLE IF NOT EXISTS public.sushilog AS TABLE sushilog;");
            //String sqlCopyPublicSushiLog = "INSERT INTO sushilog SELECT * FROM public.sushilog;";
            //stmt.executeUpdate(sqlCopyPublicSushiLog);
            String sqlcreateRuleSushiLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                    + " ON INSERT TO sushilogtmp "
                    + " WHERE (EXISTS ( SELECT sushilog.source, sushilog.repository,"
                    + "sushilog.rid, sushilog.date "
                    + "FROM sushilog "
                    + "WHERE sushilog.source = new.source AND sushilog.repository = new.repository AND sushilog.rid = new.rid AND sushilog.date = new.date AND sushilog.metric_type = new.metric_type)) DO INSTEAD NOTHING;";
            stmt.executeUpdate(sqlcreateRuleSushiLog);

            stmt.close();
            ConnectDB.getConnection().close();
            log.info("Sushi Tmp Tables Created");
        } catch (Exception e) {
            log.error("Failed to create tables: " + e);
            throw new Exception("Failed to create tables: " + e.toString(), e);
        }
    }

    public void irusStats() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        //String sql = "INSERT INTO sushi_result_downloads SELECT s.source, d.id AS repository, ro.id, s.date, s.count FROM sushilog s, datasource_oids d, result_oids ro WHERE s.repository=d.orid AND s.oai=ro.orid AND metric_type='ft_total'";
        //String sql = "SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count INTO downloads_stats FROM sushilog s, datasource_oids d, result_oids ro WHERE s.repository=d.orid AND s.oai=ro.orid AND metric_type='ft_total'";
        //String sql = "INSERT INTO downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count FROM sushilog s, datasource_oids d, result_oids ro WHERE s.repository=d.orid AND s.oai=ro.orid AND metric_type='ft_total';";
        String sql = "INSERT INTO downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count, '0' FROM sushilogtmp s, public.datasource_oids d, public.result_oids ro WHERE s.repository=d.orid AND s.rid=ro.orid AND metric_type='ft_total' AND s.source='IRUS-UK';";
        stmt.executeUpdate(sql);

        sql = "Insert into sushilog select * from sushilogtmp;";
        stmt.executeUpdate(sql);

        sql = "drop table sushilogtmp;";
        stmt.executeUpdate(sql);

        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }

    public void processIrusRRReport() throws Exception {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");
        //String reportUrl = "https://irus.jisc.ac.uk" + "/api/sushilite/v1_7/GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate=2016-01&EndDate=" + simpleDateFormat.format(new Date()) + "&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback=";
        String reportUrl = irusUKURL + "GetReport/?Report=RR1&Release=4&RequestorID=OpenAIRE&BeginDate=2016-01&EndDate=" + simpleDateFormat.format(new Date()) + "&RepositoryIdentifier=&ItemDataType=&NewJiscBand=&Granularity=Monthly&Callback=";

        log.info("Getting Irus report: " + reportUrl);

        String text = getJson(reportUrl, "", "");

        log.info("Report: " + text);

        JSONParser parser = new JSONParser();
        JSONObject jsonObject = (JSONObject) parser.parse(text);
        jsonObject = (JSONObject) jsonObject.get("ReportResponse");
        jsonObject = (JSONObject) jsonObject.get("Report");
        jsonObject = (JSONObject) jsonObject.get("Report");
        jsonObject = (JSONObject) jsonObject.get("Customer");
        JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems");
        int i = 0;
        for (Object aJsonArray : jsonArray) {
            JSONObject jsonObjectRow = (JSONObject) aJsonArray;
            JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("ItemIdentifier");
            for (Object identifier : itemIdentifier) {
                JSONObject opendoar = (JSONObject) identifier;
                if (opendoar.get("Type").toString().equals("OpenDOAR")) {
                    //System.out.println(i + ": " + opendoar.get("Value").toString());
                    log.info(i + ": " + opendoar.get("Value").toString());
                    i++;
                    processIrusIRReport(opendoar.get("Value").toString());
                    break;
                }
            }
            //break;
        }
    }

    private void processIrusIRReport(String opendoar) throws Exception {
        System.out.println(opendoar);
        ConnectDB.getConnection().setAutoCommit(false);

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");

        Calendar start = Calendar.getInstance();
        start.set(Calendar.YEAR, 2016);
        start.set(Calendar.MONTH, Calendar.JANUARY);
        //start.setTime(simpleDateFormat.parse("2016-01"));

        Calendar end = Calendar.getInstance();
        end.add(Calendar.DAY_OF_MONTH, -1);

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        PreparedStatement st = ConnectDB.getConnection().prepareStatement("SELECT max(date) FROM sushilog WHERE repository=?;");
        st.setString(1, "opendoar____::" + opendoar);
        ResultSet rs_date = st.executeQuery();
        while (rs_date.next()) {
            if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") && !rs_date.getString(1).equals("")) {
                start.setTime(sdf.parse(rs_date.getString(1)));
            }
        }
        rs_date.close();
        PreparedStatement preparedStatement = ConnectDB.getConnection().prepareStatement("INSERT INTO sushilogtmp (source, repository, rid, date, metric_type, count) VALUES (?,?,?,?,?,?)");
        int batch_size = 0;

        while (start.before(end)) {
            //log.info("date: " + simpleDateFormat.format(start.getTime()));
            String reportUrl = this.irusUKURL + "GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate=" + simpleDateFormat.format(start.getTime()) + "&EndDate=" + simpleDateFormat.format(start.getTime()) + "&RepositoryIdentifier=opendoar%3A" + opendoar + "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback=";
            start.add(Calendar.MONTH, 1);

            String text = getJson(reportUrl, "", "");
            if (text == null) {
                continue;
            }

            JSONParser parser = new JSONParser();
            JSONObject jsonObject = (JSONObject) parser.parse(text);
            jsonObject = (JSONObject) jsonObject.get("ReportResponse");
            jsonObject = (JSONObject) jsonObject.get("Report");
            jsonObject = (JSONObject) jsonObject.get("Report");
            jsonObject = (JSONObject) jsonObject.get("Customer");
            JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems");
            if (jsonArray == null) {
                continue;
            }
            String oai = "";
            for (Object aJsonArray : jsonArray) {
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
                JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("ItemIdentifier");
                for (Object identifier : itemIdentifier) {
                    JSONObject oaiPmh = (JSONObject) identifier;
                    if (oaiPmh.get("Type").toString().equals("OAI")) {
                        oai = oaiPmh.get("Value").toString();
                        //System.out.println("OAI: " + oai);
                        break;
                    }
                }

                JSONArray itemPerformance = (JSONArray) jsonObjectRow.get("ItemPerformance");
                String period;
                String type;
                String count;
                for (Object perf : itemPerformance) {
                    JSONObject performance = (JSONObject) perf;
                    JSONObject periodObj = (JSONObject) performance.get("Period");
                    period = periodObj.get("Begin").toString();
                    JSONObject instanceObj = (JSONObject) performance.get("Instance");
                    type = instanceObj.get("MetricType").toString();
                    count = instanceObj.get("Count").toString();
                    //System.out.println(oai + " : " + period + " : " + count);

                    preparedStatement.setString(1, "IRUS-UK");
                    preparedStatement.setString(2, "opendoar____::" + opendoar);
                    preparedStatement.setString(3, oai);
                    preparedStatement.setString(4, period);
                    preparedStatement.setString(5, type);
                    preparedStatement.setInt(6, Integer.parseInt(count));
                    preparedStatement.addBatch();
                    batch_size++;
                    if (batch_size == 10000) {
                        preparedStatement.executeBatch();
                        ConnectDB.getConnection().commit();
                        batch_size = 0;
                    }
                }
                //break;
            }
            //break;
        }

        preparedStatement.executeBatch();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }

    public void processIrusIRReport(String opendoar, String startDate) throws Exception {
        ConnectDB.getConnection().setAutoCommit(false);

        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM");

        Calendar start = Calendar.getInstance();
        start.set(Calendar.YEAR, 2016);
        start.set(Calendar.MONTH, Calendar.JANUARY);
        //start.setTime(simpleDateFormat.parse("2016-01"));

        Calendar end = Calendar.getInstance();
        end.add(Calendar.DAY_OF_MONTH, -1);

        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
        start.setTime(sdf.parse(startDate));

        String createTablesQuery = "-- Table: shadow.sushilog" + opendoar + "\n"
                + "\n"
                + "-- DROP TABLE shadow.sushilog" + opendoar + ";\n"
                + "\n"
                + "CREATE TABLE shadow.sushilog" + opendoar + "\n"
                + "(\n"
                + "    source text COLLATE pg_catalog.\"default\" NOT NULL,\n"
                + "    repository text COLLATE pg_catalog.\"default\" NOT NULL,\n"
                + "    rid text COLLATE pg_catalog.\"default\" NOT NULL,\n"
                + "    date text COLLATE pg_catalog.\"default\" NOT NULL,\n"
                + "    metric_type text COLLATE pg_catalog.\"default\" NOT NULL,\n"
                + "    count integer,\n"
                + "    CONSTRAINT sushilog" + opendoar + "_pkey PRIMARY KEY (source, repository, rid, date, metric_type)\n"
                + "        USING INDEX TABLESPACE index_storage\n"
                + ")\n"
                + "\n"
                + "TABLESPACE pg_default;\n"
                + "\n"
                + "ALTER TABLE shadow.sushilog" + opendoar + "\n"
                + "    OWNER to sqoop;\n"
                + "\n"
                + "-- Rule: ignore_duplicate_inserts ON shadow.sushilog" + opendoar + "\n"
                + "\n"
                + "-- DROP Rule ignore_duplicate_inserts ON shadow.sushilog" + opendoar + ";\n"
                + "\n"
                + "CREATE OR REPLACE RULE ignore_duplicate_inserts AS\n"
                + "    ON INSERT TO shadow.sushilog" + opendoar + "\n"
                + "    WHERE (EXISTS ( SELECT sushilog" + opendoar + ".source,\n"
                + "            sushilog" + opendoar + ".repository,\n"
                + "            sushilog" + opendoar + ".rid,\n"
                + "            sushilog" + opendoar + ".date\n"
                + "           FROM sushilog" + opendoar + "\n"
                + "          WHERE sushilog" + opendoar + ".source = new.source AND sushilog" + opendoar + ".repository = new.repository AND sushilog" + opendoar + ".rid = new.rid AND sushilog" + opendoar + ".date = new.date AND sushilog" + opendoar + ".metric_type = new.metric_type))\n"
                + "    DO INSTEAD\n"
                + "NOTHING;";

        Statement stCreateTables = ConnectDB.getConnection().createStatement();
        stCreateTables.execute(createTablesQuery);
        ConnectDB.getConnection().commit();

        PreparedStatement preparedStatement = ConnectDB.getConnection().prepareStatement("INSERT INTO sushilog" + opendoar + " (source, repository, rid, date, metric_type, count) VALUES (?,?,?,?,?,?)");
        int batch_size = 0;

        while (start.before(end)) {
            //log.info("date: " + simpleDateFormat.format(start.getTime()));
            String reportUrl = "https://irus.jisc.ac.uk/api/sushilite/v1_7/GetReport/?Report=IR1&Release=4&RequestorID=OpenAIRE&BeginDate=" + simpleDateFormat.format(start.getTime()) + "&EndDate=2019-10-31&RepositoryIdentifier=opendoar%3A" + opendoar + "&ItemIdentifier=&ItemDataType=&hasDOI=&Granularity=Monthly&Callback=";
            start.add(Calendar.MONTH, 1);

            String text = getJson(reportUrl, "", "");
            if (text == null) {
                continue;
            }

            JSONParser parser = new JSONParser();
            JSONObject jsonObject = (JSONObject) parser.parse(text);
            jsonObject = (JSONObject) jsonObject.get("ReportResponse");
            jsonObject = (JSONObject) jsonObject.get("Report");
            jsonObject = (JSONObject) jsonObject.get("Report");
            jsonObject = (JSONObject) jsonObject.get("Customer");
            JSONArray jsonArray = (JSONArray) jsonObject.get("ReportItems");
            if (jsonArray == null) {
                continue;
            }
            String oai = "";
            for (Object aJsonArray : jsonArray) {
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
                JSONArray itemIdentifier = (JSONArray) jsonObjectRow.get("ItemIdentifier");
                for (Object identifier : itemIdentifier) {
                    JSONObject oaiPmh = (JSONObject) identifier;
                    if (oaiPmh.get("Type").toString().equals("OAI")) {
                        oai = oaiPmh.get("Value").toString();
                        //System.out.println("OAI: " + oai);
                        break;
                    }
                }

                JSONArray itemPerformance = (JSONArray) jsonObjectRow.get("ItemPerformance");
                String period;
                String type;
                String count;
                for (Object perf : itemPerformance) {
                    JSONObject performance = (JSONObject) perf;
                    JSONObject periodObj = (JSONObject) performance.get("Period");
                    period = periodObj.get("Begin").toString();
                    JSONObject instanceObj = (JSONObject) performance.get("Instance");
                    type = instanceObj.get("MetricType").toString();
                    count = instanceObj.get("Count").toString();
                    //System.out.println(oai + " : " + period + " : " + count);

                    preparedStatement.setString(1, "IRUS-UK");
                    preparedStatement.setString(2, "opendoar____::" + opendoar);
                    preparedStatement.setString(3, oai);
                    preparedStatement.setString(4, period);
                    preparedStatement.setString(5, type);
                    preparedStatement.setInt(6, Integer.parseInt(count));
                    preparedStatement.addBatch();
                    batch_size++;
                    if (batch_size == 10000) {
                        preparedStatement.executeBatch();
                        ConnectDB.getConnection().commit();
                        batch_size = 0;
                    }
                }
                //break;
            }
            //break;
        }

        preparedStatement.executeBatch();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }

    private String getJson(String url, String username, String password) throws Exception {
        //String cred=username+":"+password;
        //String encoded = new sun.misc.BASE64Encoder().encode (cred.getBytes());
        try {
            URL website = new URL(url);
            URLConnection connection = website.openConnection();
            //connection.setRequestProperty ("Authorization", "Basic "+encoded);
            StringBuilder response;
            try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
                response = new StringBuilder();
                String inputLine;
                while ((inputLine = in.readLine()) != null) {
                    response.append(inputLine);
                    response.append("\n");
                }
            }
            return response.toString();
        } catch (Exception e) {
            log.error("Failed to get URL", e);
            return null;
        }
    }
}
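
A note on both report methods above: new SimpleDateFormat("YYYY-MM") uses the week-year pattern letter Y rather than the calendar-year letter y, so dates in the last days of December can format as the following year and shift the BeginDate/EndDate parameters sent to IRUS-UK. A minimal, runnable demonstration (an illustration, not code from this revision):

    import java.text.SimpleDateFormat;
    import java.util.Calendar;

    // Shows the week-year pitfall: in typical default locales, 2019-12-30
    // already belongs to week 1 of 2020, so "YYYY" prints 2020 while "yyyy"
    // prints the calendar year 2019.
    public class WeekYearDemo {
        public static void main(String[] args) {
            Calendar cal = Calendar.getInstance();
            cal.set(2019, Calendar.DECEMBER, 30);
            System.out.println(new SimpleDateFormat("YYYY-MM").format(cal.getTime())); // 2020-12
            System.out.println(new SimpleDateFormat("yyyy-MM").format(cal.getTime())); // 2019-12
        }
    }

Separately, the ignore_duplicate_inserts rewrite rules created here predate PostgreSQL 9.5's INSERT ... ON CONFLICT DO NOTHING, which gives the same deduplication more directly; whether it can be used depends on the server version this code targets.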
modules/dnet-openaire-usage-stats-export-wf/trunk/dnet-openaire-usage-stats-workflow/src/main/java/eu/dnetlib/iis/core/workflows/usage_stats/UsageStatsWrapper.java
package eu.dnetlib.iis.core.workflows.usage_stats;

import eu.dnetlib.iis.core.java.PortBindings;
import eu.dnetlib.iis.core.java.Process;
import eu.dnetlib.iis.core.java.porttype.PortType;
import eu.dnetlib.usagestats.export.UsageStatsExporter;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

public class UsageStatsWrapper implements Process {
    private Logger log = Logger.getLogger(this.getClass());
    private UsageStatsExporter usageStatsExporter;

    @Override
    public void run(PortBindings portBindings, Configuration context, Map<String, String> parameters) throws Exception {

        Properties properties = new Properties();

        for (String key : parameters.keySet()) {
            properties.setProperty(key, parameters.get(key));
        }
        usageStatsExporter = new UsageStatsExporter(properties);

//        usageStatsExporter.setUsername(parameters.get("piwik_username"));
//        usageStatsExporter.setPassword(parameters.get("piwik_pass"));
//        usageStatsExporter.setHttpProtocol(parameters.get("piwik_httpProtocol"));
//        usageStatsExporter.setPiwikUrl(parameters.get("piwik_url"));
//        usageStatsExporter.setSiteId(parameters.get("piwik_siteId"));
//        usageStatsExporter.setStartDate(parameters.get("piwik_startDate"));
//        usageStatsExporter.setStartDate(parameters.get("piwik_startDate"));
//        usageStatsExporter.setFinalDate(parameters.get("piwik_finalDate"));
//        usageStatsExporter.setLogsPath(parameters.get("piwik_logsPath"));
//        usageStatsExporter.setFilterOffset(parameters.get("piwik_filterOffset"));
//        //usageStatsExporter.setDbSchema(parameters.get("piwik_schema"));
//        usageStatsExporter.setDbUrl(parameters.get("Stats_db_Url"));
//        usageStatsExporter.setDbUsername(parameters.get("Stats_db_User"));
//        usageStatsExporter.setDbPassword(parameters.get("Stats_db_Pass"));
//        usageStatsExporter.setTokenAuth(parameters.get("token_auth"));

//        log.info("Parameters: " + usageStatsExporter.getDbUrl() + usageStatsExporter.getUsername() + usageStatsExporter.getDbPassword());
//        log.info("Parameters: " + usageStatsExporter.getDbSchema() + usageStatsExporter.getLogsPath());

        for (Map.Entry<String, String> e : parameters.entrySet()) {
            log.info(e.getKey() + ":" + e.getValue());
        }

        try {
            usageStatsExporter.export();
            log.info("Usage Stats Exported!...");
        } catch (Exception e) {
            log.error("Usage Stats Export failed: ", e);
            throw new Exception("Usage Stats Export failed: " + e.toString(), e);
        }

    }

    @Override
    public Map<String, PortType> getInputPorts() {
        return createInputPorts();
    }

    @Override
    public Map<String, PortType> getOutputPorts() {
        return createOutputPorts();
    }

    private static HashMap<String, PortType> createInputPorts() {
        HashMap<String, PortType> inputPorts = new HashMap<String, PortType>();

        return inputPorts;
    }

    private static HashMap<String, PortType> createOutputPorts() {
        HashMap<String, PortType> outputPorts = new HashMap<String, PortType>();

        return outputPorts;
    }
}
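
A small observation on the parameter-copying loop in run(): since every key and value is already a String, the same copy can be expressed with Map.forEach. A sketch of the equivalent form (nothing in this revision depends on it):

    import java.util.Map;
    import java.util.Properties;

    // Equivalent to the key-by-key loop above: copies every workflow parameter
    // into the Properties object handed to UsageStatsExporter.
    final class ParameterCopy {
        static Properties toProperties(Map<String, String> parameters) {
            Properties properties = new Properties();
            parameters.forEach(properties::setProperty);
            return properties;
        }
    }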
modules/dnet-openaire-usage-stats-export-wf/trunk/dnet-openaire-usage-stats-workflow/src/main/resources/eu/dnetlib/iis/core/javamapreduce/usage_stats/job.properties
Stats_db_Driver=org.postgresql.Driver
Stats_db_Pass=sqoop
Stats_db_Url=jdbc:postgresql://88.197.53.70:5432/stats2
Stats_db_User=sqoop
Stats_db_Schema=usagestats
Stats_db_table_map=datasourceLanguage=datasource_languages,datasource=datasource,project=project,result=result,organization=organization,datasourceOrganization=datasource_organizations,datasourceTopic=datasource_topics,projectOrganization=project_organizations,resultClaim=result_claims,resultClassification=result_classifications,resultConcept=result_concepts,resultLanguage=result_languages,resultOrganization=result_organizations,resultResult=result_results,resultProject=project_results,resultResult=result_results,resultTopic=result_topics,category=category,claim=claim,concept=concept,category=category,context=context,datasourceLanguage=datasource_languages,resultLanguage=result_languages,resultDatasource=result_datasources, resultDescription=result_descriptions,datasourceStats=datasource_stats,organizationStats=organization_stats,projectStats=project_stats,resultStats=result_stats,resultOid=result_oids,projectOid=project_oids,datasourceOid=datasource_oids,organizationOid=organization_oids,resultPid=result_pids,resultCitation=result_citations
Stats_delim_Character=!
Stats_enclosing_Character=#
Stats_output_Path=/tmp/tstats/
Stats_sqoop_RecsPerStatement=10000
Stats_sqoop_ReducersCount=8
Stats_sqoop_StatementPerTrans=1000000
dfs.client.failover.proxy.provider.dm-cluster-nn=org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider
dfs.ha.namenodes.dm-cluster-nn=nn1,nn2
dfs.namenode.http-address.dm-cluster-nn.nn2=namenode2.hadoop.dm.openaire.eu:50070
dfs.namenode.rpc-address.dm-cluster-nn.nn2=namenode2.hadoop.dm.openaire.eu:8020
dfs.replication=2
fs.defaultFS=hdfs://dm-cluster-nn
hadoop.rpc.socket.factory.class.default=org.apache.hadoop.net.StandardSocketFactory
hadoop.security.auth_to_local=DEFAULT
hadoop.security.authentication=simple
hbase.rootdir=hdfs://dm-cluster-nn/hbase
hbase.security.authentication=simple
hbase.zookeeper.client.port=2181
hbase.zookeeper.property.clientPort=2181
hbase.zookeeper.quorum=namenode1.hadoop.dm.openaire.eu,namenode2.hadoop.dm.openaire.eu,jobtracker1.hadoop.dm.openaire.eu,jobtracker2.hadoop.dm.openaire.eu,hbase-master1.hadoop.dm.openaire.eu
mapred.client.failover.proxy.provider.dm-cluster-jt=org.apache.hadoop.mapred.ConfiguredFailoverProxyProvider
mapred.job.tracker=dm-cluster-jt
mapred.jobtrackers.dm-cluster-jt=jt1,jt2
mapred.mapper.new-api=true
mapred.reducer.new-api=true
oozie.service.loc=http://oozie.hadoop.dm.openaire.eu:11000/oozie
oozie.wf.application.path=hdfs://dm-cluster-nn/user/antonis.lempesis/core/javamapreduce/usage_stats/oozie_app
piwik_filterOffset=5
piwik_finalDate=2016-01-03
piwik_httpProtocol=https
piwik_logsPath=/tmp/tpiwikLogs_update/
piwik_pass=eiLae9EiyahXea9aemiesiiJ
piwik_siteId=5
piwik_startDate=2016-01-01
piwik_url=analytics.openaire.eu
piwik_username=usage_openaire
token_auth=32846584f571be9b57488bf4088f30ea
workingDir=/user/antonis.lempesis/core/javamapreduce/usage_stats/working_dir
zookeeper.znode.parent=/hbase
zookeeper.znode.rootserver=root-region-server
COUNTER_robots_Url=https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json
IRUS_UK_BaseUrl=https://irus.jisc.ac.uk/api/sushilite/v1_7/
matomo_AuthToken=703bd17d845acdaf795e01bb1e0895b9
matomo_BaseUrl=analytics.openaire.eu
repo_LogPath=${piwik_logsPath}/Repologs/
portal_LogPath=${piwik_logsPath}/Portallogs/
portal_MatomoID=109
1
<workflow-app name="usage-stats-export"
2
              xmlns="uri:oozie:workflow:0.4">
3
    <global>
4
        <job-tracker>${jobTracker}</job-tracker>
5
        <name-node>${nameNode}</name-node>
6
        <configuration>
7
            <property>
8
                <name>mapred.job.queue.name</name>
9
                <value>${queueName}</value>
10
            </property>
11
            <property>
12
                <name>oozie.sqoop.log.level</name>
13
                <value>DEBUG</value>
14
            </property>
15
            <property>
16
                <name>oozie.launcher.mapred.job.priority</name>
17
                <value>NORMAL</value>
18
            </property>
19
            <property>
20
                <name>mapred.job.priority</name>
21
                <value>NORMAL</value>
22
            </property>
23
        </configuration>
24
    </global>
25

  
26
    <start to="exportUsageStats"/>
27

  
28
    <action name='exportUsageStats'>
29
        <java>
30
            <prepare>
31
                <delete path="${nameNode}${piwik_logsPath}"/>
32
                <mkdir path="${nameNode}${piwik_logsPath}"/>
33
                <mkdir path="${nameNode}${portal_LogPath}"/>
34
                <mkdir path="${nameNode}${repo_LogPath}"/>
35
            </prepare>
36
            <configuration>
37
                <property>
38
                    <name>mapred.job.queue.name</name>
39
                    <value>${queueName}</value>
40
                </property>
41

  
42
                <property>
43
                    <name>oozie.sqoop.log.level</name>
44
                    <value>DEBUG</value>
45
                </property>
46
            </configuration>
47

  
48
            <main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class>
49
            <arg>-SworkingDir=${workingDir}</arg>
50
            <arg>eu.dnetlib.iis.core.workflows.usage_stats.UsageStatsWrapper</arg>
51
            <arg>-Ppiwik_username=${piwik_username}</arg>
52
            <arg>-Ppiwik_pass=${piwik_pass}</arg>
53
            <arg>-Ppiwik_httpProtocol=${piwik_httpProtocol}</arg>
54
            <arg>-Ppiwik_url=${piwik_url}</arg>
55
            <arg>-Ppiwik_siteId=${piwik_siteId}</arg>
56
            <arg>-Ppiwik_startDate=${piwik_startDate}</arg>
57
            <arg>-Ppiwik_finalDate=${piwik_finalDate}</arg>
58
            <arg>-Ppiwik_logsPath=${piwik_logsPath}</arg>
59
            <arg>-Ppiwik_filterOffset=${piwik_filterOffset}</arg>
60
            <arg>-PStats_db_Url=${Stats_db_Url}</arg>
61
            <arg>-PStats_db_User=${Stats_db_User}</arg>
62
            <arg>-PStats_db_Pass=${Stats_db_Pass}</arg>
63
            <arg>-PStats_db_Schema=${Stats_db_Schema}</arg>
64
            <arg>-PStats_db_Driver=${Stats_db_Driver}</arg>
65
            <arg>-Ptoken_auth=${token_auth}</arg>
66
            <arg>-PCOUNTER_robots_Url=${COUNTER_robots_Url}</arg>
67
            <arg>-PIRUS_UK_BaseUrl=${IRUS_UK_BaseUrl}</arg>
68
            <arg>-Pmatomo_AuthToken=${matomo_AuthToken}</arg>
69
            <arg>-Pmatomo_BaseUrl=${matomo_BaseUrl}</arg>
70
            <arg>-Prepo_LogPath=${repo_LogPath}</arg>
71
            <arg>-Pportal_LogPath=${portal_LogPath}</arg>
72
            <arg>-Pportal_MatomoID=${portal_MatomoID}</arg>
73
        </java>
74

  
75
        <ok to="cleanUpHDFS"/>
76
        <error to="fail"/>
77
    </action>
78
    <action name="cleanUpHDFS">
79
        <fs>
80
            <delete path="${nameNode}${piwik_logsPath}"/>
81
        </fs>
82
        <ok to="end"/>
83
        <error to="fail"/>
84
    </action>
85
    <kill name="fail">
86
        <message>
87
            Unfortunately, the process failed -- error message: [${wf:errorMessage(wf:lastErrorNode())}]
88
        </message>
89
    </kill>
90
    <end name="end"/>
91
</workflow-app>
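
For readers tracing the data flow: the -P... arguments above are presumably what eu.dnetlib.iis.core.java.ProcessWrapper turns into the parameters map received by UsageStatsWrapper.run. That convention is an assumption inferred from the argument naming, not something shown in this changeset; roughly, it would amount to:

    import java.util.HashMap;
    import java.util.Map;

    // Assumed convention (not shown in this changeset): strip the "-P" prefix
    // and split on the first '=' to build the parameters map passed to Process.run.
    final class ParamArgsSketch {
        static Map<String, String> parse(String[] args) {
            Map<String, String> params = new HashMap<>();
            for (String arg : args) {
                int eq = arg.indexOf('=');
                if (arg.startsWith("-P") && eq > 2) {
                    params.put(arg.substring(2, eq), arg.substring(eq + 1));
                }
            }
            return params;
        }
    }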
modules/dnet-openaire-usage-stats-export-wf/trunk/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/PiwikStatsDB.java
1
package eu.dnetlib.usagestats.export;
2

  
3
import java.io.*;
4
import java.net.URLDecoder;
5
import java.sql.Connection;
6
import java.sql.PreparedStatement;
7
import java.sql.SQLException;
8
import java.sql.Statement;
9
import java.sql.Timestamp;
10
import java.text.SimpleDateFormat;
11
import java.util.*;
12
import java.util.regex.Matcher;
13
import java.util.regex.Pattern;
14
import java.util.stream.Stream;
15

  
16
import org.apache.hadoop.conf.Configuration;
17
import org.apache.hadoop.fs.LocatedFileStatus;
18
import org.apache.hadoop.fs.Path;
19
import org.apache.hadoop.fs.FileSystem;
20
import org.apache.hadoop.fs.RemoteIterator;
21
import org.apache.log4j.Logger;
22
import org.json.simple.JSONArray;
23
import org.json.simple.JSONObject;
24
import org.json.simple.parser.JSONParser;
25

  
26
public class PiwikStatsDB {
27

  
28
    private String logPath;
29
    private String logRepoPath;
30
    private String logPortalPath;
31

  
32
    private Statement stmt = null;
33

  
34
    private final Logger log = Logger.getLogger(this.getClass());
35
    private String CounterRobotsURL;
36
    private ArrayList robotsList;
37

  
38

  
39
    public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
40
        this.logRepoPath = logRepoPath;
41
        this.logPortalPath = logPortalPath;
42
        this.createTables();
43
        this.createTmpTables();
44
    }
45

  
46
    public void foo() {
47
        Stream<String> s = Arrays.stream(new String[] {"a", "b", "c", "d"});
48

  
49
        System.out.println(s.parallel().count());
50
    }
51

  
52
    public ArrayList getRobotsList() {
53
        return robotsList;
54
    }
55

  
56
    public void setRobotsList(ArrayList robotsList) {
57
        this.robotsList = robotsList;
58
    }
59

  
60
    public String getCounterRobotsURL() {
61
        return CounterRobotsURL;
62
    }
63

  
64
    public void setCounterRobotsURL(String CounterRobotsURL) {
65
        this.CounterRobotsURL = CounterRobotsURL;
66
    }
67

  
68
    private void createTables() throws Exception {
69
        try {
70
            stmt = ConnectDB.getConnection().createStatement();
71
            String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
72
            String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
73
                    + " ON INSERT TO piwiklog "
74
                    + " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
75
                    + "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
76
                    + "FROM piwiklog "
77
                    + "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
78
            String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on piwiklog(source, id_visit, action, entity_id, \"timestamp\");";
79
            stmt.executeUpdate(sqlCreateTablePiwikLog);
80
            stmt.executeUpdate(sqlcreateRulePiwikLog);
81
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
82

  
83
            String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
84
            String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
85
                    + " ON INSERT TO process_portal_log "
86
                    + " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit,"
87
                    + "process_portal_log.\"timestamp\" "
88
                    + "FROM process_portal_log "
89
                    + "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
90
            String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on process_portal_log(source, id_visit, \"timestamp\");";
91
            stmt.executeUpdate(sqlCreateTablePortalLog);
92
            stmt.executeUpdate(sqlcreateRulePortalLog);
93
            stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
94

  
95
            stmt.close();
96
            ConnectDB.getConnection().close();
97
            log.info("Usage Tables Created");
98

  
99
        } catch (Exception e) {
100
            log.error("Failed to create tables: " + e);
101
            throw new Exception("Failed to create tables: " + e.toString(), e);
102
        }
103
    }
104

  
105
    private void createTmpTables() throws Exception {
106
        try {
107
            Statement stmt = ConnectDB.getConnection().createStatement();
108
            String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
109
            String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
110
                    + " ON INSERT TO piwiklogtmp "
111
                    + " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit,"
112
                    + "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id "
113
                    + "FROM piwiklogtmp "
114
                    + "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
115
            stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
116
            stmt.executeUpdate(sqlcreateTmpRulePiwikLog);
117

  
118
            //String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
119
            //stmt.executeUpdate(sqlCopyPublicPiwiklog);
120
            String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
121
            String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
122
                    + " ON INSERT TO process_portal_log_tmp "
123
                    + " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit,"
124
                    + "process_portal_log_tmp.\"timestamp\" "
125
                    + "FROM process_portal_log_tmp "
126
                    + "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
127
            stmt.executeUpdate(sqlCreateTmpTablePortalLog);
128
            stmt.executeUpdate(sqlcreateTmpRulePortalLog);
129

  
130
            stmt.close();
131
            log.info("Usage Tmp Tables Created");
132

  
133
        } catch (Exception e) {
134
            log.error("Failed to create tmptables: " + e);
135
            throw new Exception("Failed to create tmp tables: " + e.toString(), e);
136
            //System.exit(0);
137
        }
138
    }
139

  
140
    public void processLogs() throws Exception {
141
        try {
142
            ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
143
            this.robotsList = counterRobots.getRobotsPatterns();
144

  
145
            processRepositoryLog();
146
            log.info("repository process done");
147
            removeDoubleClicks();
148
            log.info("removing double clicks done");
149
            cleanOAI();
150
            log.info("cleaning oai done");
151

  
152
            viewsStats();
153
            downloadsStats();
154

  
155
            processPortalLog();
156
            log.info("portal process done");
157
            
158
            portalStats();
159
            log.info("portal usagestats done");
160

  
161
            updateProdTables();
162
            log.info("updateProdTables done");
163

  
164
        } catch (Exception e) {
165
            log.error("Failed to process logs: " + e);
166
            throw new Exception("Failed to process logs: " + e.toString(), e);
167
        }
168
    }

//    public void usageStats() throws Exception {
//        try {
//            viewsStats();
//            downloadsStats();
//            log.info("stat tables and views done");
//        } catch (Exception e) {
//            log.error("Failed to create usage usagestats: " + e);
//            throw new Exception("Failed to create usage usagestats: " + e.toString(), e);
//        }
//    }

    public void processRepositoryLog() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
//        File dir = new File(this.logRepoPath);
//        File[] jsonFiles = dir.listFiles();

        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
        int batch_size = 0;
        JSONParser parser = new JSONParser();
        for (String jsonFile : jsonFiles) {
            System.out.println(jsonFile);
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
            for (Object aJsonArray : jsonArray) {
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
                String idVisit = jsonObjectRow.get("idVisit").toString();
                String country = jsonObjectRow.get("country").toString();
                String referrerName = jsonObjectRow.get("referrerName").toString();
                String agent = jsonObjectRow.get("browser").toString();
                boolean botFound = false;
                Iterator it = robotsList.iterator();
                while (it.hasNext()) {
                    // Create a Pattern object
                    Pattern r = Pattern.compile(it.next().toString());
                    // Now create matcher object.
                    Matcher m = r.matcher(agent);
                    if (m.find()) {
                        //System.out.println("Found value: " + m.group(0));
                        botFound = true;
                        break;
                    }
                }
                if (botFound == false) {
                    String sourceItemType = "repItem";

                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
                    for (Object actionDetail : actionDetails) {
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;

                        if (actionDetailsObj.get("customVariables") != null) {
                            SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
                            Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
                            String url = actionDetailsObj.get("url").toString();
                            String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
                            String action = actionDetailsObj.get("type").toString();

                            prepStatem.setInt(1, idSite);
                            prepStatem.setString(2, idVisit);
                            prepStatem.setString(3, country);
                            prepStatem.setString(4, action);
                            prepStatem.setString(5, url);
                            prepStatem.setString(6, oaipmh);
                            prepStatem.setString(7, sourceItemType);
                            prepStatem.setString(8, simpleDateFormat.format(timestamp));
                            prepStatem.setString(9, referrerName);
                            prepStatem.setString(10, agent);
                            prepStatem.addBatch();
                            batch_size++;
                            if (batch_size == 10000) {
                                prepStatem.executeBatch();
                                ConnectDB.getConnection().commit();
                                batch_size = 0;
                            }
                        }
                    }
                }
            }
        }
        prepStatem.executeBatch();
        ConnectDB.getConnection().commit();
        stmt.close();
    }
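
Note that the bot filter above recompiles every robots pattern for each visit row. That is functionally correct but costly on large logs; the usual idiom is to compile the patterns once and reuse them. A self-contained sketch under that assumption (the three patterns stand in for the real COUNTER robots list):

    import java.util.List;
    import java.util.regex.Pattern;
    import java.util.stream.Collectors;

    public class RobotFilterDemo {
        public static void main(String[] args) {
            // Stand-in for the COUNTER robots list; the real one is fetched remotely.
            List<String> robotsList = List.of("bot", "crawl", "spider");
            // Compile each pattern once instead of once per visit row.
            List<Pattern> compiled = robotsList.stream()
                    .map(p -> Pattern.compile(p, Pattern.CASE_INSENSITIVE))
                    .collect(Collectors.toList());

            String agent = "Googlebot/2.1";
            boolean botFound = compiled.stream().anyMatch(p -> p.matcher(agent).find());
            System.out.println(agent + " is a bot: " + botFound); // true
        }
    }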

    public void removeDoubleClicks() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        //clean download double clicks
        String sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
        stmt.executeUpdate(sql);
        stmt.close();
        ConnectDB.getConnection().commit();

        stmt = ConnectDB.getConnection().createStatement();

        //clean view double clicks
        sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
        stmt.executeUpdate(sql);
        stmt.close();
        ConnectDB.getConnection().commit();
    }
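
The two DELETEs encode the COUNTER double-click rule: when the same visitor hits the same item twice within 30 seconds (downloads) or 10 seconds (views), only the later click survives, because the row matched as p1 always carries the earlier timestamp. A hedged helper for previewing what the download branch would remove, reusing the same join conditions in a plain SELECT (the source filter is omitted for brevity):

    import java.sql.Connection;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class DoubleClickPreview {
        // Prints the (earlier) download clicks that the 30-second COUNTER rule
        // would delete; the join conditions mirror the DELETE above.
        static void preview(Connection con) throws Exception {
            String sql = "SELECT p1.id_visit, p1.entity_id, p1.timestamp AS removed, p2.timestamp AS kept "
                    + "FROM piwiklogtmp p1, piwiklogtmp p2 "
                    + "WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit "
                    + "AND p1.entity_id=p2.entity_id AND p1.action='download' AND p1.action=p2.action "
                    + "AND p1.timestamp<p2.timestamp "
                    + "AND extract(EPOCH FROM p2.timestamp::timestamp - p1.timestamp::timestamp) < 30";
            try (Statement stmt = con.createStatement(); ResultSet rs = stmt.executeQuery(sql)) {
                while (rs.next()) {
                    System.out.println(rs.getString("id_visit") + " " + rs.getString("entity_id")
                            + ": " + rs.getString("removed") + " -> " + rs.getString("kept"));
                }
            }
        }
    }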

    public void viewsStats() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        String sql = "CREATE OR REPLACE VIEW result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        stmt.executeUpdate(sql);

        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        sql = "CREATE TABLE IF NOT EXISTS views_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        stmt.executeUpdate(sql);

//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        stmt.executeUpdate(sql);

        sql = "DROP VIEW IF EXISTS result_views_monthly_tmp;";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }
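
Both CREATE VIEW statements derive their month bucket in SQL as extract('year' ...) || '/' || LPAD(extract('month' ...), 2, '0'), e.g. 2020/02. For reference, the equivalent key in plain Java (illustrative only, not part of this file):

    import java.time.LocalDate;
    import java.time.format.DateTimeFormatter;

    public class MonthKeyDemo {
        public static void main(String[] args) {
            // Mirrors extract('year' ...) || '/' || LPAD(extract('month' ...), 2, '0')
            LocalDate d = LocalDate.parse("2020-02-07");
            String monthKey = d.format(DateTimeFormatter.ofPattern("yyyy/MM"));
            System.out.println(monthKey); // 2020/02
        }
    }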

//    public void viewsStats(String piwikid) throws Exception {
//        stmt = ConnectDB.getConnection().createStatement();
//        ConnectDB.getConnection().setAutoCommit(false);
//
//        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
//        String sql = "CREATE OR REPLACE VIEW result_views_monthly" + piwikid + " AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog" + piwikid + " where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
//        stmt.executeUpdate(sql);
//
//        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
//        sql = "CREATE TABLE IF NOT EXISTS views_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source!='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
//        stmt.executeUpdate(sql);
//
////        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
//        sql = "CREATE TABLE IF NOT EXISTS pageviews_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
//        stmt.executeUpdate(sql);
//
//        sql = "DROP VIEW IF EXISTS result_views_monthly" + piwikid + ";";
//        stmt.executeUpdate(sql);
//
//        stmt.close();
//        ConnectDB.getConnection().commit();
//        ConnectDB.getConnection().close();
//    }

    private void downloadsStats() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        //String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        String sql = "CREATE OR REPLACE VIEW result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
        stmt.executeUpdate(sql);

        //sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        sql = "CREATE TABLE IF NOT EXISTS downloads_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly_tmp p, public.datasource d, public.result_oids ro where p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
        stmt.executeUpdate(sql);

        sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }

    public void finalizeStats() throws Exception {
        stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        Calendar startCalendar = Calendar.getInstance();
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
        Calendar endCalendar = Calendar.getInstance();
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);

//        String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
        String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS full_dates_full_date ON full_dates USING btree(full_date);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS views_stats_source ON views_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS views_stats_repository_id ON views_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS views_stats_result_id ON views_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS views_stats_date ON views_stats USING btree(date);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_source ON pageviews_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_date ON pageviews_stats USING btree(date);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_source ON downloads_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_result_id ON downloads_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS downloads_stats_date ON downloads_stats USING btree(date);";
        stmt.executeUpdate(sql);

        sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_source ON usage_stats USING btree(source);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_repository_id ON usage_stats USING btree(repository_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_result_id ON usage_stats USING btree(result_id);";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX IF NOT EXISTS usage_stats_date ON usage_stats USING btree(date);";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }
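
usage_stats is assembled with a FULL OUTER JOIN so that results with only views or only downloads still get a row, and COALESCE fills the missing side with zeroes. The shape of that merge, reduced to a toy pair of tables (all names invented for illustration):

    public class MergeShapeDemo {
        // Toy version of the usage_stats FULL OUTER JOIN above: an id present in
        // only one of the two tables still appears, with 0 for the missing count.
        static final String TOY_MERGE =
                "SELECT coalesce(d.id, v.id) AS id, "
              + "coalesce(d.cnt, 0) AS downloads, coalesce(v.cnt, 0) AS views "
              + "FROM toy_downloads d FULL OUTER JOIN toy_views v ON d.id = v.id";

        public static void main(String[] args) {
            System.out.println(TOY_MERGE);
        }
    }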

    //Create repository Views statistics
    private void repositoryViewsStats() throws Exception {
        stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

//        String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
        String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
        stmt.executeUpdate(sql);

//        sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
        stmt.executeUpdate(sql);

        Calendar startCalendar = Calendar.getInstance();
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
        Calendar endCalendar = Calendar.getInstance();
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);

        //sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
//        sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
        sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
        stmt.executeUpdate(sql);

        sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }
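
repo_view_stats_monthly densifies the time series: generate_series(0, diffMonth) cross-joined with the distinct (id, source) pairs produces one candidate row per item per month since 2016-01, and the LEFT JOIN maps months with no recorded views to 0. The diffMonth bound itself is plain calendar arithmetic, reproduced standalone below:

    import java.text.SimpleDateFormat;
    import java.util.Calendar;

    public class DiffMonthDemo {
        public static void main(String[] args) throws Exception {
            Calendar start = Calendar.getInstance();
            start.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
            Calendar end = Calendar.getInstance();
            // Number of whole months between 2016-01 and the current month,
            // i.e. the upper bound handed to generate_series(0, diffMonth, 1).
            int diffYear = end.get(Calendar.YEAR) - start.get(Calendar.YEAR);
            int diffMonth = diffYear * 12 + end.get(Calendar.MONTH) - start.get(Calendar.MONTH);
            System.out.println("months since 2016-01: " + diffMonth);
        }
    }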

    //Create repository downloads statistics
    private void repositoryDownloadsStats() throws Exception {
        stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

//        String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
        String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
        stmt.executeUpdate(sql);

//        sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
        stmt.executeUpdate(sql);

        Calendar startCalendar = Calendar.getInstance();
        startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
        Calendar endCalendar = Calendar.getInstance();
        int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
        int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);

        //sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
        // sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
        sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
        stmt.executeUpdate(sql);

        sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
        stmt.executeUpdate(sql);

        sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
        stmt.executeUpdate(sql);

        stmt.close();
        ConnectDB.getConnection().commit();
        ConnectDB.getConnection().close();
    }

    // Import OPENAIRE Logs to DB
    public void processPortalLog() throws Exception {
        Statement stmt = ConnectDB.getConnection().createStatement();
        ConnectDB.getConnection().setAutoCommit(false);

        ArrayList<String> jsonFiles = listHdfsDir(this.logPortalPath);
//        File folder = new File(this.logPortalPath);
//        File[] jsonFiles = folder.listFiles();

        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO process_portal_log_tmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
        int batch_size = 0;
        JSONParser parser = new JSONParser();
        for (String jsonFile : jsonFiles) {
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));

            for (Object aJsonArray : jsonArray) {
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
                String idVisit = jsonObjectRow.get("idVisit").toString();
                String country = jsonObjectRow.get("country").toString();
                String referrerName = jsonObjectRow.get("referrerName").toString();
                String agent = jsonObjectRow.get("browser").toString();
                boolean botFound = false;
                Iterator it = robotsList.iterator();
                while (it.hasNext()) {
                    // Create a Pattern object
                    Pattern r = Pattern.compile(it.next().toString());
                    // Now create matcher object.
                    Matcher m = r.matcher(agent);
                    if (m.find()) {
                        botFound = true;
                        break;
                    }
                }
                if (botFound == false) {
                    JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
                    for (Object actionDetail : actionDetails) {
                        JSONObject actionDetailsObj = (JSONObject) actionDetail;

                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
                        Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);

                        String action = actionDetailsObj.get("type").toString();
                        String url = actionDetailsObj.get("url").toString();

                        String entityID = processPortalURL(url);
                        String sourceItemType = "";

                        if (entityID.indexOf("|") > 0) {
                            sourceItemType = entityID.substring(0, entityID.indexOf("|"));
                            entityID = entityID.substring(entityID.indexOf("|") + 1);
                        }

                        prepStatem.setInt(1, idSite);
                        prepStatem.setString(2, idVisit);
                        prepStatem.setString(3, country);
                        prepStatem.setString(4, action);
                        prepStatem.setString(5, url);
                        prepStatem.setString(6, entityID);
                        prepStatem.setString(7, sourceItemType);
                        prepStatem.setString(8, simpleDateFormat.format(timestamp));
                        prepStatem.setString(9, referrerName);
                        prepStatem.setString(10, agent);

                        prepStatem.addBatch();
                        batch_size++;
                        if (batch_size == 10000) {
                            prepStatem.executeBatch();
                            ConnectDB.getConnection().commit();
                            batch_size = 0;
                        }
                    }
                }
            }
        }
        prepStatem.executeBatch();
        ConnectDB.getConnection().commit();

        stmt.close();
        ConnectDB.getConnection().close();
    }
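
processPortalURL() (outside this excerpt) evidently returns the entity type and identifier packed into one string as type|id, which the loop above splits at the first '|'. A minimal sketch of that convention (the sample value is invented):

    public class EntityIdSplitDemo {
        public static void main(String[] args) {
            // processPortalURL() is assumed to return "sourceItemType|entityID";
            // the sample value here is invented for illustration.
            String entityID = "oaItem|od______1234::abcd";
            String sourceItemType = "";
            if (entityID.indexOf("|") > 0) {
                sourceItemType = entityID.substring(0, entityID.indexOf("|"));
                entityID = entityID.substring(entityID.indexOf("|") + 1);
            }
            System.out.println(sourceItemType + " / " + entityID); // oaItem / od______1234::abcd
        }
    }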

    public void portalStats() throws SQLException {
        Connection con = ConnectDB.getConnection();
        Statement stmt = con.createStatement();
        con.setAutoCommit(false);

        String sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'oaItem\', timestamp, referrer_name, agent FROM process_portal_log_tmp, result_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
        stmt.executeUpdate(sql);
        stmt.close();
//        con.commit();

        stmt = con.createStatement();
        sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'datasource\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.datasource_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
        stmt.executeUpdate(sql);
        stmt.close();
//        con.commit();

... This diff was truncated because it exceeds the maximum size that can be displayed.
