Project

General

Profile

1
package eu.dnetlib.enabling.is.sn;
2

    
3
import java.util.concurrent.BlockingQueue;
4
import java.util.concurrent.LinkedBlockingQueue;
5

    
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

    
11
/**
12
 * Asynchronous but in-order delivery of notifications.
13
 * 
14
 * @author marko
15
 * 
16
 */
17
public class AsynchronousNotificationSenderImpl extends AbstractNotificationSender implements Runnable { // NOPMD
18
	/**
19
	 * logger.
20
	 */
21
	private static final Log log = LogFactory.getLog(AsynchronousNotificationSenderImpl.class); // NOPMD by marko on 11/24/08 5:02 PM
22

    
23
	/**
24
	 * Encapsulates an notification job.
25
	 * 
26
	 * @author marko
27
	 *
28
	 */
29
	class NotificationJob {
30
		/**
31
		 * notification destination.
32
		 */
33
		private final transient W3CEndpointReference destination;
34
		
35
		/**
36
		 * notification message. 
37
		 */
38
		private final transient NotificationMessage message;
39

    
40
		/**
41
		 * construct a new notification job.
42
		 * 
43
		 * @param destination destination
44
		 * @param message message
45
		 */
46
		public NotificationJob(final W3CEndpointReference destination, final NotificationMessage message) {
47
			super();
48
			this.destination = destination;
49
			this.message = message;
50
		}
51

    
52
		public W3CEndpointReference getDestination() {
53
			return destination;
54
		}
55

    
56
		public NotificationMessage getMessage() {
57
			return message;
58
		}
59

    
60
	}
61

    
62
	/**
63
	 * job queue.
64
	 */
65
	private BlockingQueue<NotificationJob> jobQueue = new LinkedBlockingQueue<NotificationJob>();
66

    
67
	/**
68
	 * {@inheritDoc}
69
	 * 
70
	 * @see eu.dnetlib.enabling.is.sn.NotificationSender#send(javax.xml.ws.wsaddressing.W3CEndpointReference,
71
	 *      eu.dnetlib.enabling.is.sn.NotificationMessage)
72
	 */
73
	@Override
74
	public void send(final W3CEndpointReference destination, final NotificationMessage message) {
75
		log.debug("queuing asynchronous notification");
76
		try {
77
			jobQueue.put(new NotificationJob(destination, message));
78
		} catch (InterruptedException e) {
79
			log.warn("possibly lost notification", e);
80
		}
81
	}
82

    
83
	/**
84
	 * start this notification sender (called by spring lifecycle).
85
	 */
86
	void start() {
87
		new Thread(this).start(); // NOPMD
88
	}
89

    
90
	/** 
91
	 * {@inheritDoc}
92
	 * @see java.lang.Runnable#run()
93
	 */
94
	@Override
95
	public void run() {
96
		while (true) {
97
			try {
98
				final NotificationJob job = jobQueue.take();
99

    
100
				try {
101
					getInvoker().send(job.getDestination(), job.getMessage(), 0);
102
				} catch (javax.xml.ws.soap.SOAPFaultException t) {
103
					log.fatal("error sending notification to " + job.getDestination().toString(), t);
104
				}
105
			} catch (InterruptedException e) {
106
				log.warn("possibly lost notification", e);
107
			}
108
		}
109
	}
110

    
111
	public BlockingQueue<NotificationJob> getJobQueue() {
112
		return jobQueue;
113
	}
114

    
115
	public void setJobQueue(final BlockingQueue<NotificationJob> jobQueue) {
116
		this.jobQueue = jobQueue;
117
	}
118

    
119
}
(4-4/23)