Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.net.URI;
4
import java.net.URISyntaxException;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.NoSuchElementException;
8
import java.util.concurrent.BlockingQueue;
9
import java.util.concurrent.ExecutorService;
10
import java.util.concurrent.Executors;
11
import java.util.concurrent.TimeUnit;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Queues;
16
import eu.dnetlib.data.collector.ThreadSafeIterator;
17
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
18
import org.apache.commons.lang3.StringEscapeUtils;
19
import org.apache.commons.lang3.StringUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.http.client.utils.URIBuilder;
23
import org.springframework.http.*;
24
import org.springframework.web.client.ResourceAccessException;
25
import org.springframework.web.client.RestClientException;
26
import org.springframework.web.client.RestTemplate;
27

    
28
/**
29
 * Created by Alessia Bardi on 31/01/2018.
30
 *
31
 * @author Alessia Bardi
32
 */
33
public class VirtuosoParthenosIterator extends ThreadSafeIterator {
34

    
35
	private static final Log log = LogFactory.getLog(VirtuosoParthenosIterator.class);
36
	protected static final int QUEUE_TIMEOUT_SECONDS = 100;
37
	public final static String TERMINATOR = "ARNOLD";
38
	public final static String ERROR_TERMINATOR = "SCHWARZ";
39
	protected final static int SLEEP_MS = 5000;
40
	protected final static int MAX_RETRIES = 3;
41

    
42
	private String datasourceName;
43
	private String datasourceInterface;
44
	private String virtuosoReaderAPIUrl;
45
	private boolean started = false;
46
	private Map<String, Integer> errors = Maps.newHashMap();
47
	private List<String> listForClass = Lists.newArrayList();
48
	private BlockingQueue<String> elements = Queues.newArrayBlockingQueue(10);
49

    
50
	private String currentElement = null;
51
	private ExecutorService executor = Executors.newSingleThreadExecutor();
52

    
53
	private RestTemplate restTemplate;
54

    
55

    
56
	private synchronized void verifyStarted() throws Exception {
57
		if (!this.started) {
58
			this.started = true;
59
			fillQueue();
60
			getNextElement(MAX_RETRIES);
61
		}
62
	}
63

    
64
	protected void fillQueue(){
65
		log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl());
66
		getExecutor().submit(() -> {
67
			String threadName = Thread.currentThread().getName();
68
			System.out.println("Hello " + threadName);
69
			URIBuilder builder;
70
			try {
71
				builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/graphs");
72
				builder.addParameter("api", getDatasourceInterface());
73
				URI uri = builder.build();
74
				log.info("fillQueue -- Calling for graph list: " + uri.toString());
75
				List<String> graphList = getRestTemplate().getForObject(uri, getListForClass().getClass());
76
				for (String graph : graphList) {
77
					List<String> subjectList = getSubjectList(graph);
78
					for (String subject : subjectList) {
79
						//Note: we skip no http resources based on FORTH feedback: URLs are created only for "relevant" stuff
80
						//TODO: is it better to skip them here or in the query run by VirtuosoReadApi?
81
						if (subject.startsWith("http")) {
82
							String xmlFile = tryGetRDF(subject, MAX_RETRIES);
83
							if (StringUtils.isBlank(xmlFile)) {
84
								log.warn("Skipping blank RDF for " + subject);
85
							} else {
86
								getElements().offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
87
							}
88
						} else {
89
							log.debug("Skipping " + subject + ": no http resource");
90
						}
91
					}
92
				}
93
				getElements().offer(TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
94
			} catch (Exception e) {
95
				log.error(e.getMessage());
96
				try {
97
					getElements().offer(ERROR_TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
98
				} catch (InterruptedException e1) {
99
					log.error(e1.getMessage());
100
				}
101
			}
102
		});
103
		getExecutor().shutdown();
104
	}
105

    
106

    
107
	protected String tryGetRDF(final String subjectURL, int attempt) throws URISyntaxException, InterruptedException, VirtuosoParthenosException {
108
		log.debug("Quering Api, remaining attempts: "+attempt);
109
		if (attempt <= 0) throw new VirtuosoParthenosException("Too many retries...stopping. please check virtuoso server status");
110
		ResponseEntity<String> response = null;
111
		try {
112
			response = getRDF(subjectURL);
113
		}catch(ResourceAccessException e){
114
			//request timed out --> let's return null and skip this resource
115
			log.warn("Request timeout");
116
			errors.merge(e.getMessage(), 1, Integer::sum);
117
			return null;
118
		}
119
		HttpStatus responseStatus = response.getStatusCode();
120
		if (responseStatus.is2xxSuccessful()) {
121
			String rdfFile = response.getBody();
122
			if(StringUtils.isBlank(rdfFile)){
123
				log.warn("Got blank RDF for "+subjectURL+" , let's try again...");
124
				Thread.sleep(SLEEP_MS);
125
				return tryGetRDF(subjectURL, --attempt);
126
			}
127
			else {
128
				final String xmlFile = completeXML(rdfFile, subjectURL);
129
				log.debug(xmlFile);
130
				return xmlFile;
131
			}
132
		} else {
133
			if (responseStatus.is5xxServerError()) {
134
				//sleep for a while and re-try
135
				log.warn("HTTP ERROR: "+responseStatus.value() + ": " + responseStatus.getReasonPhrase()+": I'll sleep and then try again");
136
				Thread.sleep(SLEEP_MS);
137
				return tryGetRDF(subjectURL, --attempt);
138
			} else {
139
				throw new VirtuosoParthenosException(
140
						"ERROR: Can't get the RDF for " + subjectURL + " " + responseStatus.value() + ": " + responseStatus.getReasonPhrase());
141
			}
142
		}
143
	}
144

    
145
	protected URI getURIForSubjectList(final String graph) throws URISyntaxException {
146
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subjects");
147
		builder.addParameter("graph", graph);
148
		return builder.build();
149
	}
150

    
151
	protected List<String> getSubjectList(final String graph) throws URISyntaxException, InterruptedException {
152
		URI uri = getURIForSubjectList(graph);
153
		log.debug("fillQueue -- Calling for subject list: " + uri.toString());
154
		List<String> subjectList = Lists.newArrayList();
155
		try {
156
			subjectList = getRestTemplate().getForObject(uri, getListForClass().getClass());
157
		}catch(RestClientException rce){
158
			log.warn("Skipping subjects from "+graph+" and sleeping for a while...");
159
			log.warn(rce.getMessage());
160
			Thread.sleep(SLEEP_MS);
161
		}
162
		return subjectList;
163
	}
164

    
165
	protected URI getURIForRDFRequest(final String subjectURL) throws URISyntaxException {
166
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subjectURL);
167
		return builder.build();
168
	}
169

    
170
	protected ResponseEntity<String> getRDF(final String subjectURL) throws URISyntaxException {
171
		HttpHeaders headers = new HttpHeaders();
172
		headers.setAccept(Lists.newArrayList(MediaType.APPLICATION_XML));
173
		URI uri = getURIForRDFRequest(subjectURL);
174
		log.debug("fillQueue -- Calling for subject RDF: " + uri.toString());
175
		HttpEntity<String> entity = new HttpEntity<>("parameters", headers);
176
		return restTemplate.exchange(uri, HttpMethod.GET, entity, String.class);
177
	}
178

    
179

    
180
	public String completeXML(final String rdfFile, final String url) {
181
		String xmlEscapedURL = StringEscapeUtils.escapeXml11(url);
182
		String rdfFileNoXmlDecl = rdfFile.replaceAll("\\<\\?xml(.+?)\\?\\>", "").trim();
183
		return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record xmlns=\"http://www.openarchives.org/OAI/2.0/\"><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"
184
				+ xmlEscapedURL + "</dri:objIdentifier><dri:datasourceapi>" + datasourceInterface + "</dri:datasourceapi><dri:datasourcename>" + datasourceName
185
				+ "</dri:datasourcename></header><metadata>" + rdfFileNoXmlDecl + "</metadata></record>";
186
	}
187

    
188
	@Override
189
	public boolean doHasNext() {
190
		try {
191
			verifyStarted();
192
		} catch (Exception e) {
193
			executor.shutdownNow();
194
			throw new CollectorServiceRuntimeException(e);
195
		}
196
		switch(currentElement){
197
		case TERMINATOR:
198
			 if(!executor.isTerminated()) executor.shutdownNow();
199
			return false;
200
		case ERROR_TERMINATOR:
201
			 executor.shutdownNow();
202
			throw new CollectorServiceRuntimeException("Error getting elements from virtuoso");
203
		default:
204
			return true;
205
		}
206
	}
207

    
208
	@Override
209
	public String doNext() {
210
		try {
211
			verifyStarted();
212
		} catch (Exception e) {
213
			executor.shutdownNow();
214
			throw new CollectorServiceRuntimeException(e);
215
		}
216
		switch(currentElement){
217
		case TERMINATOR:
218
		case ERROR_TERMINATOR:
219
			executor.shutdownNow();
220
			throw new NoSuchElementException();
221
		default:
222
			String res = currentElement;
223
			getNextElement(MAX_RETRIES);
224
			return res;
225
		}
226
	}
227

    
228
	private void getNextElement(int attempt){
229
		log.debug("polling from queue, remaining attempts: "+attempt);
230
		if(attempt <= 0) currentElement = ERROR_TERMINATOR;
231
		else{
232
			try {
233
				currentElement = elements.take();
234
				/*
235
				if(currentElement == null){
236
					if (executor.isTerminated()) {
237
						//no additional element and we got the timeout...
238
						String errMessage = "Unexpected timeout reading element queue";
239
						log.warn(errMessage);
240
						errors.merge(errMessage, 1, Integer::sum);
241
						currentElement = ERROR_TERMINATOR;
242
					} else {
243
						//executor is not terminated: let's try again
244
						log.warn("Queue timed out, but I'll try again (#"+attempt+")");
245
						getNextElement(--attempt);
246
					}
247
				}
248
				*/
249
			} catch (InterruptedException e) {
250
				//current thread interrupted. Let's end.
251
				currentElement = ERROR_TERMINATOR;
252
				executor.shutdownNow();
253
			}
254
		}
255
	}
256

    
257
	@Override
258
	public void remove() {
259
		throw new UnsupportedOperationException();
260
	}
261

    
262
	public VirtuosoParthenosIterator datasourceInterface(final String datasourceInterface) {
263
		this.datasourceInterface = datasourceInterface;
264
		return this;
265
	}
266

    
267
	public VirtuosoParthenosIterator virtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
268
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
269
		return this;
270
	}
271

    
272
	public VirtuosoParthenosIterator datasourceName(final String datasourceName) {
273
		this.datasourceName = datasourceName;
274
		return this;
275
	}
276

    
277
	public VirtuosoParthenosIterator errors(final Map<String, Integer> errors) {
278
		this.errors = errors;
279
		return this;
280
	}
281

    
282
	public String getDatasourceInterface() {
283
		return datasourceInterface;
284
	}
285

    
286
	public String getVirtuosoReaderAPIUrl() {
287
		return virtuosoReaderAPIUrl;
288
	}
289

    
290
	public Map<String, Integer> getErrors() {
291
		return errors;
292
	}
293

    
294
	public BlockingQueue<String> getElements() {
295
		return elements;
296
	}
297

    
298
	public RestTemplate getRestTemplate() {
299
		return restTemplate;
300
	}
301

    
302
	public VirtuosoParthenosIterator restTemplate(final RestTemplate restTemplate) {
303
		this.restTemplate = restTemplate;
304
		return this;
305
	}
306

    
307
	public String getDatasourceName() {
308
		return datasourceName;
309
	}
310

    
311
	public boolean isStarted() {
312
		return started;
313
	}
314

    
315
	public List<String> getListForClass() {
316
		return listForClass;
317
	}
318

    
319
	public String getCurrentElement() {
320
		return currentElement;
321
	}
322

    
323
	public ExecutorService getExecutor() {
324
		return executor;
325
	}
326

    
327
	public void setDatasourceName(final String datasourceName) {
328
		this.datasourceName = datasourceName;
329
	}
330

    
331
	public void setDatasourceInterface(final String datasourceInterface) {
332
		this.datasourceInterface = datasourceInterface;
333
	}
334

    
335
	public void setVirtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
336
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
337
	}
338

    
339
	public void setStarted(final boolean started) {
340
		this.started = started;
341
	}
342

    
343
	public void setErrors(final Map<String, Integer> errors) {
344
		this.errors = errors;
345
	}
346

    
347
	public void setListForClass(final List<String> listForClass) {
348
		this.listForClass = listForClass;
349
	}
350

    
351
	public void setElements(final BlockingQueue<String> elements) {
352
		this.elements = elements;
353
	}
354

    
355
	public void setCurrentElement(final String currentElement) {
356
		this.currentElement = currentElement;
357
	}
358

    
359
	public void setExecutor(final ExecutorService executor) {
360
		this.executor = executor;
361
	}
362

    
363
	public void setRestTemplate(final RestTemplate restTemplate) {
364
		this.restTemplate = restTemplate;
365
	}
366
}
(13-13/13)