1 |
34307
|
michele.ar
|
package eu.dnetlib.enabling.is.sn;
|
2 |
|
|
|
3 |
34408
|
michele.ar
|
import java.io.IOException;
|
4 |
34307
|
michele.ar
|
import java.sql.Connection;
|
5 |
|
|
import java.sql.ResultSet;
|
6 |
|
|
import java.sql.SQLException;
|
7 |
|
|
import java.sql.Statement;
|
8 |
34408
|
michele.ar
|
import java.util.HashMap;
|
9 |
|
|
import java.util.Map;
|
10 |
|
|
import java.util.Map.Entry;
|
11 |
34307
|
michele.ar
|
|
12 |
34408
|
michele.ar
|
import org.apache.commons.lang.StringUtils;
|
13 |
34307
|
michele.ar
|
import org.apache.commons.logging.Log;
|
14 |
|
|
import org.apache.commons.logging.LogFactory;
|
15 |
34408
|
michele.ar
|
import org.apache.cxf.helpers.IOUtils;
|
16 |
34307
|
michele.ar
|
import org.postgresql.PGConnection;
|
17 |
|
|
import org.postgresql.PGNotification;
|
18 |
|
|
import org.springframework.beans.factory.annotation.Required;
|
19 |
|
|
|
20 |
34408
|
michele.ar
|
import com.google.gson.Gson;
|
21 |
|
|
import com.google.gson.reflect.TypeToken;
|
22 |
|
|
|
23 |
|
|
import eu.dnetlib.enabling.is.jdbc.DatabaseUtils;
|
24 |
|
|
import eu.dnetlib.enabling.is.rmi.Operation;
|
25 |
|
|
|
26 |
34307
|
michele.ar
|
public class NotificationListener {
|
27 |
|
|
|
28 |
|
|
private static final Log log = LogFactory.getLog(NotificationListener.class);
|
29 |
|
|
|
30 |
34408
|
michele.ar
|
private DatabaseUtils dbUtils;
|
31 |
|
|
|
32 |
34412
|
michele.ar
|
private NotificationDispatcher dispatcher;
|
33 |
|
|
|
34 |
34307
|
michele.ar
|
private boolean running = false;
|
35 |
34408
|
michele.ar
|
private Connection conn = null;
|
36 |
|
|
private String query = "";
|
37 |
|
|
private Gson gson = new Gson();
|
38 |
34307
|
michele.ar
|
|
39 |
34408
|
michele.ar
|
public void init() throws IOException {
|
40 |
34307
|
michele.ar
|
try {
|
41 |
34408
|
michele.ar
|
this.query = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/enabling/is/templates/obtain_notifications.sql.st"));
|
42 |
|
|
this.conn = dbUtils.getDataSource().getConnection();
|
43 |
34307
|
michele.ar
|
startListening("insert", "update", "delete");
|
44 |
|
|
} catch (SQLException e) {
|
45 |
|
|
log.error("Error opening connection", e);
|
46 |
|
|
}
|
47 |
|
|
}
|
48 |
|
|
|
49 |
|
|
public void destroy() {
|
50 |
|
|
try {
|
51 |
|
|
if (conn != null && !conn.isClosed()) {
|
52 |
|
|
conn.close();
|
53 |
|
|
}
|
54 |
|
|
} catch (SQLException e) {
|
55 |
|
|
log.error("Error closing connection", e);
|
56 |
|
|
}
|
57 |
|
|
}
|
58 |
|
|
|
59 |
|
|
public void verify() {
|
60 |
|
|
if (running) { return; }
|
61 |
|
|
|
62 |
|
|
synchronized (this) {
|
63 |
|
|
running = true;
|
64 |
|
|
|
65 |
|
|
try {
|
66 |
|
|
final Statement stmt = conn.createStatement();
|
67 |
|
|
final ResultSet rs = stmt.executeQuery("SELECT 1");
|
68 |
|
|
rs.close();
|
69 |
|
|
stmt.close();
|
70 |
|
|
|
71 |
|
|
final PGNotification notifications[] = ((PGConnection) conn).getNotifications();
|
72 |
|
|
|
73 |
34408
|
michele.ar
|
if (notifications != null && notifications.length > 0) {
|
74 |
|
|
if (log.isInfoEnabled()) {
|
75 |
|
|
for (PGNotification notification : notifications) {
|
76 |
|
|
log.info("Notification " + notification.getName() + " on table " + notification.getParameter());
|
77 |
|
|
}
|
78 |
34307
|
michele.ar
|
}
|
79 |
34408
|
michele.ar
|
|
80 |
|
|
for (Map<String, Object> map : dbUtils.searchSimple(query)) {
|
81 |
|
|
final Operation operation = Operation.valueOf((String) map.get("operation"));
|
82 |
|
|
final Map<String, Object> mapCond = obtainMapFromJson((String) map.get("condition"));
|
83 |
|
|
final Map<String, Object> mapTest = obtainMapFromJson((String) map.get(operation == Operation.DELETE ? "old" : "new"));
|
84 |
|
|
|
85 |
|
|
if (compareMaps(mapCond, mapTest)) {
|
86 |
34412
|
michele.ar
|
log.info("SENDING NOTIFICATIN TO " + map.get("service_id"));
|
87 |
|
|
final NotificationMessage message = new NotificationMessage();
|
88 |
|
|
message.setSubscriptionId((String) map.get("subscr_id"));
|
89 |
|
|
message.setIsId("XXXX");
|
90 |
|
|
message.setMessage((String) map.get(operation == Operation.DELETE ? "old" : "new"));
|
91 |
|
|
message.setServiceId((String) map.get("service_id"));
|
92 |
|
|
message.setTopic(map.get("operation") + "/" + map.get("tbl"));
|
93 |
|
|
dispatcher.sendNotification(message);
|
94 |
34408
|
michele.ar
|
}
|
95 |
|
|
}
|
96 |
34307
|
michele.ar
|
}
|
97 |
|
|
} catch (Exception e) {
|
98 |
|
|
log.error(e);
|
99 |
|
|
}
|
100 |
|
|
running = false;
|
101 |
|
|
}
|
102 |
|
|
}
|
103 |
|
|
|
104 |
34408
|
michele.ar
|
private Map<String, Object> obtainMapFromJson(final String s) {
|
105 |
|
|
if (StringUtils.isBlank(s)) {
|
106 |
|
|
return new HashMap<String, Object>();
|
107 |
|
|
} else {
|
108 |
|
|
return gson.fromJson(s, new TypeToken<Map<String, Object>>() {}.getType());
|
109 |
|
|
}
|
110 |
|
|
}
|
111 |
|
|
|
112 |
|
|
private boolean compareMaps(final Map<String, Object> cond, final Map<String, Object> map) {
|
113 |
|
|
if (cond != null) {
|
114 |
|
|
for (Entry<String, Object> ec : cond.entrySet()) {
|
115 |
|
|
final Object v1 = ec.getValue();
|
116 |
|
|
final Object v2 = map.get(ec.getKey());
|
117 |
|
|
if (v1 != null && v2 != null && !v1.equals(v2)) { return false; }
|
118 |
|
|
if (v1 == null && v2 != null) { return false; }
|
119 |
|
|
if (v1 != null && v2 == null) { return false; }
|
120 |
|
|
}
|
121 |
|
|
}
|
122 |
|
|
return true;
|
123 |
|
|
}
|
124 |
|
|
|
125 |
34307
|
michele.ar
|
private void startListening(final String... messages) throws SQLException {
|
126 |
|
|
for (String message : messages) {
|
127 |
|
|
final Statement stmt = conn.createStatement();
|
128 |
|
|
stmt.execute("LISTEN " + message);
|
129 |
|
|
stmt.close();
|
130 |
|
|
}
|
131 |
|
|
}
|
132 |
|
|
|
133 |
34408
|
michele.ar
|
public DatabaseUtils getDbUtils() {
|
134 |
|
|
return dbUtils;
|
135 |
34307
|
michele.ar
|
}
|
136 |
|
|
|
137 |
|
|
@Required
|
138 |
34408
|
michele.ar
|
public void setDbUtils(final DatabaseUtils dbUtils) {
|
139 |
|
|
this.dbUtils = dbUtils;
|
140 |
34307
|
michele.ar
|
}
|
141 |
34412
|
michele.ar
|
|
142 |
|
|
public NotificationDispatcher getDispatcher() {
|
143 |
|
|
return dispatcher;
|
144 |
|
|
}
|
145 |
|
|
|
146 |
|
|
@Required
|
147 |
|
|
public void setDispatcher(final NotificationDispatcher dispatcher) {
|
148 |
|
|
this.dispatcher = dispatcher;
|
149 |
|
|
}
|
150 |
34307
|
michele.ar
|
}
|