Project

General

Profile

1
package eu.dnetlib.enabling.is.sn;
2

    
3
import java.io.IOException;
4
import java.sql.Connection;
5
import java.sql.ResultSet;
6
import java.sql.SQLException;
7
import java.sql.Statement;
8
import java.util.HashMap;
9
import java.util.Map;
10
import java.util.Map.Entry;
11

    
12
import org.apache.commons.lang.StringUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.apache.cxf.helpers.IOUtils;
16
import org.postgresql.PGConnection;
17
import org.postgresql.PGNotification;
18
import org.springframework.beans.factory.annotation.Required;
19

    
20
import com.google.gson.Gson;
21
import com.google.gson.reflect.TypeToken;
22

    
23
import eu.dnetlib.enabling.is.jdbc.DatabaseUtils;
24
import eu.dnetlib.enabling.is.rmi.Operation;
25

    
26
public class NotificationListener {
27

    
28
	private static final Log log = LogFactory.getLog(NotificationListener.class);
29

    
30
	private DatabaseUtils dbUtils;
31

    
32
	private NotificationDispatcher dispatcher;
33

    
34
	private boolean running = false;
35
	private Connection conn = null;
36
	private String query = "";
37
	private Gson gson = new Gson();
38

    
39
	public void init() throws IOException {
40
		try {
41
			this.query = IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/enabling/is/templates/obtain_notifications.sql.st"));
42
			this.conn = dbUtils.getDataSource().getConnection();
43
			startListening("insert", "update", "delete");
44
		} catch (SQLException e) {
45
			log.error("Error opening connection", e);
46
		}
47
	}
48

    
49
	public void destroy() {
50
		try {
51
			if (conn != null && !conn.isClosed()) {
52
				conn.close();
53
			}
54
		} catch (SQLException e) {
55
			log.error("Error closing connection", e);
56
		}
57
	}
58

    
59
	public void verify() {
60
		if (running) { return; }
61

    
62
		synchronized (this) {
63
			running = true;
64

    
65
			try {
66
				final Statement stmt = conn.createStatement();
67
				final ResultSet rs = stmt.executeQuery("SELECT 1");
68
				rs.close();
69
				stmt.close();
70

    
71
				final PGNotification notifications[] = ((PGConnection) conn).getNotifications();
72

    
73
				if (notifications != null && notifications.length > 0) {
74
					if (log.isInfoEnabled()) {
75
						for (PGNotification notification : notifications) {
76
							log.info("Notification " + notification.getName() + " on table " + notification.getParameter());
77
						}
78
					}
79

    
80
					for (Map<String, Object> map : dbUtils.searchSimple(query)) {
81
						final Operation operation = Operation.valueOf((String) map.get("operation"));
82
						final Map<String, Object> mapCond = obtainMapFromJson((String) map.get("condition"));
83
						final Map<String, Object> mapTest = obtainMapFromJson((String) map.get(operation == Operation.DELETE ? "old" : "new"));
84

    
85
						if (compareMaps(mapCond, mapTest)) {
86
							log.info("SENDING NOTIFICATIN TO " + map.get("service_id"));
87
							final NotificationMessage message = new NotificationMessage();
88
							message.setSubscriptionId((String) map.get("subscr_id"));
89
							message.setIsId("XXXX");
90
							message.setMessage((String) map.get(operation == Operation.DELETE ? "old" : "new"));
91
							message.setServiceId((String) map.get("service_id"));
92
							message.setTopic(map.get("operation") + "/" + map.get("tbl"));
93
							dispatcher.sendNotification(message);
94
						}
95
					}
96
				}
97
			} catch (Exception e) {
98
				log.error(e);
99
			}
100
			running = false;
101
		}
102
	}
103

    
104
	private Map<String, Object> obtainMapFromJson(final String s) {
105
		if (StringUtils.isBlank(s)) {
106
			return new HashMap<String, Object>();
107
		} else {
108
			return gson.fromJson(s, new TypeToken<Map<String, Object>>() {}.getType());
109
		}
110
	}
111

    
112
	private boolean compareMaps(final Map<String, Object> cond, final Map<String, Object> map) {
113
		if (cond != null) {
114
			for (Entry<String, Object> ec : cond.entrySet()) {
115
				final Object v1 = ec.getValue();
116
				final Object v2 = map.get(ec.getKey());
117
				if (v1 != null && v2 != null && !v1.equals(v2)) { return false; }
118
				if (v1 == null && v2 != null) { return false; }
119
				if (v1 != null && v2 == null) { return false; }
120
			}
121
		}
122
		return true;
123
	}
124

    
125
	private void startListening(final String... messages) throws SQLException {
126
		for (String message : messages) {
127
			final Statement stmt = conn.createStatement();
128
			stmt.execute("LISTEN " + message);
129
			stmt.close();
130
		}
131
	}
132

    
133
	public DatabaseUtils getDbUtils() {
134
		return dbUtils;
135
	}
136

    
137
	@Required
138
	public void setDbUtils(final DatabaseUtils dbUtils) {
139
		this.dbUtils = dbUtils;
140
	}
141

    
142
	public NotificationDispatcher getDispatcher() {
143
		return dispatcher;
144
	}
145

    
146
	@Required
147
	public void setDispatcher(final NotificationDispatcher dispatcher) {
148
		this.dispatcher = dispatcher;
149
	}
150
}
(2-2/3)