Project

General

Profile

1 45524 tsampikos.
package eu.dnetlib.usagestats.export;
2
3 58141 dimitris.p
import java.io.*;
4 45524 tsampikos.
import java.net.URLDecoder;
5
import java.sql.Connection;
6
import java.sql.PreparedStatement;
7 58141 dimitris.p
import java.sql.SQLException;
8 45524 tsampikos.
import java.sql.Statement;
9
import java.sql.Timestamp;
10
import java.text.SimpleDateFormat;
11 58141 dimitris.p
import java.util.*;
12
import java.util.regex.Matcher;
13
import java.util.regex.Pattern;
14 45524 tsampikos.
15
import org.apache.hadoop.conf.Configuration;
16
import org.apache.hadoop.fs.LocatedFileStatus;
17
import org.apache.hadoop.fs.Path;
18
import org.apache.hadoop.fs.FileSystem;
19
import org.apache.hadoop.fs.RemoteIterator;
20
import org.apache.log4j.Logger;
21
import org.json.simple.JSONArray;
22
import org.json.simple.JSONObject;
23
import org.json.simple.parser.JSONParser;
24
25 59067 dimitris.p
public class LaReferenciaStats {
26 45524 tsampikos.
27 58141 dimitris.p
    private String logRepoPath;
28
29 45524 tsampikos.
    private Statement stmt = null;
30
31 45950 tsampikos.
    private final Logger log = Logger.getLogger(this.getClass());
32 58141 dimitris.p
    private String CounterRobotsURL;
33
    private ArrayList robotsList;
34 45524 tsampikos.
35 59067 dimitris.p
    public LaReferenciaStats(String logRepoPath) throws Exception {
36 58141 dimitris.p
        this.logRepoPath = logRepoPath;
37
        this.createTables();
38
        this.createTmpTables();
39 45524 tsampikos.
    }
40
41 59067 dimitris.p
    /*
42 45950 tsampikos.
    private void connectDB() throws Exception {
43
        try {
44 58141 dimitris.p
            ConnectDB connectDB = new ConnectDB();
45 45950 tsampikos.
        } catch (Exception e) {
46
            log.error("Connect to db failed: " + e);
47
            throw new Exception("Failed to connect to db: " + e.toString(), e);
48 45524 tsampikos.
        }
49
    }
50 59067 dimitris.p
*/
51 45950 tsampikos.
    private void createTables() throws Exception {
52 45524 tsampikos.
        try {
53 59067 dimitris.p
            Statement stmt = ConnectDB.getConnection().createStatement();
54
            String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialog(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
55
            String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
56
                    + " ON INSERT TO lareferencialog "
57
                    + " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit,"
58
                    + "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id "
59
                    + "FROM lareferencialog "
60
                    + "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
61
            String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");";
62
            stmt.executeUpdate(sqlCreateTableLareferenciaLog);
63
            stmt.executeUpdate(sqlcreateRuleLaReferenciaLog);
64
            stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog);
65 45524 tsampikos.
66
            stmt.close();
67 59067 dimitris.p
            ConnectDB.getConnection().close();
68
            log.info("Lareferencia Tables Created");
69 45524 tsampikos.
70 45950 tsampikos.
        } catch (Exception e) {
71 45524 tsampikos.
            log.error("Failed to create tables: " + e);
72
            throw new Exception("Failed to create tables: " + e.toString(), e);
73
            //System.exit(0);
74
        }
75
    }
76
77 58141 dimitris.p
    private void createTmpTables() throws Exception {
78 59067 dimitris.p
79 45524 tsampikos.
        try {
80 59067 dimitris.p
            Statement stmt = ConnectDB.getConnection().createStatement();
81
            String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
82
            String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
83
                    + " ON INSERT TO lareferencialogtmp "
84
                    + " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit,"
85
                    + "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id "
86
                    + "FROM lareferencialogtmp "
87
                    + "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
88
            stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog);
89
            stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog);
90 58141 dimitris.p
91
            stmt.close();
92 59067 dimitris.p
            log.info("Lareferencia Tmp Tables Created");
93 58141 dimitris.p
94
        } catch (Exception e) {
95
            log.error("Failed to create tmptables: " + e);
96
            throw new Exception("Failed to create tmp tables: " + e.toString(), e);
97
            //System.exit(0);
98
        }
99
    }
100
101
    public void processLogs() throws Exception {
102
        try {
103
104 59067 dimitris.p
            processlaReferenciaLog();
105
            log.info("LaReferencia repository process done");
106 45950 tsampikos.
            removeDoubleClicks();
107 59067 dimitris.p
            log.info("LaReferencia removing double clicks done");
108 58141 dimitris.p
            viewsStats();
109 59067 dimitris.p
            log.info("LaReferencia views done");
110 58141 dimitris.p
            downloadsStats();
111 59067 dimitris.p
            log.info("LaReferencia downloads done");
112 58141 dimitris.p
            updateProdTables();
113 59067 dimitris.p
            log.info("LaReferencia update productions tables done");
114 58141 dimitris.p
115 45950 tsampikos.
        } catch (Exception e) {
116 45524 tsampikos.
            log.error("Failed to process logs: " + e);
117
            throw new Exception("Failed to process logs: " + e.toString(), e);
118
        }
119
    }
120
121 59067 dimitris.p
    public void processlaReferenciaLog() throws Exception {
122 58141 dimitris.p
123 59067 dimitris.p
        Statement stmt = ConnectDB.getConnection().createStatement();
124
        ConnectDB.getConnection().setAutoCommit(false);
125
        ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
126 45524 tsampikos.
127 59067 dimitris.p
        //File dir = new File(this.logRepoPath);
128
        //File[] jsonFiles = dir.listFiles();
129
        PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO lareferencialogtmp (matomoid, source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?,?)");
130
        int batch_size = 0;
131 45524 tsampikos.
132 58141 dimitris.p
        JSONParser parser = new JSONParser();
133 59067 dimitris.p
        for (String jsonFile : jsonFiles) {
134 58141 dimitris.p
            System.out.println(jsonFile);
135 59067 dimitris.p
            JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
136 45950 tsampikos.
            for (Object aJsonArray : jsonArray) {
137
                JSONObject jsonObjectRow = (JSONObject) aJsonArray;
138 45524 tsampikos.
                int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
139 45950 tsampikos.
                String idVisit = jsonObjectRow.get("idVisit").toString();
140
                String country = jsonObjectRow.get("country").toString();
141
                String referrerName = jsonObjectRow.get("referrerName").toString();
142
                String agent = jsonObjectRow.get("browser").toString();
143 59067 dimitris.p
                String sourceItemType = "repItem";
144 45524 tsampikos.
145 59067 dimitris.p
                JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
146
                for (Object actionDetail : actionDetails) {
147
                    JSONObject actionDetailsObj = (JSONObject) actionDetail;
148 45524 tsampikos.
149 59067 dimitris.p
                    if (actionDetailsObj.get("customVariables") != null) {
150 58141 dimitris.p
                        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
151
                        simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
152
                        Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
153 59067 dimitris.p
                        String url = actionDetailsObj.get("url").toString();
154
                        String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
155
                        String opendoar = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("2")).get("customVariablePageValue2").toString();
156 58141 dimitris.p
                        String action = actionDetailsObj.get("type").toString();
157
                        prepStatem.setInt(1, idSite);
158 59067 dimitris.p
                        prepStatem.setString(2, "opendoar____::" + opendoar);
159
                        prepStatem.setString(3, idVisit);
160
                        prepStatem.setString(4, country);
161
                        prepStatem.setString(5, action);
162
                        prepStatem.setString(6, url);
163
                        prepStatem.setString(7, oaipmh);
164
                        prepStatem.setString(8, sourceItemType);
165
                        prepStatem.setString(9, simpleDateFormat.format(timestamp));
166
                        prepStatem.setString(10, referrerName);
167
                        prepStatem.setString(11, agent);
168
                        //prepStatem.setString(11, );
169 58141 dimitris.p
                        prepStatem.addBatch();
170
                        batch_size++;
171
                        if (batch_size == 10000) {
172
                            prepStatem.executeBatch();
173 59067 dimitris.p
                            ConnectDB.getConnection().commit();
174 58141 dimitris.p
                            batch_size = 0;
175
                        }
176 45524 tsampikos.
                    }
177
                }
178
            }
179
        }
180 59067 dimitris.p
        try {
181
            prepStatem.executeBatch();
182
            ConnectDB.getConnection().commit();
183
            stmt.close();
184
        } catch (Exception e) {
185 45524 tsampikos.
186 59067 dimitris.p
            if (e instanceof java.sql.SQLException) {
187
                java.sql.SQLException ne = ((java.sql.SQLException) e).getNextException();
188
                System.out.println(ne.getMessage());
189
            }
190 58141 dimitris.p
        }
191 45524 tsampikos.
192
    }
193
194 59067 dimitris.p
    public void removeDoubleClicks() throws Exception {
195 58141 dimitris.p
196 59067 dimitris.p
        Statement stmt = ConnectDB.getConnection().createStatement();
197
        ConnectDB.getConnection().setAutoCommit(false);
198 45524 tsampikos.
199 59067 dimitris.p
        //clean download double clicks
200
        String sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
201 45524 tsampikos.
        stmt.executeUpdate(sql);
202 45950 tsampikos.
        stmt.close();
203 59067 dimitris.p
        ConnectDB.getConnection().commit();
204 45950 tsampikos.
205 59067 dimitris.p
        stmt = ConnectDB.getConnection().createStatement();
206 45950 tsampikos.
207 59067 dimitris.p
        //clean view double clicks
208
        sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
209 45524 tsampikos.
        stmt.executeUpdate(sql);
210 45950 tsampikos.
        stmt.close();
211 59067 dimitris.p
        ConnectDB.getConnection().commit();
212
        //conn.close();
213 45524 tsampikos.
    }
214
215 59067 dimitris.p
    public void viewsStats() throws Exception {
216 58141 dimitris.p
217 59067 dimitris.p
        Statement stmt = ConnectDB.getConnection().createStatement();
218
        ConnectDB.getConnection().setAutoCommit(false);
219 58141 dimitris.p
220 59067 dimitris.p
        //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
221
        String sql = "CREATE OR REPLACE VIEW la_result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
222 58141 dimitris.p
        stmt.executeUpdate(sql);
223
224 59067 dimitris.p
        // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
225
        sql = "CREATE TABLE IF NOT EXISTS la_views_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM la_result_views_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
226 58141 dimitris.p
        stmt.executeUpdate(sql);
227
228
        stmt.close();
229 59067 dimitris.p
        ConnectDB.getConnection().commit();
230
        ConnectDB.getConnection().close();
231 58141 dimitris.p
    }
232
233 59067 dimitris.p
    private void downloadsStats() throws Exception {
234 58141 dimitris.p
235 59067 dimitris.p
        Statement stmt = ConnectDB.getConnection().createStatement();
236
        ConnectDB.getConnection().setAutoCommit(false);
237 58141 dimitris.p
238 59067 dimitris.p
        //String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
239
        String sql = "CREATE OR REPLACE VIEW la_result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
240 58141 dimitris.p
        stmt.executeUpdate(sql);
241
242 59067 dimitris.p
        //sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
243
//        sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
244
        sql = "CREATE TABLE IF NOT EXISTS la_downloads_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM la_result_downloads_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
245 58141 dimitris.p
        stmt.executeUpdate(sql);
246
247
        stmt.close();
248 59067 dimitris.p
        ConnectDB.getConnection().commit();
249
        ConnectDB.getConnection().close();
250 58141 dimitris.p
    }
251
252
253 59067 dimitris.p
    private void updateProdTables() throws SQLException, Exception {
254 58141 dimitris.p
255 59067 dimitris.p
        Statement stmt = ConnectDB.getConnection().createStatement();
256
        ConnectDB.getConnection().setAutoCommit(false);
257
        String sql = "insert into lareferencialog select * from lareferencialogtmp;";
258 58141 dimitris.p
        stmt.executeUpdate(sql);
259
260 59067 dimitris.p
        sql = "insert into views_stats select * from la_views_stats_tmp;";
261 58141 dimitris.p
        stmt.executeUpdate(sql);
262
263 59067 dimitris.p
        sql = "insert into public.views_stats select * from la_views_stats_tmp;";
264 58141 dimitris.p
        stmt.executeUpdate(sql);
265
266 59067 dimitris.p
        sql = "insert into downloads_stats select * from la_downloads_stats_tmp;";
267 58141 dimitris.p
        stmt.executeUpdate(sql);
268
269 59067 dimitris.p
        sql = "insert into public.downloads_stats select * from la_downloads_stats_tmp;";
270 58141 dimitris.p
        stmt.executeUpdate(sql);
271
272
        stmt.close();
273 59067 dimitris.p
        ConnectDB.getConnection().commit();
274
        ConnectDB.getConnection().close();
275 58141 dimitris.p
276
    }
277
278 45950 tsampikos.
    private ArrayList<String> listHdfsDir(String dir) throws Exception {
279 45524 tsampikos.
        FileSystem hdfs = FileSystem.get(new Configuration());
280
        RemoteIterator<LocatedFileStatus> Files;
281 45950 tsampikos.
        ArrayList<String> fileNames = new ArrayList<>();
282 45524 tsampikos.
283
        try {
284
            Path exportPath = new Path(hdfs.getUri() + dir);
285
            Files = hdfs.listFiles(exportPath, false);
286 45950 tsampikos.
            while (Files.hasNext()) {
287
                String fileName = Files.next().getPath().toString();
288 58141 dimitris.p
                //log.info("Found hdfs file " + fileName);
289 45950 tsampikos.
                fileNames.add(fileName);
290
            }
291 58141 dimitris.p
            //hdfs.close();
292 45524 tsampikos.
        } catch (Exception e) {
293 59067 dimitris.p
            log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath));
294
            throw new Exception("HDFS file path with exported data does not exist :   " + logRepoPath, e);
295 45524 tsampikos.
        }
296
297
        return fileNames;
298
    }
299
300 45950 tsampikos.
    private String readHDFSFile(String filename) throws Exception {
301 47073 tsampikos.
        String result;
302 45524 tsampikos.
        try {
303
304
            FileSystem fs = FileSystem.get(new Configuration());
305 58141 dimitris.p
            //log.info("reading file : " + filename);
306
307 45524 tsampikos.
            BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
308 58141 dimitris.p
309 45524 tsampikos.
            StringBuilder sb = new StringBuilder();
310
            String line = br.readLine();
311
312
            while (line != null) {
313 45950 tsampikos.
                if (!line.equals("[]")) {
314 45524 tsampikos.
                    sb.append(line);
315
                }
316
                //sb.append(line);
317
                line = br.readLine();
318
            }
319
            result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
320 45950 tsampikos.
            if (result.equals("")) {
321
                result = "[]";
322 45524 tsampikos.
            }
323
324
            //fs.close();
325
        } catch (Exception e) {
326
            log.error(e);
327
            throw new Exception(e);
328
        }
329
330
        return result;
331
    }
332 58141 dimitris.p
333 45524 tsampikos.
}