Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.net.URI;
4
import java.net.URISyntaxException;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.NoSuchElementException;
8
import java.util.concurrent.BlockingQueue;
9
import java.util.concurrent.ExecutorService;
10
import java.util.concurrent.Executors;
11
import java.util.concurrent.TimeUnit;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Queues;
16
import eu.dnetlib.data.collector.ThreadSafeIterator;
17
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
18
import org.apache.commons.lang3.StringEscapeUtils;
19
import org.apache.commons.lang3.StringUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.http.client.utils.URIBuilder;
23
import org.springframework.http.*;
24
import org.springframework.web.client.ResourceAccessException;
25
import org.springframework.web.client.RestClientException;
26
import org.springframework.web.client.RestTemplate;
27

    
28
/**
29
 * Created by Alessia Bardi on 31/01/2018.
30
 *
31
 * @author Alessia Bardi
32
 */
33
public class VirtuosoParthenosIterator extends ThreadSafeIterator {
34

    
35
	private static final Log log = LogFactory.getLog(VirtuosoParthenosIterator.class);
36
	protected static final String ANY_TIME_QUERY_MS = "3000";
37
	protected static final int QUEUE_TIMEOUT_SECONDS = 100;
38
	public final static String TERMINATOR = "ARNOLD";
39
	public final static String ERROR_TERMINATOR = "SCHWARZ";
40
	protected final static int SLEEP_MS = 5000;
41
	protected final static int MAX_RETRIES = 3;
42
	protected final static int LIMIT = 100;
43

    
44
	private String datasourceName;
45
	private String datasourceInterface;
46
	private String virtuosoReaderAPIUrl;
47
	private boolean started = false;
48
	private Map<String, Integer> errors = Maps.newHashMap();
49
	private List<String> listForClass = Lists.newArrayList();
50
	private BlockingQueue<String> elements = Queues.newArrayBlockingQueue(10);
51

    
52
	private String currentElement = null;
53
	private ExecutorService executor = Executors.newSingleThreadExecutor();
54

    
55
	private RestTemplate restTemplate;
56

    
57

    
58
	private synchronized void verifyStarted(){
59
		if (!this.started) {
60
			this.started = true;
61
			fillQueue();
62
			getNextElement(MAX_RETRIES);
63
		}
64
	}
65

    
66
	protected void fillQueue(){
67
		log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl());
68
		getExecutor().submit(() -> {
69
			try {
70
				int offset = 0;
71
				boolean again;
72
				do {
73
					List<String> subjectList = getSubjectList(offset);
74
					for (String subject : subjectList) {
75
						String xmlFile = tryGetRDF(subject, MAX_RETRIES);
76
						if (StringUtils.isBlank(xmlFile)) {
77
							log.warn("Skipping blank RDF for " + subject);
78
						} else {
79
							getElements().offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
80
						}
81
					}
82
					again = subjectList.size() == LIMIT;
83
					offset += LIMIT;
84
				} while(again);
85
				log.debug("End of subject list, adding terminator to the queue");
86
				getElements().offer(TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
87
			} catch (Exception e) {
88
				log.error(e.getMessage());
89
				try {
90
					getElements().offer(ERROR_TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
91
				} catch (InterruptedException e1) {
92
					log.error(e1.getMessage());
93
				}
94
			}
95

    
96
		});
97
		getExecutor().shutdown();
98
	}
99

    
100

    
101
	protected String tryGetRDF(final String subjectURL, int attempt) throws URISyntaxException, InterruptedException, VirtuosoParthenosException {
102
		log.debug("Querying Api, remaining attempts: "+attempt);
103
		if (attempt <= 0) {
104
			errors.merge("Failed tryGetRDF", 1, Integer::sum);
105
			return null;
106
		}
107
		ResponseEntity<String> response = null;
108
		try {
109
			response = getRDF(subjectURL);
110
		}catch(ResourceAccessException e){
111
			//request timed out --> sleep and try again
112
			log.warn("Request timeout for "+subjectURL+": I'll sleep and then try again");
113
			Thread.sleep(SLEEP_MS);
114
			return tryGetRDF(subjectURL, --attempt);
115
		}
116
		HttpStatus responseStatus = response.getStatusCode();
117
		if (responseStatus.is2xxSuccessful()) {
118
			String rdfFile = response.getBody();
119
			if(StringUtils.isBlank(rdfFile)){
120
				log.warn("Got blank RDF for "+subjectURL+" , let's try again...");
121
				Thread.sleep(SLEEP_MS);
122
				return tryGetRDF(subjectURL, --attempt);
123
			}
124
			else {
125
				final String xmlFile = completeXML(rdfFile, subjectURL);
126
				return xmlFile;
127
			}
128
		} else {
129
			if (responseStatus.is5xxServerError()) {
130
				//sleep for a while and re-try
131
				log.warn("HTTP ERROR: "+responseStatus.value() + ": " + responseStatus.getReasonPhrase()+": I'll sleep and then try again");
132
				Thread.sleep(SLEEP_MS);
133
				return tryGetRDF(subjectURL, --attempt);
134
			} else {
135
				log.error("ERROR: Can't get the RDF for " + subjectURL + " " + responseStatus.value() + ": " + responseStatus.getReasonPhrase());
136
				errors.merge(responseStatus.value() + ": " + responseStatus.getReasonPhrase(), 1, Integer::sum);
137
			}
138
		}
139
		return null;
140
	}
141

    
142
	protected URI getURIForSubjectList(final int offset) throws URISyntaxException {
143
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/apiSubjects");
144
		builder.addParameter("api", getDatasourceInterface());
145
		builder.addParameter("limit", Integer.toString(LIMIT));
146
		builder.addParameter("offset", Integer.toString(offset));
147
		return builder.build();
148
	}
149

    
150
	protected List<String> getSubjectList(final int offset) throws URISyntaxException, VirtuosoParthenosException {
151
		URI uri = getURIForSubjectList(offset);
152
		log.debug("fillQueue -- Calling for subject list: " + uri.toString());
153
		List<String> subjectList;
154
		try {
155
			subjectList = getRestTemplate().getForObject(uri, getListForClass().getClass());
156
		}catch(RestClientException rce){
157
			throw new VirtuosoParthenosException(rce);
158
		}
159
		return subjectList;
160
	}
161

    
162
	protected URI getURIForRDFRequest(final String subjectURL) throws URISyntaxException {
163
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subjectURL).addParameter("timeout", ANY_TIME_QUERY_MS);
164
		return builder.build();
165
	}
166

    
167
	protected ResponseEntity<String> getRDF(final String subjectURL) throws URISyntaxException {
168
		HttpHeaders headers = new HttpHeaders();
169
		headers.setAccept(Lists.newArrayList(MediaType.APPLICATION_XML));
170
		URI uri = getURIForRDFRequest(subjectURL);
171
		log.debug("fillQueue -- Calling for subject RDF: " + uri.toString());
172
		HttpEntity<String> entity = new HttpEntity<>("parameters", headers);
173
		return restTemplate.exchange(uri, HttpMethod.GET, entity, String.class);
174
	}
175

    
176

    
177
	public String completeXML(final String rdfFile, final String url) {
178
		String xmlEscapedURL = StringEscapeUtils.escapeXml11(url);
179
		String rdfFileNoXmlDecl = rdfFile.replaceAll("\\<\\?xml(.+?)\\?\\>", "").trim();
180
		return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record xmlns=\"http://www.openarchives.org/OAI/2.0/\"><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"
181
				+ xmlEscapedURL + "</dri:objIdentifier><dri:datasourceapi>" + datasourceInterface + "</dri:datasourceapi><dri:datasourcename>" + datasourceName
182
				+ "</dri:datasourcename></header><metadata>" + rdfFileNoXmlDecl + "</metadata></record>";
183
	}
184

    
185
	@Override
186
	public boolean doHasNext() {
187
		try {
188
			verifyStarted();
189
		} catch (Exception e) {
190
			getExecutor().shutdownNow();
191
			throw new CollectorServiceRuntimeException(e);
192
		}
193
		switch(currentElement){
194
		case TERMINATOR:
195
			 if(!executor.isTerminated()) executor.shutdownNow();
196
			return false;
197
		case ERROR_TERMINATOR:
198
			 executor.shutdownNow();
199
			throw new CollectorServiceRuntimeException("Error getting elements from virtuoso");
200
		default:
201
			return true;
202
		}
203
	}
204

    
205
	@Override
206
	public String doNext() {
207
		if(!hasNext()) {
208
			log.error("Next called but hasNext is false", new NoSuchElementException());
209
			throw new NoSuchElementException();
210
		}
211
		switch(currentElement){
212
		case TERMINATOR:
213
		case ERROR_TERMINATOR:
214
			executor.shutdownNow();
215
			throw new NoSuchElementException();
216
		default:
217
			String res = currentElement;
218
			getNextElement(MAX_RETRIES);
219
			return res;
220
		}
221
	}
222

    
223
	private void getNextElement(int attempt){
224
		log.debug("polling from queue, remaining attempts: "+attempt);
225
		if(attempt <= 0) currentElement = ERROR_TERMINATOR;
226
		else{
227
			try {
228
				currentElement = elements.take();
229
			} catch (InterruptedException e) {
230
				//current thread interrupted. Let's end.
231
				currentElement = ERROR_TERMINATOR;
232
				executor.shutdownNow();
233
			}
234
		}
235
	}
236

    
237
	@Override
238
	public void remove() {
239
		throw new UnsupportedOperationException();
240
	}
241

    
242
	public VirtuosoParthenosIterator datasourceInterface(final String datasourceInterface) {
243
		this.datasourceInterface = datasourceInterface;
244
		return this;
245
	}
246

    
247
	public VirtuosoParthenosIterator virtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
248
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
249
		return this;
250
	}
251

    
252
	public VirtuosoParthenosIterator datasourceName(final String datasourceName) {
253
		this.datasourceName = datasourceName;
254
		return this;
255
	}
256

    
257
	public VirtuosoParthenosIterator errors(final Map<String, Integer> errors) {
258
		this.errors = errors;
259
		return this;
260
	}
261

    
262
	public String getDatasourceInterface() {
263
		return datasourceInterface;
264
	}
265

    
266
	public String getVirtuosoReaderAPIUrl() {
267
		return virtuosoReaderAPIUrl;
268
	}
269

    
270
	public Map<String, Integer> getErrors() {
271
		return errors;
272
	}
273

    
274
	public BlockingQueue<String> getElements() {
275
		return elements;
276
	}
277

    
278
	public RestTemplate getRestTemplate() {
279
		return restTemplate;
280
	}
281

    
282
	public VirtuosoParthenosIterator restTemplate(final RestTemplate restTemplate) {
283
		this.restTemplate = restTemplate;
284
		return this;
285
	}
286

    
287
	public String getDatasourceName() {
288
		return datasourceName;
289
	}
290

    
291
	public boolean isStarted() {
292
		return started;
293
	}
294

    
295
	public List<String> getListForClass() {
296
		return listForClass;
297
	}
298

    
299
	public String getCurrentElement() {
300
		return currentElement;
301
	}
302

    
303
	public ExecutorService getExecutor() {
304
		return executor;
305
	}
306

    
307
	public void setDatasourceName(final String datasourceName) {
308
		this.datasourceName = datasourceName;
309
	}
310

    
311
	public void setDatasourceInterface(final String datasourceInterface) {
312
		this.datasourceInterface = datasourceInterface;
313
	}
314

    
315
	public void setVirtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
316
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
317
	}
318

    
319
	public void setStarted(final boolean started) {
320
		this.started = started;
321
	}
322

    
323
	public void setErrors(final Map<String, Integer> errors) {
324
		this.errors = errors;
325
	}
326

    
327
	public void setListForClass(final List<String> listForClass) {
328
		this.listForClass = listForClass;
329
	}
330

    
331
	public void setElements(final BlockingQueue<String> elements) {
332
		this.elements = elements;
333
	}
334

    
335
	public void setCurrentElement(final String currentElement) {
336
		this.currentElement = currentElement;
337
	}
338

    
339
	public void setExecutor(final ExecutorService executor) {
340
		this.executor = executor;
341
	}
342

    
343
	public void setRestTemplate(final RestTemplate restTemplate) {
344
		this.restTemplate = restTemplate;
345
	}
346
}
(13-13/13)