Revision 59067
Added by Dimitris Pierrakos over 3 years ago
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-workflow/src/main/resources/eu/dnetlib/iis/core/javamapreduce/usage_stats/job.properties | ||
---|---|---|
30 | 30 |
mapred.mapper.new-api=true |
31 | 31 |
mapred.reducer.new-api=true |
32 | 32 |
oozie.service.loc=http://oozie.hadoop.dm.openaire.eu:11000/oozie |
33 |
oozie.wf.application.path=hdfs://dm-cluster-nn/user/antonis.lempesis/core/javamapreduce/usage_stats/oozie_app
|
|
33 |
oozie.wf.application.path=hdfs://dm-cluster-nn/user/dimitris.pierrakos/core/javamapreduce/usage_stats/oozie_app
|
|
34 | 34 |
piwik_filterOffset=5 |
35 | 35 |
piwik_finalDate=2016-01-03 |
36 | 36 |
piwik_httpProtocol=https |
... | ... | |
41 | 41 |
piwik_url=analytics.openaire.eu |
42 | 42 |
piwik_username=usage_openaire |
43 | 43 |
token_auth=32846584f571be9b57488bf4088f30ea |
44 |
workingDir=/user/antonis.lempesis/core/javamapreduce/usage_stats/working_dir
|
|
44 |
workingDir=/user/dimitris.pierrakos/core/javamapreduce/usage_stats/working_dir
|
|
45 | 45 |
zookeeper.znode.parent=/hbase |
46 | 46 |
zookeeper.znode.rootserver=root-region-server |
47 | 47 |
COUNTER_robots_Url=https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json |
... | ... | |
50 | 50 |
matomo_BaseUrl=analytics.openaire.eu |
51 | 51 |
repo_LogPath=${piwik_logsPath}/Repologs/ |
52 | 52 |
portal_LogPath=${piwik_logsPath}/Portallogs/ |
53 |
portal_MatomoID=109 |
|
53 |
portal_MatomoID=109 |
|
54 |
LaReferenciaBaseUrl=http://matomo.lareferencia.info |
|
55 |
LareferenciaAuthToken=484874b3655d5a831eb8db33695790c4 |
|
56 |
lareferenciaLogPath=${piwik_logsPath}/LaReferenciaLogs |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-workflow/src/main/resources/eu/dnetlib/iis/core/javamapreduce/usage_stats/oozie_app/workflow.xml | ||
---|---|---|
32 | 32 |
<mkdir path="${nameNode}${piwik_logsPath}"/> |
33 | 33 |
<mkdir path="${nameNode}${portal_LogPath}"/> |
34 | 34 |
<mkdir path="${nameNode}${repo_LogPath}"/> |
35 |
<mkdir path="${nameNode}${lareferenciaLogPath}"/> |
|
35 | 36 |
</prepare> |
36 | 37 |
<configuration> |
37 | 38 |
<property> |
... | ... | |
70 | 71 |
<arg>-Prepo_LogPath=${repo_LogPath}</arg> |
71 | 72 |
<arg>-Pportal_LogPath=${portal_LogPath}</arg> |
72 | 73 |
<arg>-Pportal_MatomoID=${portal_MatomoID}</arg> |
74 |
<arg>-PLaReferenciaBaseUrl=${LaReferenciaBaseUrl}</arg> |
|
75 |
<arg>-PLareferenciaAuthToken=${LareferenciaAuthToken}</arg> |
|
76 |
<arg>-PlareferenciaLogPath=${lareferenciaLogPath}</arg> |
|
77 |
|
|
73 | 78 |
</java> |
74 | 79 |
|
75 | 80 |
<ok to="cleanUpHDFS"/> |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/LaReferenciaStats.java | ||
---|---|---|
1 |
package eu.dnetlib.usagestats.export; |
|
2 |
|
|
3 |
import java.io.*; |
|
4 |
import java.net.URLDecoder; |
|
5 |
import java.sql.Connection; |
|
6 |
import java.sql.PreparedStatement; |
|
7 |
import java.sql.SQLException; |
|
8 |
import java.sql.Statement; |
|
9 |
import java.sql.Timestamp; |
|
10 |
import java.text.SimpleDateFormat; |
|
11 |
import java.util.*; |
|
12 |
import java.util.regex.Matcher; |
|
13 |
import java.util.regex.Pattern; |
|
14 |
|
|
15 |
import org.apache.hadoop.conf.Configuration; |
|
16 |
import org.apache.hadoop.fs.LocatedFileStatus; |
|
17 |
import org.apache.hadoop.fs.Path; |
|
18 |
import org.apache.hadoop.fs.FileSystem; |
|
19 |
import org.apache.hadoop.fs.RemoteIterator; |
|
20 |
import org.apache.log4j.Logger; |
|
21 |
import org.json.simple.JSONArray; |
|
22 |
import org.json.simple.JSONObject; |
|
23 |
import org.json.simple.parser.JSONParser; |
|
24 |
|
|
25 |
public class LaReferenciaStats { |
|
26 |
|
|
27 |
private String logRepoPath; |
|
28 |
|
|
29 |
private Statement stmt = null; |
|
30 |
|
|
31 |
private final Logger log = Logger.getLogger(this.getClass()); |
|
32 |
private String CounterRobotsURL; |
|
33 |
private ArrayList robotsList; |
|
34 |
|
|
35 |
public LaReferenciaStats(String logRepoPath) throws Exception { |
|
36 |
this.logRepoPath = logRepoPath; |
|
37 |
this.createTables(); |
|
38 |
this.createTmpTables(); |
|
39 |
} |
|
40 |
|
|
41 |
/* |
|
42 |
private void connectDB() throws Exception { |
|
43 |
try { |
|
44 |
ConnectDB connectDB = new ConnectDB(); |
|
45 |
} catch (Exception e) { |
|
46 |
log.error("Connect to db failed: " + e); |
|
47 |
throw new Exception("Failed to connect to db: " + e.toString(), e); |
|
48 |
} |
|
49 |
} |
|
50 |
*/ |
|
51 |
private void createTables() throws Exception { |
|
52 |
try { |
|
53 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
54 |
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialog(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));"; |
|
55 |
String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " |
|
56 |
+ " ON INSERT TO lareferencialog " |
|
57 |
+ " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit," |
|
58 |
+ "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id " |
|
59 |
+ "FROM lareferencialog " |
|
60 |
+ "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; |
|
61 |
String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");"; |
|
62 |
stmt.executeUpdate(sqlCreateTableLareferenciaLog); |
|
63 |
stmt.executeUpdate(sqlcreateRuleLaReferenciaLog); |
|
64 |
stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog); |
|
65 |
|
|
66 |
stmt.close(); |
|
67 |
ConnectDB.getConnection().close(); |
|
68 |
log.info("Lareferencia Tables Created"); |
|
69 |
|
|
70 |
} catch (Exception e) { |
|
71 |
log.error("Failed to create tables: " + e); |
|
72 |
throw new Exception("Failed to create tables: " + e.toString(), e); |
|
73 |
//System.exit(0); |
|
74 |
} |
|
75 |
} |
|
76 |
|
|
77 |
private void createTmpTables() throws Exception { |
|
78 |
|
|
79 |
try { |
|
80 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
81 |
String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));"; |
|
82 |
String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " |
|
83 |
+ " ON INSERT TO lareferencialogtmp " |
|
84 |
+ " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit," |
|
85 |
+ "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id " |
|
86 |
+ "FROM lareferencialogtmp " |
|
87 |
+ "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; |
|
88 |
stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog); |
|
89 |
stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog); |
|
90 |
|
|
91 |
stmt.close(); |
|
92 |
log.info("Lareferencia Tmp Tables Created"); |
|
93 |
|
|
94 |
} catch (Exception e) { |
|
95 |
log.error("Failed to create tmptables: " + e); |
|
96 |
throw new Exception("Failed to create tmp tables: " + e.toString(), e); |
|
97 |
//System.exit(0); |
|
98 |
} |
|
99 |
} |
|
100 |
|
|
101 |
public void processLogs() throws Exception { |
|
102 |
try { |
|
103 |
|
|
104 |
processlaReferenciaLog(); |
|
105 |
log.info("LaReferencia repository process done"); |
|
106 |
removeDoubleClicks(); |
|
107 |
log.info("LaReferencia removing double clicks done"); |
|
108 |
viewsStats(); |
|
109 |
log.info("LaReferencia views done"); |
|
110 |
downloadsStats(); |
|
111 |
log.info("LaReferencia downloads done"); |
|
112 |
updateProdTables(); |
|
113 |
log.info("LaReferencia update productions tables done"); |
|
114 |
|
|
115 |
} catch (Exception e) { |
|
116 |
log.error("Failed to process logs: " + e); |
|
117 |
throw new Exception("Failed to process logs: " + e.toString(), e); |
|
118 |
} |
|
119 |
} |
|
120 |
|
|
121 |
public void processlaReferenciaLog() throws Exception { |
|
122 |
|
|
123 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
124 |
ConnectDB.getConnection().setAutoCommit(false); |
|
125 |
ArrayList<String> jsonFiles = listHdfsDir(this.logRepoPath); |
|
126 |
|
|
127 |
//File dir = new File(this.logRepoPath); |
|
128 |
//File[] jsonFiles = dir.listFiles(); |
|
129 |
PreparedStatement prepStatem = ConnectDB.getConnection().prepareStatement("INSERT INTO lareferencialogtmp (matomoid, source, id_visit, country, action, url, entity_id, source_item_type, timestamp, referrer_name, agent) VALUES (?,?,?,?,?,?,?,?,?,?,?)"); |
|
130 |
int batch_size = 0; |
|
131 |
|
|
132 |
JSONParser parser = new JSONParser(); |
|
133 |
for (String jsonFile : jsonFiles) { |
|
134 |
System.out.println(jsonFile); |
|
135 |
JSONArray jsonArray = (JSONArray) parser.parse(readHDFSFile(jsonFile)); |
|
136 |
for (Object aJsonArray : jsonArray) { |
|
137 |
JSONObject jsonObjectRow = (JSONObject) aJsonArray; |
|
138 |
int idSite = Integer.parseInt(jsonObjectRow.get("idSite").toString()); |
|
139 |
String idVisit = jsonObjectRow.get("idVisit").toString(); |
|
140 |
String country = jsonObjectRow.get("country").toString(); |
|
141 |
String referrerName = jsonObjectRow.get("referrerName").toString(); |
|
142 |
String agent = jsonObjectRow.get("browser").toString(); |
|
143 |
String sourceItemType = "repItem"; |
|
144 |
|
|
145 |
JSONArray actionDetails = (JSONArray) jsonObjectRow.get(("actionDetails")); |
|
146 |
for (Object actionDetail : actionDetails) { |
|
147 |
JSONObject actionDetailsObj = (JSONObject) actionDetail; |
|
148 |
|
|
149 |
if (actionDetailsObj.get("customVariables") != null) { |
|
150 |
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); |
|
151 |
simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC")); |
|
152 |
Timestamp timestamp = new Timestamp(Long.parseLong(actionDetailsObj.get("timestamp").toString()) * 1000); |
|
153 |
String url = actionDetailsObj.get("url").toString(); |
|
154 |
String oaipmh = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("1")).get("customVariablePageValue1").toString(); |
|
155 |
String opendoar = ((JSONObject) ((JSONObject) actionDetailsObj.get("customVariables")).get("2")).get("customVariablePageValue2").toString(); |
|
156 |
String action = actionDetailsObj.get("type").toString(); |
|
157 |
prepStatem.setInt(1, idSite); |
|
158 |
prepStatem.setString(2, "opendoar____::" + opendoar); |
|
159 |
prepStatem.setString(3, idVisit); |
|
160 |
prepStatem.setString(4, country); |
|
161 |
prepStatem.setString(5, action); |
|
162 |
prepStatem.setString(6, url); |
|
163 |
prepStatem.setString(7, oaipmh); |
|
164 |
prepStatem.setString(8, sourceItemType); |
|
165 |
prepStatem.setString(9, simpleDateFormat.format(timestamp)); |
|
166 |
prepStatem.setString(10, referrerName); |
|
167 |
prepStatem.setString(11, agent); |
|
168 |
//prepStatem.setString(11, ); |
|
169 |
prepStatem.addBatch(); |
|
170 |
batch_size++; |
|
171 |
if (batch_size == 10000) { |
|
172 |
prepStatem.executeBatch(); |
|
173 |
ConnectDB.getConnection().commit(); |
|
174 |
batch_size = 0; |
|
175 |
} |
|
176 |
} |
|
177 |
} |
|
178 |
} |
|
179 |
} |
|
180 |
try { |
|
181 |
prepStatem.executeBatch(); |
|
182 |
ConnectDB.getConnection().commit(); |
|
183 |
stmt.close(); |
|
184 |
} catch (Exception e) { |
|
185 |
|
|
186 |
if (e instanceof java.sql.SQLException) { |
|
187 |
java.sql.SQLException ne = ((java.sql.SQLException) e).getNextException(); |
|
188 |
System.out.println(ne.getMessage()); |
|
189 |
} |
|
190 |
} |
|
191 |
|
|
192 |
} |
|
193 |
|
|
194 |
public void removeDoubleClicks() throws Exception { |
|
195 |
|
|
196 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
197 |
ConnectDB.getConnection().setAutoCommit(false); |
|
198 |
|
|
199 |
//clean download double clicks |
|
200 |
String sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp FROM lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='download' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<30 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);"; |
|
201 |
stmt.executeUpdate(sql); |
|
202 |
stmt.close(); |
|
203 |
ConnectDB.getConnection().commit(); |
|
204 |
|
|
205 |
stmt = ConnectDB.getConnection().createStatement(); |
|
206 |
|
|
207 |
//clean view double clicks |
|
208 |
sql = "DELETE FROM lareferencialogtmp p WHERE EXISTS (SELECT DISTINCT p1.source, p1.id_visit, p1.action, p1.entity_id, p1.timestamp from lareferencialogtmp p1, lareferencialogtmp p2 WHERE p1.source=p2.source AND p1.id_visit=p2.id_visit AND p1.entity_id=p2.entity_id AND p1.action=p2.action AND p1.action='action' AND p1.timestamp!=p2.timestamp AND p1.timestamp<p2.timestamp AND extract(EPOCH FROM p2.timestamp::timestamp-p1.timestamp::timestamp)<10 AND p.source=p1.source AND p.id_visit=p1.id_visit AND p.action=p1.action AND p.entity_id=p1.entity_id AND p.timestamp=p1.timestamp);"; |
|
209 |
stmt.executeUpdate(sql); |
|
210 |
stmt.close(); |
|
211 |
ConnectDB.getConnection().commit(); |
|
212 |
//conn.close(); |
|
213 |
} |
|
214 |
|
|
215 |
public void viewsStats() throws Exception { |
|
216 |
|
|
217 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
218 |
ConnectDB.getConnection().setAutoCommit(false); |
|
219 |
|
|
220 |
//String sql = "CREATE OR REPLACE VIEW result_views_monthly AS SELECT entity_id AS id, COUNT(entity_id) as views, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;"; |
|
221 |
String sql = "CREATE OR REPLACE VIEW la_result_views_monthly_tmp AS SELECT entity_id AS id, COUNT(entity_id) as views, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='action' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;"; |
|
222 |
stmt.executeUpdate(sql); |
|
223 |
|
|
224 |
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire INTO views_stats FROM result_views_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;"; |
|
225 |
sql = "CREATE TABLE IF NOT EXISTS la_views_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(views) AS count, max(openaire_referrer) AS openaire FROM la_result_views_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;"; |
|
226 |
stmt.executeUpdate(sql); |
|
227 |
|
|
228 |
stmt.close(); |
|
229 |
ConnectDB.getConnection().commit(); |
|
230 |
ConnectDB.getConnection().close(); |
|
231 |
} |
|
232 |
|
|
233 |
private void downloadsStats() throws Exception { |
|
234 |
|
|
235 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
236 |
ConnectDB.getConnection().setAutoCommit(false); |
|
237 |
|
|
238 |
//String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;"; |
|
239 |
String sql = "CREATE OR REPLACE VIEW la_result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM lareferencialogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;"; |
|
240 |
stmt.executeUpdate(sql); |
|
241 |
|
|
242 |
//sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;"; |
|
243 |
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;"; |
|
244 |
sql = "CREATE TABLE IF NOT EXISTS la_downloads_stats_tmp AS SELECT 'LaReferencia'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM la_result_downloads_monthly_tmp p, public.datasource_oids d, public.result_oids ro where p.source=d.orid and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;"; |
|
245 |
stmt.executeUpdate(sql); |
|
246 |
|
|
247 |
stmt.close(); |
|
248 |
ConnectDB.getConnection().commit(); |
|
249 |
ConnectDB.getConnection().close(); |
|
250 |
} |
|
251 |
|
|
252 |
|
|
253 |
private void updateProdTables() throws SQLException, Exception { |
|
254 |
|
|
255 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
256 |
ConnectDB.getConnection().setAutoCommit(false); |
|
257 |
String sql = "insert into lareferencialog select * from lareferencialogtmp;"; |
|
258 |
stmt.executeUpdate(sql); |
|
259 |
|
|
260 |
sql = "insert into views_stats select * from la_views_stats_tmp;"; |
|
261 |
stmt.executeUpdate(sql); |
|
262 |
|
|
263 |
sql = "insert into public.views_stats select * from la_views_stats_tmp;"; |
|
264 |
stmt.executeUpdate(sql); |
|
265 |
|
|
266 |
sql = "insert into downloads_stats select * from la_downloads_stats_tmp;"; |
|
267 |
stmt.executeUpdate(sql); |
|
268 |
|
|
269 |
sql = "insert into public.downloads_stats select * from la_downloads_stats_tmp;"; |
|
270 |
stmt.executeUpdate(sql); |
|
271 |
|
|
272 |
stmt.close(); |
|
273 |
ConnectDB.getConnection().commit(); |
|
274 |
ConnectDB.getConnection().close(); |
|
275 |
|
|
276 |
} |
|
277 |
|
|
278 |
private ArrayList<String> listHdfsDir(String dir) throws Exception { |
|
279 |
FileSystem hdfs = FileSystem.get(new Configuration()); |
|
280 |
RemoteIterator<LocatedFileStatus> Files; |
|
281 |
ArrayList<String> fileNames = new ArrayList<>(); |
|
282 |
|
|
283 |
try { |
|
284 |
Path exportPath = new Path(hdfs.getUri() + dir); |
|
285 |
Files = hdfs.listFiles(exportPath, false); |
|
286 |
while (Files.hasNext()) { |
|
287 |
String fileName = Files.next().getPath().toString(); |
|
288 |
//log.info("Found hdfs file " + fileName); |
|
289 |
fileNames.add(fileName); |
|
290 |
} |
|
291 |
//hdfs.close(); |
|
292 |
} catch (Exception e) { |
|
293 |
log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath)); |
|
294 |
throw new Exception("HDFS file path with exported data does not exist : " + logRepoPath, e); |
|
295 |
} |
|
296 |
|
|
297 |
return fileNames; |
|
298 |
} |
|
299 |
|
|
300 |
private String readHDFSFile(String filename) throws Exception { |
|
301 |
String result; |
|
302 |
try { |
|
303 |
|
|
304 |
FileSystem fs = FileSystem.get(new Configuration()); |
|
305 |
//log.info("reading file : " + filename); |
|
306 |
|
|
307 |
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(new Path(filename)))); |
|
308 |
|
|
309 |
StringBuilder sb = new StringBuilder(); |
|
310 |
String line = br.readLine(); |
|
311 |
|
|
312 |
while (line != null) { |
|
313 |
if (!line.equals("[]")) { |
|
314 |
sb.append(line); |
|
315 |
} |
|
316 |
//sb.append(line); |
|
317 |
line = br.readLine(); |
|
318 |
} |
|
319 |
result = sb.toString().replace("][{\"idSite\"", ",{\"idSite\""); |
|
320 |
if (result.equals("")) { |
|
321 |
result = "[]"; |
|
322 |
} |
|
323 |
|
|
324 |
//fs.close(); |
|
325 |
} catch (Exception e) { |
|
326 |
log.error(e); |
|
327 |
throw new Exception(e); |
|
328 |
} |
|
329 |
|
|
330 |
return result; |
|
331 |
} |
|
332 |
|
|
333 |
} |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/PiwikDownloadLogs.java | ||
---|---|---|
96 | 96 |
|
97 | 97 |
String period = "&period=day&date=" + sdf.format(date); |
98 | 98 |
String outFolder = ""; |
99 |
//portal siteId = 109; |
|
100 | 99 |
if (siteId == Integer.parseInt(portalMatomoID)) { |
101 | 100 |
outFolder = portalLogPath; |
102 | 101 |
} else { |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/IrusStats.java | ||
---|---|---|
103 | 103 |
String sql = "INSERT INTO downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count, '0' FROM sushilogtmp s, public.datasource_oids d, public.result_oids ro WHERE s.repository=d.orid AND s.rid=ro.orid AND metric_type='ft_total' AND s.source='IRUS-UK';"; |
104 | 104 |
stmt.executeUpdate(sql); |
105 | 105 |
|
106 |
sql = "INSERT INTO public.downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count, '0' FROM sushilogtmp s, public.datasource_oids d, public.result_oids ro WHERE s.repository=d.orid AND s.rid=ro.orid AND metric_type='ft_total' AND s.source='IRUS-UK';"; |
|
107 |
stmt.executeUpdate(sql); |
|
108 |
|
|
106 | 109 |
sql = "Insert into sushilog select * from sushilogtmp;"; |
107 | 110 |
stmt.executeUpdate(sql); |
108 | 111 |
|
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/SarcStats.java | ||
---|---|---|
85 | 85 |
String sql = "INSERT INTO downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count, '0' FROM sushilog s, public.datasource_oids d, public.datasource_results dr, public.result_pids ro WHERE d.orid LIKE '%' || s.repository || '%' AND dr.id=d.id AND dr.result=ro.id AND s.rid=ro.pid AND ro.type='doi' AND metric_type='ft_total' AND s.source='SARC-OJS';"; |
86 | 86 |
stmt.executeUpdate(sql); |
87 | 87 |
|
88 |
sql = "INSERT INTO public.downloads_stats SELECT s.source, d.id AS repository_id, ro.id as result_id, extract('year' from s.date::date) ||'/'|| LPAD(CAST(extract('month' from s.date::date) AS VARCHAR), 2, '0') as date, s.count, '0' FROM sushilog s, public.datasource_oids d, public.datasource_results dr, public.result_pids ro WHERE d.orid LIKE '%' || s.repository || '%' AND dr.id=d.id AND dr.result=ro.id AND s.rid=ro.pid AND ro.type='doi' AND metric_type='ft_total' AND s.source='SARC-OJS';"; |
|
89 |
stmt.executeUpdate(sql); |
|
88 | 90 |
stmt.close(); |
89 | 91 |
ConnectDB.getConnection().commit(); |
90 | 92 |
ConnectDB.getConnection().close(); |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/PiwikStatsDB.java | ||
---|---|---|
25 | 25 |
|
26 | 26 |
public class PiwikStatsDB { |
27 | 27 |
|
28 |
private String logPath; |
|
29 | 28 |
private String logRepoPath; |
30 | 29 |
private String logPortalPath; |
31 | 30 |
|
... | ... | |
35 | 34 |
private String CounterRobotsURL; |
36 | 35 |
private ArrayList robotsList; |
37 | 36 |
|
38 |
|
|
39 | 37 |
public PiwikStatsDB(String logRepoPath, String logPortalPath) throws Exception { |
40 | 38 |
this.logRepoPath = logRepoPath; |
41 | 39 |
this.logPortalPath = logPortalPath; |
... | ... | |
44 | 42 |
} |
45 | 43 |
|
46 | 44 |
public void foo() { |
47 |
Stream<String> s = Arrays.stream(new String[] {"a", "b", "c", "d"});
|
|
45 |
Stream<String> s = Arrays.stream(new String[]{"a", "b", "c", "d"}); |
|
48 | 46 |
|
49 | 47 |
System.out.println(s.parallel().count()); |
50 | 48 |
} |
... | ... | |
143 | 141 |
this.robotsList = counterRobots.getRobotsPatterns(); |
144 | 142 |
|
145 | 143 |
processRepositoryLog(); |
146 |
log.info("repository process done"); |
|
144 |
log.info("OpenAIRE repository process done");
|
|
147 | 145 |
removeDoubleClicks(); |
148 |
log.info("removing double clicks done"); |
|
149 |
cleanOAI(); |
|
150 |
log.info("cleaning oai done"); |
|
146 |
log.info("OpenAIRE removing double clicks done"); |
|
151 | 147 |
|
152 |
viewsStats(); |
|
153 |
downloadsStats(); |
|
154 |
|
|
155 | 148 |
processPortalLog(); |
156 | 149 |
log.info("portal process done"); |
157 |
|
|
150 |
|
|
158 | 151 |
portalStats(); |
159 | 152 |
log.info("portal usagestats done"); |
160 | 153 |
|
154 |
cleanOAI(); |
|
155 |
log.info("OpenAIREcleaning oai done"); |
|
156 |
|
|
157 |
viewsStats(); |
|
158 |
log.info("OpenAIRE views stats done"); |
|
159 |
|
|
160 |
downloadsStats(); |
|
161 |
log.info("OpenAIRE downloads stats done"); |
|
162 |
|
|
161 | 163 |
updateProdTables(); |
162 | 164 |
log.info("updateProdTables done"); |
163 | 165 |
|
... | ... | |
177 | 179 |
// throw new Exception("Failed to create usage usagestats: " + e.toString(), e); |
178 | 180 |
// } |
179 | 181 |
// } |
180 |
|
|
181 | 182 |
public void processRepositoryLog() throws Exception { |
182 | 183 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
183 | 184 |
ConnectDB.getConnection().setAutoCommit(false); |
... | ... | |
294 | 295 |
|
295 | 296 |
sql = "CREATE TABLE IF NOT EXISTS pageviews_stats (like pageviews_stats_tmp including all)"; |
296 | 297 |
stmt.executeUpdate(sql); |
297 |
|
|
298 |
|
|
298 | 299 |
stmt.close(); |
299 | 300 |
ConnectDB.getConnection().commit(); |
300 | 301 |
ConnectDB.getConnection().close(); |
... | ... | |
323 | 324 |
// ConnectDB.getConnection().commit(); |
324 | 325 |
// ConnectDB.getConnection().close(); |
325 | 326 |
// } |
326 |
|
|
327 |
private void downloadsStats() throws Exception { |
|
327 |
private void downloadsStats() throws Exception { |
|
328 | 328 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
329 | 329 |
ConnectDB.getConnection().setAutoCommit(false); |
330 | 330 |
|
331 |
//String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;";
|
|
331 |
//String sql = "CREATE OR REPLACE VIEW result_downloads_monthly as select entity_id AS id, COUNT(entity_id) as downloads, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklog where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;"; |
|
332 | 332 |
String sql = "CREATE OR REPLACE VIEW result_downloads_monthly_tmp as select entity_id AS id, COUNT(entity_id) as downloads, SUM(CASE WHEN referrer_name LIKE '%openaire%' THEN 1 ELSE 0 END) AS openaire_referrer, extract('year' from timestamp::date) ||'/'|| LPAD(CAST(extract('month' from timestamp::date) AS VARCHAR), 2, '0') AS month, source FROM piwiklogtmp where action='download' and (source_item_type='oaItem' or source_item_type='repItem') group by id, month, source order by source, id, month;"; |
333 | 333 |
stmt.executeUpdate(sql); |
334 | 334 |
|
... | ... | |
336 | 336 |
// sql = "SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire INTO downloads_stats FROM result_downloads_monthly p, datasource d, result_oids ro where p.source!='5' AND p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;"; |
337 | 337 |
sql = "CREATE TABLE IF NOT EXISTS downloads_stats_tmp AS SELECT 'OpenAIRE'::TEXT as source, d.id as repository_id, ro.id as result_id, month as date, max(downloads) AS count, max(openaire_referrer) AS openaire FROM result_downloads_monthly_tmp p, public.datasource d, public.result_oids ro where p.source=d.piwik_id and p.id=ro.orid group by repository_id, result_id, date ORDER BY repository_id, result_id, date;"; |
338 | 338 |
stmt.executeUpdate(sql); |
339 |
|
|
339 |
|
|
340 | 340 |
sql = "CREATE TABLE IF NOT EXISTS downloads_stats (like downloads_stats_tmp including all)"; |
341 | 341 |
stmt.executeUpdate(sql); |
342 |
|
|
343 |
sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;"; |
|
344 |
stmt.executeUpdate(sql); |
|
345 | 342 |
|
343 |
//sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp;"; |
|
344 |
//stmt.executeUpdate(sql); |
|
345 |
|
|
346 | 346 |
stmt.close(); |
347 | 347 |
ConnectDB.getConnection().commit(); |
348 | 348 |
ConnectDB.getConnection().close(); |
... | ... | |
359 | 359 |
int diffMonth = diffYear * 12 + endCalendar.get(Calendar.MONTH) - startCalendar.get(Calendar.MONTH); |
360 | 360 |
|
361 | 361 |
// String sql = "SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date INTO full_dates FROM generate_series(0, " + diffMonth + ", 1) AS offs;"; |
362 |
String sql = "CREATE TABLE IF NOT EXISTS full_dates AS SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;";
|
|
362 |
String sql = "CREATE TABLE IF NOT EXISTS full_dates(full_date TEXT)";
|
|
363 | 363 |
stmt.executeUpdate(sql); |
364 |
|
|
365 |
sql = "INSERT INTO full_dates SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;"; |
|
366 |
stmt.executeUpdate(sql); |
|
364 | 367 |
|
368 |
//sql = "INSERT INTO public.full_dates SELECT to_char(date_trunc('month', ('2016-01-01'::date + interval '1 month'*offs)), 'YYYY/MM') AS full_date FROM generate_series(0, " + diffMonth + ", 1) AS offs;"; |
|
369 |
//stmt.executeUpdate(sql); |
|
370 |
|
|
365 | 371 |
sql = "CREATE INDEX IF NOT EXISTS full_dates_full_date ON full_dates USING btree(full_date);"; |
366 | 372 |
stmt.executeUpdate(sql); |
367 | 373 |
|
... | ... | |
404 | 410 |
// sql = "SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views INTO usage_stats FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;"; |
405 | 411 |
sql = "CREATE TABLE IF NOT EXISTS usage_stats AS SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;"; |
406 | 412 |
stmt.executeUpdate(sql); |
413 |
|
|
414 |
sql = "INSERT INTO usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats_tmp AS ds FULL OUTER JOIN views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;"; |
|
415 |
stmt.executeUpdate(sql); |
|
416 |
|
|
417 |
sql = "INSERT INTO usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM la_downloads_stats_tmp AS ds FULL OUTER JOIN la_views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;"; |
|
418 |
stmt.executeUpdate(sql); |
|
407 | 419 |
|
408 |
sql = "INSERT INTO usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats_tmp AS ds FULL OUTER JOIN views_stats AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
|
|
420 |
sql = "INSERT INTO public.usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM downloads_stats_tmp AS ds FULL OUTER JOIN views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;";
|
|
409 | 421 |
stmt.executeUpdate(sql); |
410 | 422 |
|
423 |
sql = "INSERT INTO public.usage_stats SELECT coalesce(ds.source, vs.source) as source, coalesce(ds.repository_id, vs.repository_id) as repository_id, coalesce(ds.result_id, vs.result_id) as result_id, coalesce(ds.date, vs.date) as date, coalesce(ds.count, 0) as downloads, coalesce(vs.count, 0) as views, coalesce(ds.openaire, 0) as openaire_downloads, coalesce(vs.openaire, 0) as openaire_views FROM la_downloads_stats_tmp AS ds FULL OUTER JOIN la_views_stats_tmp AS vs ON ds.source=vs.source AND ds.repository_id=vs.repository_id AND ds.result_id=vs.result_id AND ds.date=vs.date;"; |
|
424 |
stmt.executeUpdate(sql); |
|
425 |
|
|
411 | 426 |
sql = "CREATE INDEX IF NOT EXISTS usage_stats_source ON usage_stats USING btree(source);"; |
412 | 427 |
stmt.executeUpdate(sql); |
413 | 428 |
|
... | ... | |
419 | 434 |
|
420 | 435 |
sql = "CREATE INDEX IF NOT EXISTS usage_stats_date ON usage_stats USING btree(date);"; |
421 | 436 |
stmt.executeUpdate(sql); |
422 |
|
|
423 |
sql = "DROP TABLE IF EXISTS process_portal_log_tmp;";
|
|
437 |
|
|
438 |
sql = "DROP VIEW IF EXISTS result_downloads_monthly_tmp";
|
|
424 | 439 |
stmt.executeUpdate(sql); |
425 | 440 |
|
426 |
sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;";
|
|
441 |
sql = "DROP VIEW IF EXISTS result_views_monthly_tmp";
|
|
427 | 442 |
stmt.executeUpdate(sql); |
428 | 443 |
|
429 |
sql = "DROP VIEW IF EXISTS result_views_monthly_tmp"; |
|
444 |
|
|
445 |
sql = "DROP TABLE IF EXISTS views_stats_tmp;"; |
|
430 | 446 |
stmt.executeUpdate(sql); |
431 | 447 |
|
432 |
sql = "DROP TABLE IF EXISTS views_stats_tmp;"; |
|
433 |
stmt.executeUpdate(sql); |
|
434 |
|
|
435 | 448 |
sql = "DROP TABLE IF EXISTS downloads_stats_tmp;"; |
436 | 449 |
stmt.executeUpdate(sql); |
437 |
|
|
450 |
|
|
451 |
sql = "DROP TABLE IF EXISTS pageviews_stats_tmp;"; |
|
452 |
stmt.executeUpdate(sql); |
|
453 |
|
|
438 | 454 |
sql = "DROP TABLE IF EXISTS process_portal_log_tmp;"; |
439 | 455 |
stmt.executeUpdate(sql); |
440 | 456 |
|
... | ... | |
444 | 460 |
sql = "DROP TABLE IF EXISTS sushilogtmp;"; |
445 | 461 |
stmt.executeUpdate(sql); |
446 | 462 |
|
463 |
sql = "DROP VIEW IF EXISTS la_result_views_monthly_tmp;"; |
|
464 |
stmt.executeUpdate(sql); |
|
465 |
|
|
466 |
sql = "DROP VIEW IF EXISTS la_result_downloads_monthly_tmp;"; |
|
467 |
stmt.executeUpdate(sql); |
|
468 |
|
|
469 |
sql = "DROP TABLE IF EXISTS la_downloads_stats_tmp;"; |
|
470 |
stmt.executeUpdate(sql); |
|
471 |
|
|
472 |
sql = "DROP TABLE IF EXISTS la_views_stats_tmp;"; |
|
473 |
stmt.executeUpdate(sql); |
|
474 |
|
|
475 |
|
|
476 |
sql = "DROP TABLE IF EXISTS lareferencialogtmp;"; |
|
477 |
stmt.executeUpdate(sql); |
|
478 |
|
|
447 | 479 |
stmt.close(); |
448 | 480 |
ConnectDB.getConnection().commit(); |
449 | 481 |
ConnectDB.getConnection().close(); |
... | ... | |
644 | 676 |
stmt.close(); |
645 | 677 |
ConnectDB.getConnection().close(); |
646 | 678 |
} |
647 |
|
|
679 |
|
|
648 | 680 |
public void portalStats() throws SQLException { |
649 | 681 |
Connection con = ConnectDB.getConnection(); |
650 | 682 |
Statement stmt = con.createStatement(); |
... | ... | |
670 | 702 |
stmt = con.createStatement(); |
671 | 703 |
sql = "INSERT INTO piwiklogtmp SELECT DISTINCT source, id_visit, country, action, url, roid.orid, \'project\', timestamp, referrer_name, agent FROM process_portal_log_tmp, public.project_oids roid WHERE entity_id IS NOT null AND entity_id=roid.orid AND roid.orid IS NOT null;"; |
672 | 704 |
stmt.executeUpdate(sql); |
673 |
// stmt.close(); |
|
705 |
// stmt.close();
|
|
674 | 706 |
// con.commit(); |
675 | 707 |
stmt.close(); |
676 | 708 |
ConnectDB.getConnection().commit(); |
... | ... | |
893 | 925 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
894 | 926 |
ConnectDB.getConnection().setAutoCommit(false); |
895 | 927 |
|
896 |
String sql = "insert into piwiklog select * from piwiklogtmp;"; |
|
897 |
stmt.executeUpdate(sql); |
|
898 |
|
|
928 |
String sql = "insert into piwiklog select * from piwiklogtmp;";
|
|
929 |
stmt.executeUpdate(sql);
|
|
930 |
|
|
899 | 931 |
sql = "insert into views_stats select * from views_stats_tmp;"; |
900 | 932 |
stmt.executeUpdate(sql); |
901 |
|
|
933 |
|
|
934 |
sql = "insert into public.views_stats select * from views_stats_tmp;"; |
|
935 |
stmt.executeUpdate(sql); |
|
936 |
|
|
902 | 937 |
sql = "insert into downloads_stats select * from downloads_stats_tmp;"; |
903 | 938 |
stmt.executeUpdate(sql); |
904 | 939 |
|
940 |
sql = "insert into public.downloads_stats select * from downloads_stats_tmp;"; |
|
941 |
stmt.executeUpdate(sql); |
|
942 |
|
|
905 | 943 |
sql = "insert into pageviews_stats select * from pageviews_stats_tmp;"; |
906 | 944 |
stmt.executeUpdate(sql); |
907 | 945 |
|
946 |
sql = "insert into public.pageviews_stats select * from pageviews_stats_tmp;"; |
|
947 |
stmt.executeUpdate(sql); |
|
948 |
|
|
908 | 949 |
stmt.close(); |
909 | 950 |
ConnectDB.getConnection().commit(); |
910 | 951 |
ConnectDB.getConnection().close(); |
... | ... | |
928 | 969 |
|
929 | 970 |
hdfs.close(); |
930 | 971 |
} catch (Exception e) { |
931 |
log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logPath)); |
|
932 |
throw new Exception("HDFS file path with exported data does not exist : " + logPath, e); |
|
972 |
log.error("HDFS file path with exported data does not exist : " + new Path(hdfs.getUri() + logRepoPath));
|
|
973 |
throw new Exception("HDFS file path with exported data does not exist : " + logRepoPath, e);
|
|
933 | 974 |
} |
934 | 975 |
|
935 | 976 |
return fileNames; |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/UsageStatsExporter.java | ||
---|---|---|
23 | 23 |
String portalLogPath = properties.getProperty("portal_LogPath"); |
24 | 24 |
String portalMatomoID = properties.getProperty("portal_MatomoID"); |
25 | 25 |
String irusUKBaseURL = properties.getProperty("IRUS_UK_BaseUrl"); |
26 |
String lareferenciaLogPath = properties.getProperty("lareferenciaLogPath"); |
|
27 |
String lareferenciaBaseURL=properties.getProperty("LaReferenciaBaseUrl"); |
|
28 |
String lareferenciaAuthToken=properties.getProperty("LareferenciaAuthToken"); |
|
29 |
|
|
26 | 30 |
|
27 | 31 |
//connect to DB |
28 | 32 |
ConnectDB.init(properties); |
... | ... | |
36 | 40 |
PiwikStatsDB piwikstatsdb = new PiwikStatsDB(repoLogPath, portalLogPath); |
37 | 41 |
piwikstatsdb.setCounterRobotsURL(properties.getProperty("COUNTER_robots_Url")); |
38 | 42 |
piwikstatsdb.processLogs(); |
39 |
log.info("process logs done");
|
|
43 |
log.info("OpenAIRE logs done");
|
|
40 | 44 |
|
45 |
LaReferenciaDownloadLogs lrf = new LaReferenciaDownloadLogs(lareferenciaBaseURL,lareferenciaAuthToken); |
|
46 |
lrf.GetLaReferenciaRepos(lareferenciaLogPath); |
|
47 |
LaReferenciaStats lastats = new LaReferenciaStats(lareferenciaLogPath); |
|
48 |
lastats.processLogs(); |
|
49 |
log.info("LaReferencia logs done"); |
|
50 |
|
|
41 | 51 |
IrusStats irusstats = new IrusStats(irusUKBaseURL); |
42 | 52 |
irusstats.processIrusRRReport(); |
43 | 53 |
irusstats.irusStats(); |
44 |
log.info("irus done");
|
|
54 |
log.info("Irus-UK done");
|
|
45 | 55 |
|
46 | 56 |
SarcStats sarcStats = new SarcStats(); |
47 | 57 |
sarcStats.processSarc(); |
48 | 58 |
sarcStats.sarcStats(); |
49 |
log.info("sarc done");
|
|
59 |
log.info("SARC-OJS done");
|
|
50 | 60 |
|
51 | 61 |
//finalize usagestats |
52 | 62 |
piwikstatsdb.finalizeStats(); |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/java/eu/dnetlib/usagestats/export/LaReferenciaDownloadLogs.java | ||
---|---|---|
1 |
package eu.dnetlib.usagestats.export; |
|
2 |
|
|
3 |
import org.apache.hadoop.conf.Configuration; |
|
4 |
import org.apache.hadoop.fs.FSDataOutputStream; |
|
5 |
import org.apache.hadoop.fs.Path; |
|
6 |
import org.apache.hadoop.fs.FileSystem; |
|
7 |
import org.apache.log4j.Logger; |
|
8 |
|
|
9 |
import java.io.*; |
|
10 |
import java.net.URL; |
|
11 |
import java.net.URLConnection; |
|
12 |
import java.sql.PreparedStatement; |
|
13 |
import java.sql.ResultSet; |
|
14 |
import java.sql.Statement; |
|
15 |
import java.text.SimpleDateFormat; |
|
16 |
import java.util.Date; |
|
17 |
import java.util.Calendar; |
|
18 |
import org.json.simple.JSONArray; |
|
19 |
import org.json.simple.JSONObject; |
|
20 |
import org.json.simple.parser.JSONParser; |
|
21 |
|
|
22 |
public class LaReferenciaDownloadLogs { |
|
23 |
|
|
24 |
private final String piwikUrl; |
|
25 |
private Date startDate; |
|
26 |
private final String tokenAuth; |
|
27 |
|
|
28 |
/* |
|
29 |
The Piwik's API method |
|
30 |
*/ |
|
31 |
private final String APImethod = "?module=API&method=Live.getLastVisitsDetails"; |
|
32 |
private final String format = "&format=json"; |
|
33 |
private final String ApimethodGetAllSites = "?module=API&method=SitesManager.getSitesWithViewAccess"; |
|
34 |
|
|
35 |
private final Logger log = Logger.getLogger(this.getClass()); |
|
36 |
|
|
37 |
public LaReferenciaDownloadLogs(String piwikUrl, String tokenAuth) throws Exception { |
|
38 |
this.piwikUrl = piwikUrl; |
|
39 |
this.tokenAuth = tokenAuth; |
|
40 |
this.createTables(); |
|
41 |
this.createTmpTables(); |
|
42 |
} |
|
43 |
private void createTables() throws Exception { |
|
44 |
try { |
|
45 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
46 |
String sqlCreateTableLareferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialog(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));"; |
|
47 |
String sqlcreateRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " |
|
48 |
+ " ON INSERT TO lareferencialog " |
|
49 |
+ " WHERE (EXISTS ( SELECT lareferencialog.matomoid, lareferencialog.source, lareferencialog.id_visit," |
|
50 |
+ "lareferencialog.action, lareferencialog.\"timestamp\", lareferencialog.entity_id " |
|
51 |
+ "FROM lareferencialog " |
|
52 |
+ "WHERE lareferencialog.matomoid=new.matomoid AND lareferencialog.source = new.source AND lareferencialog.id_visit = new.id_visit AND lareferencialog.action = new.action AND lareferencialog.entity_id = new.entity_id AND lareferencialog.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; |
|
53 |
String sqlCreateRuleIndexLaReferenciaLog = "create index if not exists lareferencialog_rule on lareferencialog(matomoid, source, id_visit, action, entity_id, \"timestamp\");"; |
|
54 |
stmt.executeUpdate(sqlCreateTableLareferenciaLog); |
|
55 |
stmt.executeUpdate(sqlcreateRuleLaReferenciaLog); |
|
56 |
stmt.executeUpdate(sqlCreateRuleIndexLaReferenciaLog); |
|
57 |
|
|
58 |
stmt.close(); |
|
59 |
ConnectDB.getConnection().close(); |
|
60 |
log.info("Lareferencia Tables Created"); |
|
61 |
|
|
62 |
} catch (Exception e) { |
|
63 |
log.error("Failed to create tables: " + e); |
|
64 |
throw new Exception("Failed to create tables: " + e.toString(), e); |
|
65 |
//System.exit(0); |
|
66 |
} |
|
67 |
} |
|
68 |
|
|
69 |
private void createTmpTables() throws Exception { |
|
70 |
|
|
71 |
try { |
|
72 |
Statement stmt = ConnectDB.getConnection().createStatement(); |
|
73 |
String sqlCreateTmpTableLaReferenciaLog = "CREATE TABLE IF NOT EXISTS lareferencialogtmp(matomoid INTEGER, source TEXT, id_visit TEXT, country TEXT, action TEXT, url TEXT, entity_id TEXT, source_item_type TEXT, timestamp TEXT, referrer_name TEXT, agent TEXT, PRIMARY KEY(source, id_visit, action, timestamp, entity_id));"; |
|
74 |
String sqlcreateTmpRuleLaReferenciaLog = "CREATE OR REPLACE RULE ignore_duplicate_inserts AS " |
|
75 |
+ " ON INSERT TO lareferencialogtmp " |
|
76 |
+ " WHERE (EXISTS ( SELECT lareferencialogtmp.matomoid, lareferencialogtmp.source, lareferencialogtmp.id_visit," |
|
77 |
+ "lareferencialogtmp.action, lareferencialogtmp.\"timestamp\", lareferencialogtmp.entity_id " |
|
78 |
+ "FROM lareferencialogtmp " |
|
79 |
+ "WHERE lareferencialogtmp.matomoid=new.matomoid AND lareferencialogtmp.source = new.source AND lareferencialogtmp.id_visit = new.id_visit AND lareferencialogtmp.action = new.action AND lareferencialogtmp.entity_id = new.entity_id AND lareferencialogtmp.\"timestamp\" = new.\"timestamp\")) DO INSTEAD NOTHING;"; |
|
80 |
stmt.executeUpdate(sqlCreateTmpTableLaReferenciaLog); |
|
81 |
stmt.executeUpdate(sqlcreateTmpRuleLaReferenciaLog); |
|
82 |
|
|
83 |
stmt.close(); |
|
84 |
log.info("Lareferencia Tmp Tables Created"); |
|
85 |
|
|
86 |
} catch (Exception e) { |
|
87 |
log.error("Failed to create tmptables: " + e); |
|
88 |
throw new Exception("Failed to create tmp tables: " + e.toString(), e); |
|
89 |
//System.exit(0); |
|
90 |
} |
|
91 |
} |
|
92 |
private String getPiwikLogUrl() { |
|
93 |
return piwikUrl + "/"; |
|
94 |
} |
|
95 |
|
|
96 |
private String getJson(String url) throws Exception { |
|
97 |
try { |
|
98 |
URL website = new URL(url); |
|
99 |
URLConnection connection = website.openConnection(); |
|
100 |
|
|
101 |
StringBuilder response; |
|
102 |
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { |
|
103 |
response = new StringBuilder(); |
|
104 |
String inputLine; |
|
105 |
while ((inputLine = in.readLine()) != null) { |
|
106 |
response.append(inputLine); |
|
107 |
response.append("\n"); |
|
108 |
} |
|
109 |
} |
|
110 |
return response.toString(); |
|
111 |
} catch (Exception e) { |
|
112 |
log.error("Failed to get URL: " + e); |
|
113 |
throw new Exception("Failed to get URL: " + e.toString(), e); |
|
114 |
} |
|
115 |
} |
|
116 |
|
|
117 |
public void GetLaReferenciaRepos(String repoLogsPath) throws Exception { |
|
118 |
|
|
119 |
String baseApiUrl = getPiwikLogUrl() + ApimethodGetAllSites + format + "&token_auth=" + this.tokenAuth; |
|
120 |
String content = ""; |
|
121 |
|
|
122 |
content = getJson(baseApiUrl); |
|
123 |
JSONParser parser = new JSONParser(); |
|
124 |
JSONArray jsonArray = (JSONArray) parser.parse(content); |
|
125 |
for (Object aJsonArray : jsonArray) { |
|
126 |
JSONObject jsonObjectRow = (JSONObject) aJsonArray; |
|
127 |
int idSite = Integer.parseInt(jsonObjectRow.get("idsite").toString()); |
|
128 |
this.GetLaReFerenciaLogs(repoLogsPath, idSite); |
|
129 |
} |
|
130 |
} |
|
131 |
|
|
132 |
public void GetLaReFerenciaLogs(String repoLogsPath, |
|
133 |
int laReferencialMatomoID) throws Exception { |
|
134 |
|
|
135 |
SimpleDateFormat simpleDateFormat = new SimpleDateFormat("YYYY-MM"); |
|
136 |
|
|
137 |
Calendar start = Calendar.getInstance(); |
|
138 |
start.set(Calendar.YEAR, 2020); |
|
139 |
start.set(Calendar.MONTH, Calendar.JANUARY); |
|
140 |
start.set(Calendar.DAY_OF_MONTH, 1); |
|
141 |
|
|
142 |
Calendar end = Calendar.getInstance(); |
|
143 |
end.add(Calendar.DAY_OF_MONTH, -1); |
|
144 |
|
|
145 |
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd"); |
|
146 |
PreparedStatement st = ConnectDB.getConnection().prepareStatement("SELECT max(timestamp) FROM lareferencialog WHERE matomoid=? HAVING max(timestamp) is not null;"); |
|
147 |
st.setInt(1, laReferencialMatomoID); |
|
148 |
|
|
149 |
ResultSet rs_date = st.executeQuery(); |
|
150 |
while (rs_date.next()) { |
|
151 |
if (rs_date.getString(1) != null && !rs_date.getString(1).equals("null") && !rs_date.getString(1).equals("")) { |
|
152 |
start.setTime(sdf.parse(rs_date.getString(1))); |
|
153 |
} |
|
154 |
} |
|
155 |
rs_date.close(); |
|
156 |
|
|
157 |
for (Date date = start.getTime(); start.before(end); start.add(Calendar.DATE, 1), date = start.getTime()) { |
|
158 |
log.info("Downloading logs for LaReferencia repoid " + laReferencialMatomoID + " and for " + sdf.format(date)); |
|
159 |
|
|
160 |
String period = "&period=day&date=" + sdf.format(date); |
|
161 |
String outFolder = ""; |
|
162 |
outFolder = repoLogsPath; |
|
163 |
|
|
164 |
FileSystem fs = FileSystem.get(new Configuration()); |
|
165 |
|
|
166 |
String baseApiUrl = getPiwikLogUrl() + APImethod + "&idSite=" + laReferencialMatomoID + period + format + "&expanded=5&filter_limit=1000&token_auth=" + tokenAuth; |
|
167 |
String content = ""; |
|
168 |
|
|
169 |
int i = 0; |
|
170 |
|
|
171 |
while (!content.equals("[]\n")) { |
|
172 |
|
|
173 |
FSDataOutputStream fin = fs.create(new Path(outFolder + "/" + laReferencialMatomoID + "_LaRefPiwiklog" + sdf.format((date)) + "_" + i + ".json"), true); |
|
174 |
String apiUrl = baseApiUrl; |
|
175 |
|
|
176 |
if (i > 0) { |
|
177 |
apiUrl += "&filter_offset=" + (i * 1000); |
|
178 |
} |
|
179 |
|
|
180 |
content = getJson(apiUrl); |
|
181 |
|
|
182 |
fin.write(content.getBytes()); |
|
183 |
|
|
184 |
i++; |
|
185 |
fin.close(); |
|
186 |
} |
|
187 |
//fin.close(); |
|
188 |
//out.close(); |
|
189 |
|
|
190 |
} |
|
191 |
|
|
192 |
// } |
|
193 |
} |
|
194 |
} |
modules/dnet-openaire-usage-stats-export-wf/branches/usage_stats_export_v2/dnet-openaire-usage-stats-export/src/main/resources/usagestats.properties | ||
---|---|---|
6 | 6 |
Stats_db_Schema=usagestats |
7 | 7 |
matomo_AuthToken=703bd17d845acdaf795e01bb1e0895b9 |
8 | 8 |
matomo_BaseUrl=analytics.openaire.eu |
9 |
matomo.LaReferenciaBaseUrl=http://matomo.lareferencia.info |
|
10 |
matomo.LareferenciaAuthToken=484874b3655d5a831eb8db33695790c4 |
|
9 | 11 |
repo_LogPath=/Users/dpie/Desktop/Repologs/ |
10 | 12 |
portal_LogPath=/Users/dpie/Desktop/Portallogs/ |
13 |
lareferencia.LogPath=/Volumes/Ulysses/LaReferenciaLogs |
|
11 | 14 |
portal_MatomoID=109 |
12 | 15 |
COUNTER_robots_Url=https://raw.githubusercontent.com/atmire/COUNTER-Robots/master/COUNTER_Robots_list.json |
13 | 16 |
IRUS_UK_BaseUrl=https://irus.jisc.ac.uk/api/sushilite/v1_7/ |
Also available in: Unified diff
Added LaReferencia Repos to WF