1
|
package eu.dnetlib.usagestats.export;
|
2
|
|
3
|
import java.io.*;
|
4
|
import java.net.URLDecoder;
|
5
|
import java.sql.Connection;
|
6
|
import java.sql.PreparedStatement;
|
7
|
import java.sql.SQLException;
|
8
|
import java.sql.Statement;
|
9
|
import java.sql.Timestamp;
|
10
|
import java.text.SimpleDateFormat;
|
11
|
import java.util.*;
|
12
|
import java.util.regex.Matcher;
|
13
|
import java.util.regex.Pattern;
|
14
|
import java.util.stream.Stream;
|
15
|
|
16
|
import org.apache.hadoop.conf.Configuration;
|
17
|
import org.apache.hadoop.fs.LocatedFileStatus;
|
18
|
import org.apache.hadoop.fs.Path;
|
19
|
import org.apache.hadoop.fs.FileSystem;
|
20
|
import org.apache.hadoop.fs.RemoteIterator;
|
21
|
import org.apache.log4j.Logger;
|
22
|
import org.json.simple.JSONArray;
|
23
|
import org.json.simple.JSONObject;
|
24
|
import org.json.simple.parser.JSONParser;
|
25
|
|
26
|
public class PiwikStatsDB {
|
27
|
|
28
|
private String logPath;
|
29
|
private String logRepoPath;
|
30
|
private String logPortalPath;
|
31
|
|
32
|
private Statement stmt = null;
|
33
|
|
34
|
private final Logger log = Logger.getLogger(this.getClass());
|
35
|
private String CounterRobotsURL;
|
36
|
private ArrayList robotsList;
|
37
|
|
38
|
|
39
|
/**
 * Builds the exporter for the given HDFS log locations and ensures that the
 * permanent and temporary usage-stats tables exist before any processing runs.
 *
 * @param logRepoPath   HDFS directory holding the repository Piwik logs
 * @param logPortalPath HDFS directory holding the portal Piwik logs
 * @throws Exception if the table setup against the database fails
 */
public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception {
    this.logRepoPath = logRepoPath;
    this.logPortalPath = logPortalPath;
    createTables();
    createTmpTables();
}
|
45
|
|
46
|
public void foo() {
|
47
|
Stream<String> s = Arrays.stream(new String[] {"a", "b", "c", "d"});
|
48
|
|
49
|
System.out.println(s.parallel().count());
|
50
|
}
|
51
|
|
52
|
public ArrayList getRobotsList() {
|
53
|
return robotsList;
|
54
|
}
|
55
|
|
56
|
public void setRobotsList(ArrayList robotsList) {
|
57
|
this.robotsList = robotsList;
|
58
|
}
|
59
|
|
60
|
public String getCounterRobotsURL() {
|
61
|
return CounterRobotsURL;
|
62
|
}
|
63
|
|
64
|
public void setCounterRobotsURL(String CounterRobotsURL) {
|
65
|
this.CounterRobotsURL = CounterRobotsURL;
|
66
|
}
|
67
|
|
68
|
private void createTables() throws Exception {
|
69
|
try {
|
70
|
stmt = ConnectDB.getConnection().createStatement();
|
71
|
String sqlCreateTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklog(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
|
72
|
String sqlcreateRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
73
|
+ " ON INSERT TO piwiklog "
|
74
|
+ " WHERE (EXISTS ( SELECT piwiklog.source, piwiklog.id_visit,"
|
75
|
+ "piwiklog.action, piwiklog.\"timestamp\", piwiklog.entity_id "
|
76
|
+ "FROM piwiklog "
|
77
|
+ "WHERE piwiklog.source = new.source AND piwiklog.id_visit = new.id_visit AND piwiklog.action = new.action AND piwiklog.entity_id = new.entity_id AND piwiklog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
78
|
String sqlCreateRuleIndexPiwikLog = "create index if not exists piwiklog_rule on piwiklog(source, id_visit, action, entity_id, \"timestamp\");";
|
79
|
stmt.executeUpdate(sqlCreateTablePiwikLog);
|
80
|
stmt.executeUpdate(sqlcreateRulePiwikLog);
|
81
|
stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
|
82
|
|
83
|
String sqlCreateTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
|
84
|
String sqlcreateRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
|
85
|
+ " ON INSERT TO process_portal_log "
|
86
|
+ " WHERE (EXISTS ( SELECT process_portal_log.source, process_portal_log.id_visit,"
|
87
|
+ "process_portal_log.\"timestamp\" "
|
88
|
+ "FROM process_portal_log "
|
89
|
+ "WHERE process_portal_log.source = new.source AND process_portal_log.id_visit = new.id_visit AND process_portal_log.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
|
90
|
String sqlCreateRuleIndexPortalLog = "create index if not exists process_portal_log_rule on process_portal_log(source, id_visit, \"timestamp\");";
|
91
|
stmt.executeUpdate(sqlCreateTablePortalLog);
|
92
|
stmt.executeUpdate(sqlcreateRulePortalLog);
|
93
|
stmt.executeUpdate(sqlCreateRuleIndexPiwikLog);
|
94
|
|
95
|
stmt.close();
|
96
|
ConnectDB.getConnection().close();
|
97
|
log.info("Usage Tables Created");
|
98
|
|
99
|
} catch (Exception e) {
|
100
|
log.error("Failed to create tables: " + e);
|
101
|
throw new Exception("Failed to create tables: " + e.toString(), e);
|
102
|
}
|
103
|
}
|
104
|
|
105
|
/**
 * Creates the temporary staging tables ({@code piwiklogtmp} and
 * {@code process_portal_log_tmp}) with the same schema and duplicate-ignoring
 * rewrite rules as their permanent counterparts. New log rows are loaded here
 * first and later merged into the permanent tables.
 *
 * @throws Exception wrapping any SQL failure (original cause preserved)
 */
private void createTmpTables() throws Exception {
    try {
        Statement stmt = ConnectDB.getConnection().createStatement();
        // Staging copy of piwiklog; same composite PK as the permanent table.
        String sqlCreateTmpTablePiwikLog = "CREATE TABLE IF NOT EXISTS piwiklogtmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));";
        // Turn duplicate inserts into no-ops so replays are idempotent.
        String sqlcreateTmpRulePiwikLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                + " ON INSERT TO piwiklogtmp "
                + " WHERE (EXISTS ( SELECT piwiklogtmp.source, piwiklogtmp.id_visit,"
                + "piwiklogtmp.action, piwiklogtmp.\"timestamp\", piwiklogtmp.entity_id "
                + "FROM piwiklogtmp "
                + "WHERE piwiklogtmp.source = new.source AND piwiklogtmp.id_visit = new.id_visit AND piwiklogtmp.action = new.action AND piwiklogtmp.entity_id = new.entity_id AND piwiklogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
        stmt.executeUpdate(sqlCreateTmpTablePiwikLog);
        stmt.executeUpdate(sqlcreateTmpRulePiwikLog);

        //String sqlCopyPublicPiwiklog="insert into piwiklog select * from public.piwiklog;";
        //stmt.executeUpdate(sqlCopyPublicPiwiklog);

        // Staging copy of process_portal_log with the analogous dedup rule.
        String sqlCreateTmpTablePortalLog = "CREATE TABLE IF NOT EXISTS process_portal_log_tmp(source INTEGER, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, timestamp));";
        String sqlcreateTmpRulePortalLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS "
                + " ON INSERT TO process_portal_log_tmp "
                + " WHERE (EXISTS ( SELECT process_portal_log_tmp.source, process_portal_log_tmp.id_visit,"
                + "process_portal_log_tmp.\"timestamp\" "
                + "FROM process_portal_log_tmp "
                + "WHERE process_portal_log_tmp.source = new.source AND process_portal_log_tmp.id_visit = new.id_visit AND process_portal_log_tmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;";
        stmt.executeUpdate(sqlCreateTmpTablePortalLog);
        stmt.executeUpdate(sqlcreateTmpRulePortalLog);

        stmt.close();
        log.info("Usage Tmp Tables Created");

    } catch (Exception e) {
        log.error("Failed to create tmptables: " + e);
        throw new Exception("Failed to create tmp tables: " + e.toString(), e);
        //System.exit(0);
    }
}
|
139
|
|
140
|
public void processLogs() throws Exception {
|
141
|
try {
|
142
|
ReadCounterRobotsList counterRobots = new ReadCounterRobotsList(this.getCounterRobotsURL());
|
143
|
this.robotsList = counterRobots.getRobotsPatterns();
|
144
|
|
145
|
processRepositoryLog();
|
146
|
log.info("repository process done");
|
147
|
removeDoubleClicks();
|
148
|
log.info("removing double clicks done");
|
149
|
cleanOAI();
|
150
|
log.info("cleaning oai done");
|
151
|
|
152
|
viewsStats();
|
153
|
downloadsStats();
|
154
|
|
155
|
processPortalLog();
|
156
|
log.info("portal process done");
|
157
|
|
158
|
portalStats();
|
159
|
log.info("portal usagestats done");
|
160
|
|
161
|
updateProdTables();
|
162
|
log.info("updateProdTables done");
|
163
|
|
164
|
} catch (Exception e) {
|
165
|
log.error("Failed to process logs: " + e);
|
166
|
throw new Exception("Failed to process logs: " + e.toString(), e);
|
167
|
}
|
168
|
}
|
169
|
|
170
|
// public void usageStats() throws Exception {
|
171
|
// try {
|
172
|
// viewsStats();
|
173
|
// downloadsStats();
|
174
|
// log.info("stat tables and views done");
|
175
|
// } catch (Exception e) {
|
176
|
// log.error("Failed to create usage usagestats: " + e);
|
177
|
// throw new Exception("Failed to create usage usagestats: " + e.toString(), e);
|
178
|
// }
|
179
|
// }
|
180
|
|
181
|
public void processRepositoryLog() throws Exception {
|
182
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
183
|
ConnectDB.getConnection().setAutoCommit(false);
|
184
|
|
185
|
ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath);
|
186
|
// File dir = new File(this.logRepoPath);
|
187
|
// File[] jsonFiles = dir.listFiles();
|
188
|
|
189
|
PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO piwiklogtmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
|
190
|
int batch_size = 0;
|
191
|
JSONParser parser = new JSONParser();
|
192
|
for (String jsonFile : jsonFiles) {
|
193
|
System.out.println(jsonFile);
|
194
|
JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
|
195
|
for (Object aJsonArray : jsonArray) {
|
196
|
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
|
197
|
int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
|
198
|
String idVisit = jsonObjectRow.get("idVisit").toString();
|
199
|
String country = jsonObjectRow.get("country").toString();
|
200
|
String referrerName = jsonObjectRow.get("referrerName").toString();
|
201
|
String agent = jsonObjectRow.get("browser").toString();
|
202
|
boolean botFound = false;
|
203
|
Iterator it = robotsList.iterator();
|
204
|
while (it.hasNext()) {
|
205
|
// Create a Pattern object
|
206
|
Pattern r = Pattern.compile(it.next().toString());
|
207
|
// Now create matcher object.
|
208
|
Matcher m = r.matcher(agent);
|
209
|
if (m.find()) {
|
210
|
//System.out.println("Found value: " + m.group(0));
|
211
|
botFound = true;
|
212
|
break;
|
213
|
}
|
214
|
}
|
215
|
if (botFound == false) {
|
216
|
String sourceItemType = "repItem";
|
217
|
|
218
|
JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
|
219
|
for (Object actionDetail : actionDetails) {
|
220
|
JSONObject actionDetailsObj = (JSONObject) actionDetail;
|
221
|
|
222
|
if (actionDetailsObj.get("customVariables") != null) {
|
223
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
224
|
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
|
225
|
Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
|
226
|
String url = actionDetailsObj.get("url").toString();
|
227
|
String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString();
|
228
|
String action = actionDetailsObj.get("type").toString();
|
229
|
|
230
|
prepStatem.setInt(1, idSite);
|
231
|
prepStatem.setString(2, idVisit);
|
232
|
prepStatem.setString(3, country);
|
233
|
prepStatem.setString(4, action);
|
234
|
prepStatem.setString(5, url);
|
235
|
prepStatem.setString(6, oaipmh);
|
236
|
prepStatem.setString(7, sourceItemType);
|
237
|
prepStatem.setString(8, simpleDateFormat.format(timestamp));
|
238
|
prepStatem.setString(9, referrerName);
|
239
|
prepStatem.setString(10, agent);
|
240
|
prepStatem.addBatch();
|
241
|
batch_size++;
|
242
|
if (batch_size == 10000) {
|
243
|
prepStatem.executeBatch();
|
244
|
ConnectDB.getConnection().commit();
|
245
|
batch_size = 0;
|
246
|
}
|
247
|
}
|
248
|
}
|
249
|
}
|
250
|
}
|
251
|
}
|
252
|
prepStatem.executeBatch();
|
253
|
ConnectDB.getConnection().commit();
|
254
|
stmt.close();
|
255
|
}
|
256
|
|
257
|
/**
 * Removes "double-click" rows from {@code piwiklogtmp}: repeated identical
 * actions by the same visit on the same entity within a short time window.
 * Downloads repeated within 30 seconds and views ('action') repeated within
 * 10 seconds keep only the latest occurrence (the earlier row of each close
 * pair is deleted). Rows with source '5' are exempt.
 *
 * @throws Exception on any SQL failure
 */
public void removeDoubleClicks() throws Exception {
    Statement stmt = ConnectDB.getConnection().createStatement();
    ConnectDB.getConnection().setAutoCommit(false);

    //clean download double clicks: delete the earlier row of any pair of
    //identical downloads by the same visit less than 30 seconds apart
    String sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
    stmt.executeUpdate(sql);
    stmt.close();
    ConnectDB.getConnection().commit();

    stmt = ConnectDB.getConnection().createStatement();

    //clean view double clicks: same as above but for 'action' rows with a
    //10-second window
    sql = "DELETE FROM piwiklogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from piwiklogtmp p1, piwiklogtmp p2 WHERE p1.source!='5' AND p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);";
    stmt.executeUpdate(sql);
    stmt.close();
    ConnectDB.getConnection().commit();
}
|
275
|
|
276
|
/**
 * Builds the view statistics from the staging log: creates the
 * result_views_monthly_tmp view (monthly view counts per result, including
 * an OpenAIRE-referrer count), materializes views_stats_tmp and
 * pageviews_stats_tmp from it, and ensures the permanent views_stats /
 * pageviews_stats tables exist with the same structure.
 *
 * NOTE(review): the magic source ids '5' and '109' appear to select/exclude
 * a particular Piwik site (presumably the OpenAIRE portal) — confirm against
 * the datasource.piwik_id values in production.
 *
 * @throws Exception on any SQL failure
 */
public void viewsStats() throws Exception {
    Statement stmt = ConnectDB.getConnection().createStatement();
    ConnectDB.getConnection().setAutoCommit(false);

    // Monthly view counts per result id, with OpenAIRE-referred views tallied
    // separately. Only 'action' rows of repository/OA items are counted.
    String sql = "CREATE OR REPLACE VIEW result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
    stmt.executeUpdate(sql);

    // Repository views (everything except source '5'), joined to the public
    // datasource/result tables to resolve ids.
    sql = "CREATE TABLE IF NOT EXISTS views_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
    stmt.executeUpdate(sql);

    // Make sure the permanent table exists with an identical structure.
    sql = "CREATE TABLE IF NOT EXISTS views_stats (like views_stats_tmp including all)";
    stmt.executeUpdate(sql);

    // Portal page views: only source '109' (older commented code used '5' —
    // the id seems to have changed; verify which Piwik site id is current).
    sql = "CREATE TABLE IF NOT EXISTS pageviews_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly_tmp p, public.datasource d, public.result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
    stmt.executeUpdate(sql);

    sql = "CREATE TABLE IF NOT EXISTS pageviews_stats (like pageviews_stats_tmp including all)";
    stmt.executeUpdate(sql);

    stmt.close();
    ConnectDB.getConnection().commit();
    ConnectDB.getConnection().close();
}
|
302
|
|
303
|
// public void viewsStats(String piwikid) throws Exception {
|
304
|
// stmt = ConnectDB.getConnection().createStatement();
|
305
|
// ConnectDB.getConnection().setAutoCommit(false);
|
306
|
//
|
307
|
// //String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
308
|
// String sql = "CREATE OR REPLACE VIEW result_views_monthly" + piwikid + " AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog" + piwikid + " where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
309
|
// stmt.executeUpdate(sql);
|
310
|
//
|
311
|
// // sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
312
|
// sql = "CREATE TABLE IF NOT EXISTS views_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source!='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
313
|
// stmt.executeUpdate(sql);
|
314
|
//
|
315
|
//// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count INTO pageviews_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
316
|
// sql = "CREATE TABLE IF NOT EXISTS pageviews_stats" + piwikid + " AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count FROM result_views_monthly" + piwikid + " p, datasource d, result_oids ro where p.source='109' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
|
317
|
// stmt.executeUpdate(sql);
|
318
|
//
|
319
|
// sql = "DROP VIEW IF EXISTS result_views_monthly" + piwikid + ";";
|
320
|
// stmt.executeUpdate(sql);
|
321
|
//
|
322
|
// stmt.close();
|
323
|
// ConnectDB.getConnection().commit();
|
324
|
// ConnectDB.getConnection().close();
|
325
|
// }
|
326
|
|
327
|
/**
 * Builds the download statistics from the staging log: creates the
 * result_downloads_monthly_tmp view (monthly download counts per result,
 * with OpenAIRE-referred downloads tallied separately), materializes
 * downloads_stats_tmp from it, ensures the permanent downloads_stats table
 * exists with the same structure, and drops the intermediate view.
 *
 * @throws Exception on any SQL failure
 */
private void downloadsStats() throws Exception {
    Statement stmt = ConnectDB.getConnection().createStatement();
    ConnectDB.getConnection().setAutoCommit(false);

    // Monthly download counts per result id; only 'download' rows of
    // repository/OA items are counted.
    String sql = "CREATE OR REPLACE VIEW result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
    stmt.executeUpdate(sql);

    // Resolve repository/result ids via the public datasource/result tables.
    // Unlike viewsStats, no source filter is applied here.
    sql = "CREATE TABLE IF NOT EXISTS downloads_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly_tmp p, public.datasource d, public.result_oids ro where p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;";
    stmt.executeUpdate(sql);

    // Make sure the permanent table exists with an identical structure.
    sql = "CREATE TABLE IF NOT EXISTS downloads_stats (like downloads_stats_tmp including all)";
    stmt.executeUpdate(sql);

    // The monthly view is only needed to build the tmp table.
    sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;";
    stmt.executeUpdate(sql);

    stmt.close();
    ConnectDB.getConnection().commit();
    ConnectDB.getConnection().close();
}
|
350
|
|
351
|
/**
 * Finalizes the statistics: builds the full_dates helper table (one row per
 * month from 2016-01 to now), creates supporting indexes on the stats tables,
 * assembles the combined usage_stats table (full outer join of downloads and
 * views), merges the staged download stats into it, and drops the temporary
 * staging tables.
 *
 * NOTE(review): the INSERT below joins downloads_stats_tmp against
 * views_stats (not views_stats_tmp), and views_stats_tmp is neither merged
 * nor dropped here — confirm whether that is intentional.
 *
 * @throws Exception on any SQL or date-parsing failure
 */
public void finalizeStats() throws Exception {
    stmt = ConnectDB.getConnection().createStatement();
    ConnectDB.getConnection().setAutoCommit(false);

    // Number of whole months between the fixed epoch 2016-01-01 and today;
    // drives how many rows full_dates gets.
    Calendar startCalendar = Calendar.getInstance();
    startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
    Calendar endCalendar = Calendar.getInstance();
    int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
    int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);

    // One 'YYYY/MM' row per month since 2016-01.
    String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS full_dates_full_date ON full_dates USING btree(full_date);";
    stmt.executeUpdate(sql);

    // Indexes on views_stats.
    sql = "CREATE INDEX IF NOT EXISTS views_stats_source ON views_stats USING btree(source);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS views_stats_repository_id ON views_stats USING btree(repository_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS views_stats_result_id ON views_stats USING btree(result_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS views_stats_date ON views_stats USING btree(date);";
    stmt.executeUpdate(sql);

    // Indexes on pageviews_stats.
    sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_source ON pageviews_stats USING btree(source);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_repository_id ON pageviews_stats USING btree(repository_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_result_id ON pageviews_stats USING btree(result_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS pageviews_stats_date ON pageviews_stats USING btree(date);";
    stmt.executeUpdate(sql);

    // Indexes on downloads_stats.
    sql = "CREATE INDEX IF NOT EXISTS downloads_stats_source ON downloads_stats USING btree(source);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS downloads_stats_repository_id ON downloads_stats USING btree(repository_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS downloads_stats_result_id ON downloads_stats USING btree(result_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS downloads_stats_date ON downloads_stats USING btree(date);";
    stmt.executeUpdate(sql);

    // Combined table: one row per (source, repository, result, month) with
    // download and view counts side by side (0 where one side is missing).
    sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
    stmt.executeUpdate(sql);

    // Merge the staged downloads into usage_stats (joined against the
    // permanent views_stats — see NOTE(review) in the javadoc).
    sql = "INSERT INTO usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats_tmp AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
    stmt.executeUpdate(sql);

    // Indexes on usage_stats.
    sql = "CREATE INDEX IF NOT EXISTS usage_stats_source ON usage_stats USING btree(source);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS usage_stats_repository_id ON usage_stats USING btree(repository_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS usage_stats_result_id ON usage_stats USING btree(result_id);";
    stmt.executeUpdate(sql);

    sql = "CREATE INDEX IF NOT EXISTS usage_stats_date ON usage_stats USING btree(date);";
    stmt.executeUpdate(sql);

    // Drop the temporary staging objects now that the merge is done.
    sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
    stmt.executeUpdate(sql);

    sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
    stmt.executeUpdate(sql);

    sql = "DROP VIEW IF EXISTS result_views_monthly_tmp";
    stmt.executeUpdate(sql);

    sql = "DROP TABLE IF EXISTS piwiklogtmp;";
    stmt.executeUpdate(sql);

    sql = "DROP TABLE IF EXISTS sushilogtmp;";
    stmt.executeUpdate(sql);

    stmt.close();
    ConnectDB.getConnection().commit();
    ConnectDB.getConnection().close();
}
|
442
|
|
443
|
//Create repository Views statistics
|
444
|
private void repositoryViewsStats() throws Exception {
|
445
|
stmt = ConnectDB.getConnection().createStatement();
|
446
|
ConnectDB.getConnection().setAutoCommit(false);
|
447
|
|
448
|
// String sql = "SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source INTO repo_view_stats FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
449
|
String sql = "CREATE TABLE IF NOT EXISTS repo_view_stats AS SELECT entity_id AS id , COUNT(entity_id) AS number_of_views, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'action\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
450
|
stmt.executeUpdate(sql);
|
451
|
|
452
|
sql = "CREATE INDEX repo_view_stats_id ON repo_view_stats USING btree (id)";
|
453
|
stmt.executeUpdate(sql);
|
454
|
|
455
|
sql = "CREATE INDEX repo_view_stats_date ON repo_view_stats USING btree(date)";
|
456
|
stmt.executeUpdate(sql);
|
457
|
|
458
|
// sql = "SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_view_stats_monthly_clean FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
|
459
|
sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly_clean AS SELECT roid.id, sum(number_of_views), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_view_stats rvs, result_oids roid where rvs.id=roid.orid group by roid.id, month, source;";
|
460
|
stmt.executeUpdate(sql);
|
461
|
|
462
|
sql = "CREATE INDEX repo_view_stats_monthly_clean_id ON repo_view_stats_monthly_clean USING btree (id)";
|
463
|
stmt.executeUpdate(sql);
|
464
|
|
465
|
sql = "CREATE INDEX repo_view_stats_monthly_clean_month ON repo_view_stats_monthly_clean USING btree(month)";
|
466
|
stmt.executeUpdate(sql);
|
467
|
|
468
|
sql = "CREATE INDEX repo_view_stats_monthly_clean_source ON repo_view_stats_monthly_clean USING btree(source)";
|
469
|
stmt.executeUpdate(sql);
|
470
|
|
471
|
Calendar startCalendar = Calendar.getInstance();
|
472
|
startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
|
473
|
Calendar endCalendar = Calendar.getInstance();
|
474
|
int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
|
475
|
int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
|
476
|
|
477
|
//sql="CREATE OR REPLACE view repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
|
478
|
// sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_view_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
|
479
|
sql = "CREATE TABLE IF NOT EXISTS repo_view_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_view_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_view_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
|
480
|
stmt.executeUpdate(sql);
|
481
|
|
482
|
sql = "CREATE INDEX repo_view_stats_monthly_id ON repo_view_stats_monthly USING btree (id)";
|
483
|
stmt.executeUpdate(sql);
|
484
|
|
485
|
sql = "CREATE INDEX repo_view_stats_monthly_month ON repo_view_stats_monthly USING btree(month)";
|
486
|
stmt.executeUpdate(sql);
|
487
|
|
488
|
sql = "CREATE INDEX repo_view_stats_monthly_source ON repo_view_stats_monthly USING btree(source)";
|
489
|
stmt.executeUpdate(sql);
|
490
|
|
491
|
sql = "CREATE OR REPLACE view repo_view_stats_monthly_sushi AS SELECT id, sum(number_of_views), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_view_stats group by id, month, source;";
|
492
|
stmt.executeUpdate(sql);
|
493
|
|
494
|
stmt.close();
|
495
|
ConnectDB.getConnection().commit();
|
496
|
ConnectDB.getConnection().close();
|
497
|
}
|
498
|
|
499
|
//Create repository downloads statistics
|
500
|
private void repositoryDownloadsStats() throws Exception {
|
501
|
stmt = ConnectDB.getConnection().createStatement();
|
502
|
ConnectDB.getConnection().setAutoCommit(false);
|
503
|
|
504
|
// String sql = "SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source INTO repo_download_stats FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
505
|
String sql = "CREATE TABLE IF NOT EXISTS repo_download_stats AS SELECT entity_id AS id, COUNT(entity_id) AS number_of_downloads, timestamp::date AS date, source FROM piwiklog WHERE source!='5' AND action=\'download\' AND source_item_type=\'repItem\' GROUP BY entity_id, date, source ORDER BY entity_id, date ASC, COUNT(entity_id) DESC;";
|
506
|
stmt.executeUpdate(sql);
|
507
|
|
508
|
sql = "CREATE INDEX repo_download_stats_id ON repo_download_stats USING btree (id)";
|
509
|
stmt.executeUpdate(sql);
|
510
|
|
511
|
sql = "CREATE INDEX repo_download_stats_date ON repo_download_stats USING btree(date)";
|
512
|
stmt.executeUpdate(sql);
|
513
|
|
514
|
// sql = "SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source INTO repo_download_stats_monthly_clean FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
|
515
|
sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly_clean AS SELECT roid.id, sum(number_of_downloads), extract('year' from date) ||'/'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') AS month, source FROM repo_download_stats rvs, result_oids roid WHERE rvs.id=roid.orid GROUP BY roid.id, month, source;";
|
516
|
stmt.executeUpdate(sql);
|
517
|
|
518
|
sql = "CREATE INDEX repo_download_stats_monthly_clean_id ON repo_download_stats_monthly_clean USING btree (id)";
|
519
|
stmt.executeUpdate(sql);
|
520
|
|
521
|
sql = "CREATE INDEX repo_download_stats_monthly_clean_month ON repo_download_stats_monthly_clean USING btree(month)";
|
522
|
stmt.executeUpdate(sql);
|
523
|
|
524
|
sql = "CREATE INDEX repo_download_stats_monthly_clean_source ON repo_download_stats_monthly_clean USING btree(source)";
|
525
|
stmt.executeUpdate(sql);
|
526
|
|
527
|
Calendar startCalendar = Calendar.getInstance();
|
528
|
startCalendar.setTime(new SimpleDateFormat("yyyy-MM-dd").parse("2016-01-01"));
|
529
|
Calendar endCalendar = Calendar.getInstance();
|
530
|
int diffYear = endCalendar.get(Calendar.YEAR) - startCalendar.get(Calendar.YEAR);
|
531
|
int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH);
|
532
|
|
533
|
//sql="CREATE OR REPLACE view repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth +", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
|
534
|
// sql = "select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source INTO repo_download_stats_monthly from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
|
535
|
sql = "CREATE TABLE IF NOT EXISTS repo_download_stats_monthly AS select d.id, d.new_date AS month, case when rdm.sum is null then 0 else rdm.sum end, d.source from (select distinct rdsm.id, to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS new_date, rdsm.source from generate_series(0, " + diffMonth + ", 1) AS offs, repo_download_stats_monthly_clean rdsm) d LEFT JOIN (select id, month, sum, source from repo_download_stats_monthly_clean) rdm ON d.new_date=rdm.month and d.id=rdm.id and d.source=rdm.source order by d.id, d.new_date";
|
536
|
stmt.executeUpdate(sql);
|
537
|
|
538
|
sql = "CREATE INDEX repo_download_stats_monthly_id ON repo_download_stats_monthly USING btree (id)";
|
539
|
stmt.executeUpdate(sql);
|
540
|
|
541
|
sql = "CREATE INDEX repo_download_stats_monthly_month ON repo_download_stats_monthly USING btree(month)";
|
542
|
stmt.executeUpdate(sql);
|
543
|
|
544
|
sql = "CREATE INDEX repo_download_stats_monthly_source ON repo_download_stats_monthly USING btree(source)";
|
545
|
stmt.executeUpdate(sql);
|
546
|
|
547
|
sql = "CREATE OR REPLACE view repo_download_stats_monthly_sushi AS SELECT id, sum(number_of_downloads), extract('year' from date) ||'-'|| LPAD(CAST(extract('month' from date) AS VARCHAR), 2, '0') ||'-01' AS month, source FROM repo_download_stats group by id, month, source;";
|
548
|
stmt.executeUpdate(sql);
|
549
|
|
550
|
stmt.close();
|
551
|
ConnectDB.getConnection().commit();
|
552
|
ConnectDB.getConnection().close();
|
553
|
}
|
554
|
|
555
|
// Import OPENAIRE Logs to DB
|
556
|
public void processPortalLog() throws Exception {
|
557
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
558
|
ConnectDB.getConnection().setAutoCommit(false);
|
559
|
|
560
|
ArrayList<String> jsonFiles = listHdfsDir(this.logPortalPath);
|
561
|
// File folder = new File(this.logPortalPath);
|
562
|
// File[] jsonFiles = folder.listFiles();
|
563
|
|
564
|
PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO process_portal_log_tmp (source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?)");
|
565
|
int batch_size = 0;
|
566
|
JSONParser parser = new JSONParser();
|
567
|
for (String jsonFile : jsonFiles) {
|
568
|
JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile));
|
569
|
|
570
|
for (Object aJsonArray : jsonArray) {
|
571
|
JSONObject jsonObjectRow = (JSONObject) aJsonArray;
|
572
|
int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString());
|
573
|
String idVisit = jsonObjectRow.get("idVisit").toString();
|
574
|
String country = jsonObjectRow.get("country").toString();
|
575
|
String referrerName = jsonObjectRow.get("referrerName").toString();
|
576
|
String agent = jsonObjectRow.get("browser").toString();
|
577
|
boolean botFound = false;
|
578
|
Iterator it = robotsList.iterator();
|
579
|
while (it.hasNext()) {
|
580
|
// Create a Pattern object
|
581
|
Pattern r = Pattern.compile(it.next().toString());
|
582
|
// Now create matcher object.
|
583
|
Matcher m = r.matcher(agent);
|
584
|
if (m.find()) {
|
585
|
botFound = true;
|
586
|
break;
|
587
|
}
|
588
|
}
|
589
|
if (botFound == false) {
|
590
|
JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails"));
|
591
|
for (Object actionDetail : actionDetails) {
|
592
|
JSONObject actionDetailsObj = (JSONObject) actionDetail;
|
593
|
|
594
|
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
595
|
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
|
596
|
Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000);
|
597
|
|
598
|
String action = actionDetailsObj.get("type").toString();
|
599
|
String url = actionDetailsObj.get("url").toString();
|
600
|
|
601
|
String entityID = processPortalURL(url);
|
602
|
String sourceItemType = "";
|
603
|
|
604
|
if (entityID.indexOf("|") > 0) {
|
605
|
sourceItemType = entityID.substring(0, entityID.indexOf("|"));
|
606
|
entityID = entityID.substring(entityID.indexOf("|") + 1);
|
607
|
}
|
608
|
|
609
|
prepStatem.setInt(1, idSite);
|
610
|
prepStatem.setString(2, idVisit);
|
611
|
prepStatem.setString(3, country);
|
612
|
prepStatem.setString(4, action);
|
613
|
prepStatem.setString(5, url);
|
614
|
prepStatem.setString(6, entityID);
|
615
|
prepStatem.setString(7, sourceItemType);
|
616
|
prepStatem.setString(8, simpleDateFormat.format(timestamp));
|
617
|
prepStatem.setString(9, referrerName);
|
618
|
prepStatem.setString(10, agent);
|
619
|
|
620
|
prepStatem.addBatch();
|
621
|
batch_size++;
|
622
|
if (batch_size == 10000) {
|
623
|
prepStatem.executeBatch();
|
624
|
ConnectDB.getConnection().commit();
|
625
|
batch_size = 0;
|
626
|
}
|
627
|
}
|
628
|
}
|
629
|
}
|
630
|
}
|
631
|
prepStatem.executeBatch();
|
632
|
ConnectDB.getConnection().commit();
|
633
|
|
634
|
stmt.close();
|
635
|
ConnectDB.getConnection().close();
|
636
|
}
|
637
|
|
638
|
public void portalStats() throws SQLException {
|
639
|
Connection con = ConnectDB.getConnection();
|
640
|
Statement stmt = con.createStatement();
|
641
|
con.setAutoCommit(false);
|
642
|
|
643
|
String sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'oaItem\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.result_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
|
644
|
stmt.executeUpdate(sql);
|
645
|
stmt.close();
|
646
|
// con.commit();
|
647
|
|
648
|
stmt = con.createStatement();
|
649
|
sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'datasource\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.datasource_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
|
650
|
stmt.executeUpdate(sql);
|
651
|
stmt.close();
|
652
|
// con.commit();
|
653
|
|
654
|
stmt = con.createStatement();
|
655
|
sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'organization\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.organization_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
|
656
|
stmt.executeUpdate(sql);
|
657
|
stmt.close();
|
658
|
// con.commit();
|
659
|
|
660
|
stmt = con.createStatement();
|
661
|
sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'project\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.project_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;";
|
662
|
stmt.executeUpdate(sql);
|
663
|
stmt.close();
|
664
|
// con.commit();
|
665
|
|
666
|
con.close();
|
667
|
}
|
668
|
|
669
|
private void cleanOAI() throws Exception {
|
670
|
ConnectDB.getConnection().setAutoCommit(false);
|
671
|
|
672
|
stmt = ConnectDB.getConnection().createStatement();
|
673
|
String sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chlc.min-saude.pt/','oai:repositorio.chlc.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.chlc.min-saude.pt/%';";
|
674
|
stmt.executeUpdate(sql);
|
675
|
stmt.close();
|
676
|
ConnectDB.getConnection().commit();
|
677
|
|
678
|
stmt = ConnectDB.getConnection().createStatement();
|
679
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hospitaldebraga.pt/','oai:repositorio.hospitaldebraga.pt:') WHERE entity_id LIKE 'oai:repositorio.hospitaldebraga.pt/%';";
|
680
|
stmt.executeUpdate(sql);
|
681
|
stmt.close();
|
682
|
ConnectDB.getConnection().commit();
|
683
|
|
684
|
stmt = ConnectDB.getConnection().createStatement();
|
685
|
sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipl.pt/','oai:repositorio.ipl.pt:') WHERE entity_id LIKE 'oai:repositorio.ipl.pt/%';";
|
686
|
stmt.executeUpdate(sql);
|
687
|
stmt.close();
|
688
|
ConnectDB.getConnection().commit();
|
689
|
|
690
|
stmt = ConnectDB.getConnection().createStatement();
|
691
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bibliotecadigital.ipb.pt/','oai:bibliotecadigital.ipb.pt:') WHERE entity_id LIKE 'oai:bibliotecadigital.ipb.pt/%';";
|
692
|
stmt.executeUpdate(sql);
|
693
|
stmt.close();
|
694
|
ConnectDB.getConnection().commit();
|
695
|
|
696
|
stmt = ConnectDB.getConnection().createStatement();
|
697
|
sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ismai.pt/','oai:repositorio.ismai.pt:') WHERE entity_id LIKE 'oai:repositorio.ismai.pt/%';";
|
698
|
stmt.executeUpdate(sql);
|
699
|
stmt.close();
|
700
|
ConnectDB.getConnection().commit();
|
701
|
|
702
|
stmt = ConnectDB.getConnection().createStatement();
|
703
|
sql = "UPDATE piwiklog SET entity_id = regexp_replace(entity_id, '^oai:repositorioaberto.uab.pt/','oai:repositorioaberto.uab.pt:') WHERE entity_id LIKE 'oai:repositorioaberto.uab.pt/%';";
|
704
|
stmt.executeUpdate(sql);
|
705
|
stmt.close();
|
706
|
ConnectDB.getConnection().commit();
|
707
|
|
708
|
stmt = ConnectDB.getConnection().createStatement();
|
709
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.uac.pt/','oai:repositorio.uac.pt:') WHERE entity_id LIKE 'oai:repositorio.uac.pt/%';";
|
710
|
stmt.executeUpdate(sql);
|
711
|
stmt.close();
|
712
|
ConnectDB.getConnection().commit();
|
713
|
|
714
|
stmt = ConnectDB.getConnection().createStatement();
|
715
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.insa.pt/','oai:repositorio.insa.pt:') WHERE entity_id LIKE 'oai:repositorio.insa.pt/%';";
|
716
|
stmt.executeUpdate(sql);
|
717
|
stmt.close();
|
718
|
ConnectDB.getConnection().commit();
|
719
|
|
720
|
stmt = ConnectDB.getConnection().createStatement();
|
721
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipcb.pt/','oai:repositorio.ipcb.pt:') WHERE entity_id LIKE 'oai:repositorio.ipcb.pt/%';";
|
722
|
stmt.executeUpdate(sql);
|
723
|
stmt.close();
|
724
|
ConnectDB.getConnection().commit();
|
725
|
|
726
|
stmt = ConnectDB.getConnection().createStatement();
|
727
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ispa.pt/','oai:repositorio.ispa.pt:') WHERE entity_id LIKE 'oai:repositorio.ispa.pt/%';";
|
728
|
stmt.executeUpdate(sql);
|
729
|
stmt.close();
|
730
|
ConnectDB.getConnection().commit();
|
731
|
|
732
|
stmt = ConnectDB.getConnection().createStatement();
|
733
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.chporto.pt/','oai:repositorio.chporto.pt:') WHERE entity_id LIKE 'oai:repositorio.chporto.pt/%';";
|
734
|
stmt.executeUpdate(sql);
|
735
|
stmt.close();
|
736
|
ConnectDB.getConnection().commit();
|
737
|
|
738
|
stmt = ConnectDB.getConnection().createStatement();
|
739
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ucp.pt/','oai:repositorio.ucp.pt:') WHERE entity_id LIKE 'oai:repositorio.ucp.pt/%';";
|
740
|
stmt.executeUpdate(sql);
|
741
|
stmt.close();
|
742
|
ConnectDB.getConnection().commit();
|
743
|
|
744
|
stmt = ConnectDB.getConnection().createStatement();
|
745
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:rihuc.huc.min-saude.pt/','oai:rihuc.huc.min-saude.pt:') WHERE entity_id LIKE 'oai:rihuc.huc.min-saude.pt/%';";
|
746
|
stmt.executeUpdate(sql);
|
747
|
stmt.close();
|
748
|
ConnectDB.getConnection().commit();
|
749
|
|
750
|
stmt = ConnectDB.getConnection().createStatement();
|
751
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipv.pt/','oai:repositorio.ipv.pt:') WHERE entity_id LIKE 'oai:repositorio.ipv.pt/%';";
|
752
|
stmt.executeUpdate(sql);
|
753
|
stmt.close();
|
754
|
ConnectDB.getConnection().commit();
|
755
|
|
756
|
stmt = ConnectDB.getConnection().createStatement();
|
757
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:www.repository.utl.pt/','oai:www.repository.utl.pt:') WHERE entity_id LIKE 'oai:www.repository.utl.pt/%';";
|
758
|
stmt.executeUpdate(sql);
|
759
|
stmt.close();
|
760
|
ConnectDB.getConnection().commit();
|
761
|
|
762
|
stmt = ConnectDB.getConnection().createStatement();
|
763
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:run.unl.pt/','oai:run.unl.pt:') WHERE entity_id LIKE 'oai:run.unl.pt/%';";
|
764
|
stmt.executeUpdate(sql);
|
765
|
stmt.close();
|
766
|
ConnectDB.getConnection().commit();
|
767
|
|
768
|
stmt = ConnectDB.getConnection().createStatement();
|
769
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:sapientia.ualg.pt/','oai:sapientia.ualg.pt:') WHERE entity_id LIKE 'oai:sapientia.ualg.pt/%';";
|
770
|
stmt.executeUpdate(sql);
|
771
|
stmt.close();
|
772
|
ConnectDB.getConnection().commit();
|
773
|
|
774
|
stmt = ConnectDB.getConnection().createStatement();
|
775
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ipsantarem.pt/','oai:repositorio.ipsantarem.pt:') WHERE entity_id LIKE 'oai:repositorio.ipsantarem.pt/%';";
|
776
|
stmt.executeUpdate(sql);
|
777
|
stmt.close();
|
778
|
ConnectDB.getConnection().commit();
|
779
|
|
780
|
stmt = ConnectDB.getConnection().createStatement();
|
781
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:arca.igc.gulbenkian.pt/','oai:arca.igc.gulbenkian.pt:') WHERE entity_id LIKE 'oai:arca.igc.gulbenkian.pt/%';";
|
782
|
stmt.executeUpdate(sql);
|
783
|
stmt.close();
|
784
|
ConnectDB.getConnection().commit();
|
785
|
|
786
|
stmt = ConnectDB.getConnection().createStatement();
|
787
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:ubibliorum.ubi.pt/','oai:ubibliorum.ubi.pt:') WHERE entity_id LIKE 'oai:ubibliorum.ubi.pt/%';";
|
788
|
stmt.executeUpdate(sql);
|
789
|
stmt.close();
|
790
|
ConnectDB.getConnection().commit();
|
791
|
|
792
|
stmt = ConnectDB.getConnection().createStatement();
|
793
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:digituma.uma.pt/','oai:digituma.uma.pt:') WHERE entity_id LIKE 'oai:digituma.uma.pt/%';";
|
794
|
stmt.executeUpdate(sql);
|
795
|
stmt.close();
|
796
|
ConnectDB.getConnection().commit();
|
797
|
|
798
|
stmt = ConnectDB.getConnection().createStatement();
|
799
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.ul.pt/','oai:repositorio.ul.pt:') WHERE entity_id LIKE 'oai:repositorio.ul.pt/%';";
|
800
|
stmt.executeUpdate(sql);
|
801
|
stmt.close();
|
802
|
ConnectDB.getConnection().commit();
|
803
|
|
804
|
stmt = ConnectDB.getConnection().createStatement();
|
805
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.hff.min-saude.pt/','oai:repositorio.hff.min-saude.pt:') WHERE entity_id LIKE 'oai:repositorio.hff.min-saude.pt/%';";
|
806
|
stmt.executeUpdate(sql);
|
807
|
stmt.close();
|
808
|
ConnectDB.getConnection().commit();
|
809
|
|
810
|
stmt = ConnectDB.getConnection().createStatement();
|
811
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorium.sdum.uminho.pt/','oai:repositorium.sdum.uminho.pt:') WHERE entity_id LIKE 'oai:repositorium.sdum.uminho.pt/%';";
|
812
|
stmt.executeUpdate(sql);
|
813
|
stmt.close();
|
814
|
ConnectDB.getConnection().commit();
|
815
|
|
816
|
stmt = ConnectDB.getConnection().createStatement();
|
817
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:recipp.ipp.pt/','oai:recipp.ipp.pt:') WHERE entity_id LIKE 'oai:recipp.ipp.pt/%';";
|
818
|
stmt.executeUpdate(sql);
|
819
|
stmt.close();
|
820
|
ConnectDB.getConnection().commit();
|
821
|
|
822
|
stmt = ConnectDB.getConnection().createStatement();
|
823
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:bdigital.ufp.pt/','oai:bdigital.ufp.pt:') WHERE entity_id LIKE 'oai:bdigital.ufp.pt/%';";
|
824
|
stmt.executeUpdate(sql);
|
825
|
stmt.close();
|
826
|
ConnectDB.getConnection().commit();
|
827
|
|
828
|
stmt = ConnectDB.getConnection().createStatement();
|
829
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:repositorio.lneg.pt/','oai:repositorio.lneg.pt:') WHERE entity_id LIKE 'oai:repositorio.lneg.pt/%';";
|
830
|
stmt.executeUpdate(sql);
|
831
|
stmt.close();
|
832
|
ConnectDB.getConnection().commit();
|
833
|
|
834
|
stmt = ConnectDB.getConnection().createStatement();
|
835
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:iconline.ipleiria.pt/','oai:iconline.ipleiria.pt:') WHERE entity_id LIKE 'oai:iconline.ipleiria.pt/%';";
|
836
|
stmt.executeUpdate(sql);
|
837
|
stmt.close();
|
838
|
ConnectDB.getConnection().commit();
|
839
|
|
840
|
stmt = ConnectDB.getConnection().createStatement();
|
841
|
sql = "UPDATE piwiklogtmp SET entity_id = regexp_replace(entity_id, '^oai:comum.rcaap.pt/','oai:comum.rcaap.pt:') WHERE entity_id LIKE 'oai:comum.rcaap.pt/%';";
|
842
|
stmt.executeUpdate(sql);
|
843
|
stmt.close();
|
844
|
ConnectDB.getConnection().commit();
|
845
|
|
846
|
ConnectDB.getConnection().close();
|
847
|
}
|
848
|
|
849
|
private String processPortalURL(String url) {
|
850
|
|
851
|
if (url.indexOf("explore.openaire.eu") > 0) {
|
852
|
try {
|
853
|
url = URLDecoder.decode(url, "UTF-8");
|
854
|
} catch (Exception e) {
|
855
|
log.info(url);
|
856
|
}
|
857
|
if (url.indexOf("datasourceId=") > 0 && url.substring(url.indexOf("datasourceId=") + 13).length() >= 46) {
|
858
|
url = "datasource|" + url.substring(url.indexOf("datasourceId=") + 13, url.indexOf("datasourceId=") + 59);
|
859
|
} else if (url.indexOf("datasource=") > 0 && url.substring(url.indexOf("datasource=") + 11).length() >= 46) {
|
860
|
url = "datasource|" + url.substring(url.indexOf("datasource=") + 11, url.indexOf("datasource=") + 57);
|
861
|
} else if (url.indexOf("datasourceFilter=") > 0 && url.substring(url.indexOf("datasourceFilter=") + 17).length() >= 46) {
|
862
|
url = "datasource|" + url.substring(url.indexOf("datasourceFilter=") + 17, url.indexOf("datasourceFilter=") + 63);
|
863
|
} else if (url.indexOf("articleId=") > 0 && url.substring(url.indexOf("articleId=") + 10).length() >= 46) {
|
864
|
url = "result|" + url.substring(url.indexOf("articleId=") + 10, url.indexOf("articleId=") + 56);
|
865
|
} else if (url.indexOf("datasetId=") > 0 && url.substring(url.indexOf("datasetId=") + 10).length() >= 46) {
|
866
|
url = "result|" + url.substring(url.indexOf("datasetId=") + 10, url.indexOf("datasetId=") + 56);
|
867
|
} else if (url.indexOf("projectId=") > 0 && url.substring(url.indexOf("projectId=") + 10).length() >= 46 && !url.contains("oai:dnet:corda")) {
|
868
|
url = "project|" + url.substring(url.indexOf("projectId=") + 10, url.indexOf("projectId=") + 56);
|
869
|
} else if (url.indexOf("organizationId=") > 0 && url.substring(url.indexOf("organizationId=") + 15).length() >= 46) {
|
870
|
url = "organization|" + url.substring(url.indexOf("organizationId=") + 15, url.indexOf("organizationId=") + 61);
|
871
|
} else {
|
872
|
url = "";
|
873
|
}
|
874
|
} else {
|
875
|
url = "";
|
876
|
}
|
877
|
|
878
|
return url;
|
879
|
}
|
880
|
|
881
|
private void updateProdTables() throws SQLException {
|
882
|
Statement stmt = ConnectDB.getConnection().createStatement();
|
883
|
ConnectDB.getConnection().setAutoCommit(false);
|
884
|
|
885
|
String sql = "insert into piwiklog select * from piwiklogtmp;";
|
886
|
stmt.executeUpdate(sql);
|
887
|
|
888
|
sql = "insert into views_stats select * from views_stats_tmp;";
|
889
|
stmt.executeUpdate(sql);
|
890
|
|
891
|
sql = "insert into downloads_stats select * from downloads_stats_tmp;";
|
892
|
stmt.executeUpdate(sql);
|
893
|
|
894
|
sql = "insert into pageviews_stats select * from pageviews_stats_tmp;";
|
895
|
stmt.executeUpdate(sql);
|
896
|
|
897
|
sql = "DROP TABLE IF EXISTS views_stats_tmp;";
|
898
|
stmt.executeUpdate(sql);
|
899
|
|
900
|
sql = "DROP TABLE IF EXISTS downloads_stats_tmp;";
|
901
|
stmt.executeUpdate(sql);
|
902
|
|
903
|
sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
|
904
|
stmt.executeUpdate(sql);
|
905
|
|
906
|
sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
|
907
|
stmt.executeUpdate(sql);
|
908
|
|
909
|
|
910
|
stmt.close();
|
911
|
ConnectDB.getConnection().commit();
|
912
|
ConnectDB.getConnection().close();
|
913
|
|
914
|
log.info("updateProdTables done");
|
915
|
}
|
916
|
|
917
|
private ArrayList<String> listHdfsDir(String dir) throws Exception {
|
918
|
|
919
|
FileSystem hdfs = FileSystem.get(new Configuration());
|
920
|
RemoteIterator<LocatedFileStatus> Files;
|
921
|
ArrayList<String> fileNames = new ArrayList<>();
|
922
|
|
923
|
try {
|
924
|
Path exportPath = new Path(hdfs.getUri() + dir);
|
925
|
Files = hdfs.listFiles(exportPath, false);
|
926
|
while (Files.hasNext()) {
|
927
|
String fileName = Files.next().getPath().toString();
|
928
|
fileNames.add(fileName);
|
929
|
}
|
930
|
|
931
|
hdfs.close();
|
932
|
} catch (Exception e) {
|
933
|
log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath));
|
934
|
throw new Exception("HDFS file path with exported data does not exist : " + logPath, e);
|
935
|
}
|
936
|
|
937
|
return fileNames;
|
938
|
}
|
939
|
|
940
|
private String readHDFSFile(String filename) throws Exception {
|
941
|
String result;
|
942
|
try {
|
943
|
|
944
|
FileSystem fs = FileSystem.get(new Configuration());
|
945
|
//log.info("reading file : " + filename);
|
946
|
|
947
|
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename))));
|
948
|
|
949
|
StringBuilder sb = new StringBuilder();
|
950
|
String line = br.readLine();
|
951
|
|
952
|
while (line != null) {
|
953
|
if (!line.equals("[]")) {
|
954
|
sb.append(line);
|
955
|
}
|
956
|
//sb.append(line);
|
957
|
line = br.readLine();
|
958
|
}
|
959
|
result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\"");
|
960
|
if (result.equals("")) {
|
961
|
result = "[]";
|
962
|
}
|
963
|
|
964
|
//fs.close();
|
965
|
} catch (Exception e) {
|
966
|
log.error(e);
|
967
|
throw new Exception(e);
|
968
|
}
|
969
|
|
970
|
return result;
|
971
|
}
|
972
|
|
973
|
private Connection getConnection() throws SQLException {
|
974
|
return ConnectDB.getConnection();
|
975
|
}
|
976
|
}
|