Revision 34412
Added by Michele Artini almost 10 years ago
modules/dnet-information-service/trunk/src/main/java/eu/dnetlib/enabling/is/sn/NotificationMessage.java | ||
---|---|---|
1 |
package eu.dnetlib.enabling.is.sn; |
|
2 |
|
|
3 |
public class NotificationMessage { |
|
4 |
|
|
5 |
private String serviceId; |
|
6 |
private String subscriptionId; |
|
7 |
private String topic; |
|
8 |
private String isId; |
|
9 |
private String message; |
|
10 |
|
|
11 |
public NotificationMessage() {} |
|
12 |
|
|
13 |
public NotificationMessage(final String serviceId, final String subscriptionId, final String topic, final String isId, final String message) { |
|
14 |
this.serviceId = serviceId; |
|
15 |
this.subscriptionId = subscriptionId; |
|
16 |
this.topic = topic; |
|
17 |
this.isId = isId; |
|
18 |
this.message = message; |
|
19 |
} |
|
20 |
|
|
21 |
public String getServiceId() { |
|
22 |
return serviceId; |
|
23 |
} |
|
24 |
|
|
25 |
public void setServiceId(final String serviceId) { |
|
26 |
this.serviceId = serviceId; |
|
27 |
} |
|
28 |
|
|
29 |
public String getSubscriptionId() { |
|
30 |
return subscriptionId; |
|
31 |
} |
|
32 |
|
|
33 |
public void setSubscriptionId(final String subscriptionId) { |
|
34 |
this.subscriptionId = subscriptionId; |
|
35 |
} |
|
36 |
|
|
37 |
public String getTopic() { |
|
38 |
return topic; |
|
39 |
} |
|
40 |
|
|
41 |
public void setTopic(final String topic) { |
|
42 |
this.topic = topic; |
|
43 |
} |
|
44 |
|
|
45 |
public String getIsId() { |
|
46 |
return isId; |
|
47 |
} |
|
48 |
|
|
49 |
public void setIsId(final String isId) { |
|
50 |
this.isId = isId; |
|
51 |
} |
|
52 |
|
|
53 |
public String getMessage() { |
|
54 |
return message; |
|
55 |
} |
|
56 |
|
|
57 |
public void setMessage(final String message) { |
|
58 |
this.message = message; |
|
59 |
} |
|
60 |
|
|
61 |
} |
modules/dnet-information-service/trunk/src/main/java/eu/dnetlib/enabling/is/sn/NotificationListener.java | ||
---|---|---|
29 | 29 |
|
30 | 30 |
private DatabaseUtils dbUtils; |
31 | 31 |
|
32 |
private NotificationDispatcher dispatcher; |
|
33 |
|
|
32 | 34 |
private boolean running = false; |
33 | 35 |
private Connection conn = null; |
34 | 36 |
private String query = ""; |
... | ... | |
81 | 83 |
final Map<String, Object> mapTest = obtainMapFromJson((String) map.get(operation == Operation.DELETE ? "old" : "new")); |
82 | 84 |
|
83 | 85 |
if (compareMaps(mapCond, mapTest)) { |
84 |
log.info("SENDING NOTIFICATIN TO " + map.get("subscriber")); |
|
85 |
// TODO |
|
86 |
log.info("SENDING NOTIFICATIN TO " + map.get("service_id")); |
|
87 |
final NotificationMessage message = new NotificationMessage(); |
|
88 |
message.setSubscriptionId((String) map.get("subscr_id")); |
|
89 |
message.setIsId("XXXX"); |
|
90 |
message.setMessage((String) map.get(operation == Operation.DELETE ? "old" : "new")); |
|
91 |
message.setServiceId((String) map.get("service_id")); |
|
92 |
message.setTopic(map.get("operation") + "/" + map.get("tbl")); |
|
93 |
dispatcher.sendNotification(message); |
|
86 | 94 |
} |
87 | 95 |
} |
88 | 96 |
} |
... | ... | |
130 | 138 |
public void setDbUtils(final DatabaseUtils dbUtils) { |
131 | 139 |
this.dbUtils = dbUtils; |
132 | 140 |
} |
141 |
|
|
142 |
public NotificationDispatcher getDispatcher() { |
|
143 |
return dispatcher; |
|
144 |
} |
|
145 |
|
|
146 |
@Required |
|
147 |
public void setDispatcher(final NotificationDispatcher dispatcher) { |
|
148 |
this.dispatcher = dispatcher; |
|
149 |
} |
|
133 | 150 |
} |
modules/dnet-information-service/trunk/src/main/java/eu/dnetlib/enabling/is/sn/NotificationDispatcher.java | ||
---|---|---|
1 |
package eu.dnetlib.enabling.is.sn; |
|
2 |
|
|
3 |
import java.util.concurrent.BlockingQueue; |
|
4 |
import java.util.concurrent.LinkedBlockingQueue; |
|
5 |
|
|
6 |
import javax.annotation.Resource; |
|
7 |
|
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
|
|
11 |
import eu.dnetlib.common.rmi.BaseService; |
|
12 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
|
13 |
|
|
14 |
public class NotificationDispatcher { |
|
15 |
|
|
16 |
@Resource |
|
17 |
private UniqueServiceLocator serviceLocator; |
|
18 |
|
|
19 |
private final BlockingQueue<NotificationMessage> queue = new LinkedBlockingQueue<NotificationMessage>(); |
|
20 |
|
|
21 |
private static final Log log = LogFactory.getLog(NotificationDispatcher.class); |
|
22 |
|
|
23 |
public void processQueue() { |
|
24 |
while (true) { |
|
25 |
final NotificationMessage message = queue.poll(); |
|
26 |
if (message == null) { |
|
27 |
return; |
|
28 |
} else { |
|
29 |
try { |
|
30 |
log.info("Invoking notify method..."); |
|
31 |
final String serviceId = message.getServiceId(); |
|
32 |
final BaseService service = serviceLocator.getService(BaseService.class, serviceId); |
|
33 |
service.notify(message.getSubscriptionId(), message.getTopic(), message.getIsId(), message.getMessage()); |
|
34 |
log.info("...done"); |
|
35 |
} catch (Throwable e) { |
|
36 |
log.error("Error notifying service", e); |
|
37 |
return; |
|
38 |
} |
|
39 |
} |
|
40 |
} |
|
41 |
} |
|
42 |
|
|
43 |
public void sendNotification(final NotificationMessage message) { |
|
44 |
queue.add(message); |
|
45 |
} |
|
46 |
} |
modules/dnet-information-service/trunk/src/main/resources/eu/dnetlib/enabling/is/applicationContext-is-database.xml | ||
---|---|---|
53 | 53 |
|
54 | 54 |
<!-- Notification Listener --> |
55 | 55 |
<bean id="isNotificationListener" class="eu.dnetlib.enabling.is.sn.NotificationListener" |
56 |
p:dbUtils-ref="isDatabaseUtils" |
|
56 |
p:dbUtils-ref="isDatabaseUtils" p:dispatcher-ref="isNotificationDispatcher"
|
|
57 | 57 |
init-method="init" destroy-method="destroy"/> |
58 | 58 |
|
59 |
<bean id="isNotificationDispatcher" class="eu.dnetlib.enabling.is.sn.NotificationDispatcher" /> |
|
60 |
|
|
59 | 61 |
<bean t:id="isJobSchedulerAccessor" |
60 | 62 |
class="org.springframework.scheduling.quartz.SchedulerAccessorBean" |
61 | 63 |
p:scheduler-ref="jobScheduler"> |
... | ... | |
69 | 71 |
p:targetObject-ref="isNotificationListener" p:targetMethod="verify" /> |
70 | 72 |
</property> |
71 | 73 |
</bean> |
74 |
<bean class="org.springframework.scheduling.quartz.SimpleTriggerBean" |
|
75 |
p:startDelay="${dnet.is.sn.dispatcher.startDelay}" p:repeatInterval="${dnet.is.sn.dispatcher.repeatInterval}"> |
|
76 |
<property name="jobDetail"> |
|
77 |
<bean |
|
78 |
class="org.springframework.scheduling.quartz.MethodInvokingJobDetailFactoryBean" |
|
79 |
p:targetObject-ref="isNotificationDispatcher" p:targetMethod="processQueue" /> |
|
80 |
</property> |
|
81 |
</bean> |
|
72 | 82 |
</list> |
73 | 83 |
</property> |
74 | 84 |
</bean> |
modules/dnet-information-service/trunk/src/main/resources/eu/dnetlib/enabling/is/applicationContext-is-database.properties | ||
---|---|---|
3 | 3 |
dnet.is.database.username = dnet |
4 | 4 |
dnet.is.database.password = dnetPwd |
5 | 5 |
dnet.is.database.resetAtStartup = true |
6 |
dnet.is.sn.listener.startDelay = 10000 |
|
7 |
dnet.is.sn.listener.repeatInterval = 5000 |
|
6 |
dnet.is.sn.listener.startDelay = 10000 |
|
7 |
dnet.is.sn.listener.repeatInterval = 5000 |
|
8 |
dnet.is.sn.dispatcher.startDelay = 20000 |
|
9 |
dnet.is.sn.dispatcher.repeatInterval = 5000 |
modules/dnet-information-service/trunk/src/main/resources/eu/dnetlib/enabling/is/templates/obtain_notifications.sql.st | ||
---|---|---|
1 | 1 |
select |
2 |
n.tbl, |
|
3 |
n.operation, |
|
4 |
n.date,
|
|
5 |
n.new,
|
|
6 |
n.old,
|
|
7 |
s.subscriber, |
|
8 |
s.condition |
|
2 |
n.tbl as tbl,
|
|
3 |
n.operation as operation,
|
|
4 |
n.new as new,
|
|
5 |
n.old as old,
|
|
6 |
s.id as subscr_id
|
|
7 |
s.subscriber as service_id,
|
|
8 |
s.condition as condition
|
|
9 | 9 |
from |
10 | 10 |
notifications n left outer join subscriptions s on (n.tbl = s.tbl and n.operation = s.operation) |
Also available in: Unified diff
Notification Dispatcher