Project

General

Profile

1
package eu.dnetlib.enabling.is.sn.resourcestate;
2

    
3
import java.util.Arrays;
4
import java.util.Collection;
5
import java.util.HashSet;
6
import java.util.Set;
7

    
8
import javax.xml.transform.dom.DOMResult;
9
import javax.xml.ws.wsaddressing.W3CEndpointReference;
10
import javax.xml.xpath.XPathExpressionException;
11
import javax.xml.xpath.XPathFactory;
12

    
13
import com.google.common.collect.Sets;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.apache.oro.text.perl.Perl5Util;
17
import org.springframework.beans.factory.annotation.Autowired;
18
import org.springframework.beans.factory.annotation.Required;
19

    
20
import eu.dnetlib.enabling.is.sn.AbstractSubscriptionRegistry;
21
import eu.dnetlib.enabling.is.sn.SubscriptionRegistry;
22
import eu.dnetlib.enabling.is.sn.SubscriptionRequest;
23
import eu.dnetlib.enabling.is.sn.TopicExpressionMatchResult;
24
import eu.dnetlib.enabling.is.sn.rmi.SubscriptionRequestRejectedException;
25
import org.springframework.cache.annotation.CacheEvict;
26
import org.springframework.cache.annotation.Cacheable;
27

    
28
/**
29
 * Manage subscription for UPDATE/CREATE/DELETE resource-related topic prefixes.
30
 *
31
 * @author marko
32
 *
33
 */
34
public class ResourceStateSubscriptionRegistry extends AbstractSubscriptionRegistry implements SubscriptionRegistry {
35

    
36
	private static final Log log = LogFactory.getLog(ResourceStateSubscriptionRegistry.class);
37

    
38
	/**
39
	 * subscription DAO.
40
	 */
41
	private ResourceStateSubscriptionDAO subscriptionDao;
42

    
43
	/**
44
	 * subscription request filter.
45
	 */
46
	@Autowired
47
	private SubscriptionRequestFilter subscriptionRequestFilter;
48

    
49
	/**
50
	 * {@inheritDoc}
51
	 *
52
	 * @see eu.dnetlib.enabling.is.sn.SubscriptionRegistry#registerSubscription(eu.dnetlib.enabling.is.sn.SubscriptionRequest)
53
	 */
54
	@Override
55
	@CacheEvict(value = "subscriptions", allEntries = true)
56
	public String registerSubscription(final SubscriptionRequest subscription) throws SubscriptionRequestRejectedException {
57
		final TopicExpressionMatchResult prefixMatch = matchPrefix(subscription);
58
		if (prefixMatch == null)
59
			return null;
60

    
61
		final TopicExpressionMatchResult typeMatch = matchType(prefixMatch.getRest());
62
		if (typeMatch == null)
63
			return null; // TODO: decide whether to fail or not
64

    
65
		final TopicExpressionMatchResult idMatch = matchId(typeMatch.getRest());
66

    
67
		if (idMatch == null)
68
			return null; // TODO: decide whether to fail or not
69

    
70
		final ResourceStateSubscription rss = new ResourceStateSubscription(subscription, prefixMatch.getPrefix(), typeMatch.getPrefix(), idMatch.getPrefix(),
71
				idMatch.getRest());
72

    
73
		if (!getSubscriptionRequestFilter().accept(rss))
74
			throw new SubscriptionRequestRejectedException(String.format("rejected subscription request, resourceId: '%s', xpath: '%s', from: %s",
75
					rss.getResourceId(), rss.getXpath(), rss.getSubscriber()));
76

    
77
		return registerSubscription(rss);
78
	}
79

    
80
	/**
81
	 * this registers the real subscription.
82
	 *
83
	 * TODO: am I sure that the overload is a good thing here?
84
	 *
85
	 * @param subscription
86
	 *            subscription
87
	 * @return subscription id (potentially changed)
88
	 */
89
	private String registerSubscription(final ResourceStateSubscription subscription) {
90
	    if(log.isDebugEnabled()) {
91
		log.debug("evict subscriptions cache");
92
	    }
93

    
94
		// TODO: change the dao, and implement a method which finds a given subscription directly.
95
		final Collection<ResourceStateSubscription> similar = subscriptionDao.listSubscriptions(subscription.getPrefix(), subscription.getType(),
96
				subscription.getResourceId());
97
		for (final ResourceStateSubscription r : similar) {
98
			if (r != null && getAddress(subscription.getSubscriberAsEpr()).equals(getAddress(r.getSubscriberAsEpr()))
99
					&& (subscription.getXpath() == r.getXpath() || subscription.getXpath().equals(r.getXpath()))
100
					&& (subscription.getResourceId() == r.getResourceId() || subscription.getResourceId().equals(r.getResourceId())))
101
				return r.getSubscriptionId();
102
		}
103

    
104
		subscriptionDao.addSubscription(subscription);
105

    
106
		return subscription.getSubscriptionId();
107
	}
108

    
109
	/**
110
	 * {@inheritDoc}
111
	 *
112
	 * @see eu.dnetlib.enabling.is.sn.SubscriptionRegistry#unsubscribe(java.lang.String)
113
	 */
114
	@Override
115
	@CacheEvict(value = "subscriptions", allEntries = true)
116
	public boolean unsubscribe(final String subId) {
117
		log.info("evict subscriptions cache");
118
		return subscriptionDao.removeSubscription(subId);
119
	}
120

    
121
	/**
122
	 * Obtains the address component of the EPR.
123
	 *
124
	 * TODO: refactor in a utility class.
125
	 *
126
	 * @param epr
127
	 *            endpoint reference
128
	 * @return address contained in the endpoint reference
129
	 */
130
	private Object getAddress(final W3CEndpointReference epr) {
131
		final DOMResult dom = new DOMResult();
132
		epr.writeTo(dom);
133

    
134
		try {
135
			return XPathFactory.newInstance().newXPath().evaluate("//*[local-name() = 'Address']", dom.getNode());
136
		} catch (final XPathExpressionException e) {
137
			throw new IllegalStateException("cannot construct xpath expression", e);
138
		}
139
	}
140

    
141
	/**
142
	 * extract resource type name.
143
	 *
144
	 * @param rest
145
	 *            topic expression without prefix
146
	 * @return tuple containing type name and rest of the prefix
147
	 */
148
	public TopicExpressionMatchResult matchType(final String rest) {
149
		final Perl5Util matcher = new Perl5Util(); // NOPMD
150
		if (!matcher.match("/^(\\*|[a-zA-Z_0-9]*)($|/)(.*)$/", rest))
151
			return null;
152

    
153
		return new TopicExpressionMatchResult(matcher.getMatch().group(1), matcher.getMatch().group(2 + 1));
154
	}
155

    
156
	/**
157
	 * extract resource id.
158
	 *
159
	 * @param rest
160
	 *            topic expression without prefix
161
	 * @return tuple containing type name and rest of the prefix
162
	 */
163
	public TopicExpressionMatchResult matchId(final String rest) {
164
		final Perl5Util matcher = new Perl5Util(); // NOPMD
165
		if (!matcher.match("/^([^/]*)(.*)/", rest))
166
			return null;
167

    
168
		return new TopicExpressionMatchResult(matcher.getMatch().group(1), matcher.getMatch().group(2));
169
	}
170

    
171
	/**
172
	 * return all subscriptions matching a given prefix and a given type. Wildcard subscriptions will match any resource type.
173
	 *
174
	 * @param prefix
175
	 *            prefix
176
	 * @param type
177
	 *            concrete type
178
	 * @param resId
179
	 *            resource identifier
180
	 * @return all matching subscriptions
181
	 */
182
	@Override
183
	@Cacheable(value="subscriptions", key="{ #prefix, #type, #resId }")
184
	public Collection<ResourceStateSubscription> listMatchingSubscriptions(final String prefix, final String type, final String resId) {
185

    
186
		if(log.isDebugEnabled()) {
187
			log.debug(String.format("uncached list subscriptions [prefix: '%s', type: '%s', resourceId: '%s']", prefix, type, resId));
188
		}
189
		final Set<ResourceStateSubscription> merged = new HashSet<ResourceStateSubscription>();
190
		merged.addAll(subscriptionDao.listSubscriptions(prefix, type, resId));
191
		merged.addAll(subscriptionDao.listSubscriptions(prefix, type, "*"));
192
		merged.addAll(subscriptionDao.listSubscriptions(prefix, "*", "*"));
193
		return merged;
194
	}
195

    
196
	@Override
197
	@Cacheable(value="subscriptions")
198
	public Collection<ResourceStateSubscription> listSubscriptions() {
199
		log.info("uncached list subscriptions");
200

    
201
		return Sets.newHashSet(subscriptionDao.listSubscriptions());
202
	}
203

    
204
	@Override
205
	@CacheEvict(value = "subscriptions", allEntries = true)
206
	public boolean removeSubscription(final String subscriptionId) {
207
		return subscriptionDao.removeSubscription(subscriptionId);
208
	}
209

    
210
	@Override
211
	protected Collection<String> getAcceptedPrefixes() {
212
		return Arrays.asList(new String[] { ResourceStateSubscription.PREFIX_CREATE, ResourceStateSubscription.PREFIX_DELETE,
213
				ResourceStateSubscription.PREFIX_UPDATE });
214
	}
215

    
216
	public ResourceStateSubscriptionDAO getSubscriptionDao() {
217
		return subscriptionDao;
218
	}
219

    
220
	@Required
221
	public void setSubscriptionDao(final ResourceStateSubscriptionDAO subscriptionDao) {
222
		this.subscriptionDao = subscriptionDao;
223
	}
224

    
225
	public SubscriptionRequestFilter getSubscriptionRequestFilter() {
226
		return subscriptionRequestFilter;
227
	}
228

    
229
	public void setSubscriptionRequestFilter(final SubscriptionRequestFilter subscriptionRequestFilter) {
230
		this.subscriptionRequestFilter = subscriptionRequestFilter;
231
	}
232

    
233
}
(8-8/9)