Project

General

Profile

« Previous | Next » 

Revision 34412

Notification Dispatcher

View differences:

modules/dnet-information-service/trunk/src/main/java/eu/dnetlib/enabling/is/sn/NotificationMessage.java
1
package eu.dnetlib.enabling.is.sn;
2

  
3
public class NotificationMessage {
4

  
5
	private String serviceId;
6
	private String subscriptionId;
7
	private String topic;
8
	private String isId;
9
	private String message;
10

  
11
	public NotificationMessage() {}
12

  
13
	public NotificationMessage(final String serviceId, final String subscriptionId, final String topic, final String isId, final String message) {
14
		this.serviceId = serviceId;
15
		this.subscriptionId = subscriptionId;
16
		this.topic = topic;
17
		this.isId = isId;
18
		this.message = message;
19
	}
20

  
21
	public String getServiceId() {
22
		return serviceId;
23
	}
24

  
25
	public void setServiceId(final String serviceId) {
26
		this.serviceId = serviceId;
27
	}
28

  
29
	public String getSubscriptionId() {
30
		return subscriptionId;
31
	}
32

  
33
	public void setSubscriptionId(final String subscriptionId) {
34
		this.subscriptionId = subscriptionId;
35
	}
36

  
37
	public String getTopic() {
38
		return topic;
39
	}
40

  
41
	public void setTopic(final String topic) {
42
		this.topic = topic;
43
	}
44

  
45
	public String getIsId() {
46
		return isId;
47
	}
48

  
49
	public void setIsId(final String isId) {
50
		this.isId = isId;
51
	}
52

  
53
	public String getMessage() {
54
		return message;
55
	}
56

  
57
	public void setMessage(final String message) {
58
		this.message = message;
59
	}
60

  
61
}
modules/dnet-information-service/trunk/src/main/java/eu/dnetlib/enabling/is/sn/NotificationListener.java
29 29

  
30 30
	private DatabaseUtils dbUtils;
31 31

  
32
	private NotificationDispatcher dispatcher;
33

  
32 34
	private boolean running = false;
33 35
	private Connection conn = null;
34 36
	private String query = "";
......
81 83
						final Map<String, Object> mapTest = obtainMapFromJson((String) map.get(operation == Operation.DELETE ? "old" : "new"));
82 84

  
83 85
						if (compareMaps(mapCond, mapTest)) {
84
							log.info("SENDING NOTIFICATIN TO " + map.get("subscriber"));
85
							// TODO
86
							log.info("SENDING NOTIFICATIN TO " + map.get("service_id"));
87
							final NotificationMessage message = new NotificationMessage();
88
							message.setSubscriptionId((String) map.get("subscr_id"));
89
							message.setIsId("XXXX");
90
							message.setMessage((String) map.get(operation == Operation.DELETE ? "old" : "new"));
91
							message.setServiceId((String) map.get("service_id"));
92
							message.setTopic(map.get("operation") + "/" + map.get("tbl"));
93
							dispatcher.sendNotification(message);
86 94
						}
87 95
					}
88 96
				}
......
130 138
	public void setDbUtils(final DatabaseUtils dbUtils) {
131 139
		this.dbUtils = dbUtils;
132 140
	}
141

  
142
	public NotificationDispatcher getDispatcher() {
143
		return dispatcher;
144
	}
145

  
146
	@Required
147
	public void setDispatcher(final NotificationDispatcher dispatcher) {
148
		this.dispatcher = dispatcher;
149
	}
133 150
}
modules/dnet-information-service/trunk/src/main/java/eu/dnetlib/enabling/is/sn/NotificationDispatcher.java
1
package eu.dnetlib.enabling.is.sn;
2

  
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.LinkedBlockingQueue;
5

  
6
import javax.annotation.Resource;
7

  
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

  
11
import eu.dnetlib.common.rmi.BaseService;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13

  
14
public class NotificationDispatcher {
15

  
16
	@Resource
17
	private UniqueServiceLocator serviceLocator;
18

  
19
	private final BlockingQueue<NotificationMessage> queue = new LinkedBlockingQueue<NotificationMessage>();
20

  
21
	private static final Log log = LogFactory.getLog(NotificationDispatcher.class);
22

  
23
	public void processQueue() {
24
		while (true) {
25
			final NotificationMessage message = queue.poll();
26
			if (message == null) {
27
				return;
28
			} else {
29
				try {
30
					log.info("Invoking notify method...");
31
					final String serviceId = message.getServiceId();
32
					final BaseService service = serviceLocator.getService(BaseService.class, serviceId);
33
					service.notify(message.getSubscriptionId(), message.getTopic(), message.getIsId(), message.getMessage());
34
					log.info("...done");
35
				} catch (Throwable e) {
36
					log.error("Error notifying service", e);
37
					return;
38
				}
39
			}
40
		}
41
	}
42

  
43
	public void sendNotification(final NotificationMessage message) {
44
		queue.add(message);
45
	}
46
}
modules/dnet-information-service/trunk/src/main/resources/eu/dnetlib/enabling/is/applicationContext-is-database.xml
53 53

  
54 54
	<!-- Notification Listener -->
55 55
	<bean id="isNotificationListener" class="eu.dnetlib.enabling.is.sn.NotificationListener"
56
		p:dbUtils-ref="isDatabaseUtils" 
56
		p:dbUtils-ref="isDatabaseUtils" p:dispatcher-ref="isNotificationDispatcher"
57 57
		init-method="init" destroy-method="destroy"/>
58 58

  
59
	<bean id="isNotificationDispatcher" class="eu.dnetlib.enabling.is.sn.NotificationDispatcher" />
60

  
59 61
	<bean t:id="isJobSchedulerAccessor"
60 62
		class="org.springframework.scheduling.quartz.SchedulerAccessorBean"
61 63
		p:scheduler-ref="jobScheduler">
......
69 71
							p:targetObject-ref="isNotificationListener" p:targetMethod="verify" />
70 72
					</property>
71 73
				</bean>
74
				<bean class="org.springframework.scheduling.quartz.SimpleTriggerBean"
75
					p:startDelay="${dnet.is.sn.dispatcher.startDelay}" p:repeatInterval="${dnet.is.sn.dispatcher.repeatInterval}">
76
					<property name="jobDetail">
77
						<bean
78
							class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean"
79
							p:targetObject-ref="isNotificationDispatcher" p:targetMethod="processQueue" />
80
					</property>
81
				</bean>
72 82
			</list>
73 83
		</property>
74 84
	</bean>
modules/dnet-information-service/trunk/src/main/resources/eu/dnetlib/enabling/is/applicationContext-is-database.properties
3 3
dnet.is.database.username       = dnet
4 4
dnet.is.database.password       = dnetPwd
5 5
dnet.is.database.resetAtStartup = true
6
dnet.is.sn.listener.startDelay     = 10000 
7
dnet.is.sn.listener.repeatInterval = 5000
6
dnet.is.sn.listener.startDelay     = 10000
7
dnet.is.sn.listener.repeatInterval = 5000
8
dnet.is.sn.dispatcher.startDelay     = 20000 
9
dnet.is.sn.dispatcher.repeatInterval = 5000
modules/dnet-information-service/trunk/src/main/resources/eu/dnetlib/enabling/is/templates/obtain_notifications.sql.st
1 1
select 
2
	n.tbl,
3
	n.operation,
4
	n.date,
5
	n.new,
6
	n.old,
7
	s.subscriber,
8
	s.condition
2
	n.tbl        as tbl,
3
	n.operation  as operation,
4
	n.new        as new,
5
	n.old        as old,
6
	s.id         as subscr_id
7
	s.subscriber as service_id,
8
	s.condition  as condition
9 9
from 
10 10
	notifications n left outer join subscriptions s on (n.tbl = s.tbl and n.operation = s.operation)

Also available in: Unified diff