Project

General

Profile

1 34307 michele.ar
package eu.dnetlib.enabling.is.sn;
2
3 34408 michele.ar
import java.io.IOException;
4 34307 michele.ar
import java.sql.Connection;
5
import java.sql.ResultSet;
6
import java.sql.SQLException;
7
import java.sql.Statement;
8 34408 michele.ar
import java.util.HashMap;
9
import java.util.Map;
10
import java.util.Map.Entry;
11 34307 michele.ar
12 34408 michele.ar
import org.apache.commons.lang.StringUtils;
13 34307 michele.ar
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15 34408 michele.ar
import org.apache.cxf.helpers.IOUtils;
16 34307 michele.ar
import org.postgresql.PGConnection;
17
import org.postgresql.PGNotification;
18
import org.springframework.beans.factory.annotation.Required;
19
20 34408 michele.ar
import com.google.gson.Gson;
21
import com.google.gson.reflect.TypeToken;
22
23
import eu.dnetlib.enabling.is.jdbc.DatabaseUtils;
24
import eu.dnetlib.enabling.is.rmi.Operation;
25
26 34307 michele.ar
public class NotificationListener {
27
28
	private static final Log log = LogFactory.getLog(NotificationListener.class);
29
30 34408 michele.ar
	private DatabaseUtils dbUtils;
31
32 34412 michele.ar
	private NotificationDispatcher dispatcher;
33
34 34307 michele.ar
	private boolean running = false;
35 34408 michele.ar
	private Connection conn = null;
36
	private String query = "";
37
	private Gson gson = new Gson();
38 34307 michele.ar
39 34408 michele.ar
	public void init() throws IOException {
40 34307 michele.ar
		try {
41 34408 michele.ar
			this.query = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/enabling/is/templates/obtain_notifications.sql.st"));
42
			this.conn = dbUtils.getDataSource().getConnection();
43 34307 michele.ar
			startListening("insert", "update", "delete");
44
		} catch (SQLException e) {
45
			log.error("Error opening connection", e);
46
		}
47
	}
48
49
	public void destroy() {
50
		try {
51
			if (conn != null && !conn.isClosed()) {
52
				conn.close();
53
			}
54
		} catch (SQLException e) {
55
			log.error("Error closing connection", e);
56
		}
57
	}
58
59
	public void verify() {
60
		if (running) { return; }
61
62
		synchronized (this) {
63
			running = true;
64
65
			try {
66
				final Statement stmt = conn.createStatement();
67
				final ResultSet rs = stmt.executeQuery("SELECT 1");
68
				rs.close();
69
				stmt.close();
70
71
				final PGNotification notifications[] = ((PGConnection) conn).getNotifications();
72
73 34408 michele.ar
				if (notifications != null && notifications.length > 0) {
74
					if (log.isInfoEnabled()) {
75
						for (PGNotification notification : notifications) {
76
							log.info("Notification " + notification.getName() + " on table " + notification.getParameter());
77
						}
78 34307 michele.ar
					}
79 34408 michele.ar
80
					for (Map<String, Object> map : dbUtils.searchSimple(query)) {
81
						final Operation operation = Operation.valueOf((String) map.get("operation"));
82
						final Map<String, Object> mapCond = obtainMapFromJson((String) map.get("condition"));
83
						final Map<String, Object> mapTest = obtainMapFromJson((String) map.get(operation == Operation.DELETE ? "old" : "new"));
84
85
						if (compareMaps(mapCond, mapTest)) {
86 34412 michele.ar
							log.info("SENDING NOTIFICATIN TO " + map.get("service_id"));
87
							final NotificationMessage message = new NotificationMessage();
88
							message.setSubscriptionId((String) map.get("subscr_id"));
89
							message.setIsId("XXXX");
90
							message.setMessage((String) map.get(operation == Operation.DELETE ? "old" : "new"));
91
							message.setServiceId((String) map.get("service_id"));
92
							message.setTopic(map.get("operation") + "/" + map.get("tbl"));
93
							dispatcher.sendNotification(message);
94 34408 michele.ar
						}
95
					}
96 34307 michele.ar
				}
97
			} catch (Exception e) {
98
				log.error(e);
99
			}
100
			running = false;
101
		}
102
	}
103
104 34408 michele.ar
	private Map<String, Object> obtainMapFromJson(final String s) {
105
		if (StringUtils.isBlank(s)) {
106
			return new HashMap<String, Object>();
107
		} else {
108
			return gson.fromJson(s, new TypeToken<Map<String, Object>>() {}.getType());
109
		}
110
	}
111
112
	private boolean compareMaps(final Map<String, Object> cond, final Map<String, Object> map) {
113
		if (cond != null) {
114
			for (Entry<String, Object> ec : cond.entrySet()) {
115
				final Object v1 = ec.getValue();
116
				final Object v2 = map.get(ec.getKey());
117
				if (v1 != null && v2 != null && !v1.equals(v2)) { return false; }
118
				if (v1 == null && v2 != null) { return false; }
119
				if (v1 != null && v2 == null) { return false; }
120
			}
121
		}
122
		return true;
123
	}
124
125 34307 michele.ar
	private void startListening(final String... messages) throws SQLException {
126
		for (String message : messages) {
127
			final Statement stmt = conn.createStatement();
128
			stmt.execute("LISTEN " + message);
129
			stmt.close();
130
		}
131
	}
132
133 34408 michele.ar
	public DatabaseUtils getDbUtils() {
134
		return dbUtils;
135 34307 michele.ar
	}
136
137
	@Required
138 34408 michele.ar
	public void setDbUtils(final DatabaseUtils dbUtils) {
139
		this.dbUtils = dbUtils;
140 34307 michele.ar
	}
141 34412 michele.ar
142
	public NotificationDispatcher getDispatcher() {
143
		return dispatcher;
144
	}
145
146
	@Required
147
	public void setDispatcher(final NotificationDispatcher dispatcher) {
148
		this.dispatcher = dispatcher;
149
	}
150 34307 michele.ar
}