Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.net.URI;
4
import java.net.URISyntaxException;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.NoSuchElementException;
8
import java.util.concurrent.BlockingQueue;
9
import java.util.concurrent.ExecutorService;
10
import java.util.concurrent.Executors;
11
import java.util.concurrent.TimeUnit;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Queues;
16
import eu.dnetlib.data.collector.ThreadSafeIterator;
17
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
18
import org.apache.commons.lang3.StringEscapeUtils;
19
import org.apache.commons.lang3.StringUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.http.client.utils.URIBuilder;
23
import org.springframework.http.*;
24
import org.springframework.web.client.ResourceAccessException;
25
import org.springframework.web.client.RestClientException;
26
import org.springframework.web.client.RestTemplate;
27

    
28
/**
29
 * Created by Alessia Bardi on 31/01/2018.
30
 *
31
 * @author Alessia Bardi
32
 */
33
public class VirtuosoParthenosIterator extends ThreadSafeIterator {
34

    
35
	private static final Log log = LogFactory.getLog(VirtuosoParthenosIterator.class);
36
	protected static final int QUEUE_TIMEOUT_SECONDS = 100;
37
	public final static String TERMINATOR = "ARNOLD";
38
	public final static String ERROR_TERMINATOR = "SCHWARZ";
39
	protected final static int SLEEP_MS = 5000;
40
	protected final static int MAX_RETRIES = 3;
41
	protected final static int LIMIT = 100;
42

    
43
	private String datasourceName;
44
	private String datasourceInterface;
45
	private String virtuosoReaderAPIUrl;
46
	private boolean started = false;
47
	private Map<String, Integer> errors = Maps.newHashMap();
48
	private List<String> listForClass = Lists.newArrayList();
49
	private BlockingQueue<String> elements = Queues.newArrayBlockingQueue(10);
50

    
51
	private String currentElement = null;
52
	private ExecutorService executor = Executors.newSingleThreadExecutor();
53

    
54
	private RestTemplate restTemplate;
55

    
56

    
57
	private synchronized void verifyStarted(){
58
		if (!this.started) {
59
			this.started = true;
60
			fillQueue();
61
			getNextElement(MAX_RETRIES);
62
		}
63
	}
64

    
65
	protected void fillQueue(){
66
		log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl());
67
		getExecutor().submit(() -> {
68
			try {
69
				int offset = 0;
70
				boolean again = true;
71
				do {
72
					List<String> subjectList = getSubjectList(offset);
73
					for (String subject : subjectList) {
74
						String xmlFile = tryGetRDF(subject, MAX_RETRIES);
75
						if (StringUtils.isBlank(xmlFile)) {
76
							log.warn("Skipping blank RDF for " + subject);
77
						} else {
78
							getElements().offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
79
						}
80
					}
81
					again = subjectList.size() == LIMIT;
82
					offset += LIMIT;
83
				} while(again);
84
				log.debug("End of subject list, adding terminator to the queue");
85
				getElements().offer(TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
86
			} catch (Exception e) {
87
				log.error(e.getMessage());
88
				try {
89
					getElements().offer(ERROR_TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
90
				} catch (InterruptedException e1) {
91
					log.error(e1.getMessage());
92
				}
93
			}
94

    
95
		});
96
		getExecutor().shutdown();
97
	}
98

    
99

    
100
	protected String tryGetRDF(final String subjectURL, int attempt) throws URISyntaxException, InterruptedException, VirtuosoParthenosException {
101
		log.debug("Quering Api, remaining attempts: "+attempt);
102
		if (attempt <= 0) {
103
			errors.merge("Failed tryGetRDF", 1, Integer::sum);
104
			return null;
105
		}
106
		ResponseEntity<String> response = null;
107
		try {
108
			response = getRDF(subjectURL);
109
		}catch(ResourceAccessException e){
110
			//request timed out --> sleep and try again
111
			log.warn("Request timeout for "+subjectURL+": I'll sleep and then try again");
112
			Thread.sleep(SLEEP_MS);
113
			return tryGetRDF(subjectURL, --attempt);
114
		}
115
		HttpStatus responseStatus = response.getStatusCode();
116
		if (responseStatus.is2xxSuccessful()) {
117
			String rdfFile = response.getBody();
118
			if(StringUtils.isBlank(rdfFile)){
119
				log.warn("Got blank RDF for "+subjectURL+" , let's try again...");
120
				Thread.sleep(SLEEP_MS);
121
				return tryGetRDF(subjectURL, --attempt);
122
			}
123
			else {
124
				final String xmlFile = completeXML(rdfFile, subjectURL);
125
				return xmlFile;
126
			}
127
		} else {
128
			if (responseStatus.is5xxServerError()) {
129
				//sleep for a while and re-try
130
				log.warn("HTTP ERROR: "+responseStatus.value() + ": " + responseStatus.getReasonPhrase()+": I'll sleep and then try again");
131
				Thread.sleep(SLEEP_MS);
132
				return tryGetRDF(subjectURL, --attempt);
133
			} else {
134
				log.error("ERROR: Can't get the RDF for " + subjectURL + " " + responseStatus.value() + ": " + responseStatus.getReasonPhrase());
135
				errors.merge(responseStatus.value() + ": " + responseStatus.getReasonPhrase(), 1, Integer::sum);
136
			}
137
		}
138
		return null;
139
	}
140

    
141
	protected URI getURIForSubjectList(final int offset) throws URISyntaxException {
142
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/apiSubjects");
143
		builder.addParameter("api", getDatasourceInterface());
144
		builder.addParameter("limit", Integer.toString(LIMIT));
145
		builder.addParameter("offset", Integer.toString(offset));
146
		return builder.build();
147
	}
148

    
149
	protected List<String> getSubjectList(final int offset) throws URISyntaxException, VirtuosoParthenosException {
150
		URI uri = getURIForSubjectList(offset);
151
		log.debug("fillQueue -- Calling for subject list: " + uri.toString());
152
		List<String> subjectList;
153
		try {
154
			subjectList = getRestTemplate().getForObject(uri, getListForClass().getClass());
155
		}catch(RestClientException rce){
156
			throw new VirtuosoParthenosException(rce);
157
		}
158
		return subjectList;
159
	}
160

    
161
	protected URI getURIForRDFRequest(final String subjectURL) throws URISyntaxException {
162
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subjectURL);
163
		return builder.build();
164
	}
165

    
166
	protected ResponseEntity<String> getRDF(final String subjectURL) throws URISyntaxException {
167
		HttpHeaders headers = new HttpHeaders();
168
		headers.setAccept(Lists.newArrayList(MediaType.APPLICATION_XML));
169
		URI uri = getURIForRDFRequest(subjectURL);
170
		log.debug("fillQueue -- Calling for subject RDF: " + uri.toString());
171
		HttpEntity<String> entity = new HttpEntity<>("parameters", headers);
172
		return restTemplate.exchange(uri, HttpMethod.GET, entity, String.class);
173
	}
174

    
175

    
176
	public String completeXML(final String rdfFile, final String url) {
177
		String xmlEscapedURL = StringEscapeUtils.escapeXml11(url);
178
		String rdfFileNoXmlDecl = rdfFile.replaceAll("\\<\\?xml(.+?)\\?\\>", "").trim();
179
		return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record xmlns=\"http://www.openarchives.org/OAI/2.0/\"><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"
180
				+ xmlEscapedURL + "</dri:objIdentifier><dri:datasourceapi>" + datasourceInterface + "</dri:datasourceapi><dri:datasourcename>" + datasourceName
181
				+ "</dri:datasourcename></header><metadata>" + rdfFileNoXmlDecl + "</metadata></record>";
182
	}
183

    
184
	@Override
185
	public boolean doHasNext() {
186
		try {
187
			verifyStarted();
188
		} catch (Exception e) {
189
			getExecutor().shutdownNow();
190
			throw new CollectorServiceRuntimeException(e);
191
		}
192
		switch(currentElement){
193
		case TERMINATOR:
194
			 if(!executor.isTerminated()) executor.shutdownNow();
195
			return false;
196
		case ERROR_TERMINATOR:
197
			 executor.shutdownNow();
198
			throw new CollectorServiceRuntimeException("Error getting elements from virtuoso");
199
		default:
200
			return true;
201
		}
202
	}
203

    
204
	@Override
205
	public String doNext() {
206
		if(!hasNext()) {
207
			log.error("Next called but hasNext is false", new NoSuchElementException());
208
			throw new NoSuchElementException();
209
		}
210
		switch(currentElement){
211
		case TERMINATOR:
212
		case ERROR_TERMINATOR:
213
			executor.shutdownNow();
214
			throw new NoSuchElementException();
215
		default:
216
			String res = currentElement;
217
			getNextElement(MAX_RETRIES);
218
			return res;
219
		}
220
	}
221

    
222
	private void getNextElement(int attempt){
223
		log.debug("polling from queue, remaining attempts: "+attempt);
224
		if(attempt <= 0) currentElement = ERROR_TERMINATOR;
225
		else{
226
			try {
227
				currentElement = elements.take();
228
			} catch (InterruptedException e) {
229
				//current thread interrupted. Let's end.
230
				currentElement = ERROR_TERMINATOR;
231
				executor.shutdownNow();
232
			}
233
		}
234
	}
235

    
236
	@Override
237
	public void remove() {
238
		throw new UnsupportedOperationException();
239
	}
240

    
241
	public VirtuosoParthenosIterator datasourceInterface(final String datasourceInterface) {
242
		this.datasourceInterface = datasourceInterface;
243
		return this;
244
	}
245

    
246
	public VirtuosoParthenosIterator virtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
247
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
248
		return this;
249
	}
250

    
251
	public VirtuosoParthenosIterator datasourceName(final String datasourceName) {
252
		this.datasourceName = datasourceName;
253
		return this;
254
	}
255

    
256
	public VirtuosoParthenosIterator errors(final Map<String, Integer> errors) {
257
		this.errors = errors;
258
		return this;
259
	}
260

    
261
	public String getDatasourceInterface() {
262
		return datasourceInterface;
263
	}
264

    
265
	public String getVirtuosoReaderAPIUrl() {
266
		return virtuosoReaderAPIUrl;
267
	}
268

    
269
	public Map<String, Integer> getErrors() {
270
		return errors;
271
	}
272

    
273
	public BlockingQueue<String> getElements() {
274
		return elements;
275
	}
276

    
277
	public RestTemplate getRestTemplate() {
278
		return restTemplate;
279
	}
280

    
281
	public VirtuosoParthenosIterator restTemplate(final RestTemplate restTemplate) {
282
		this.restTemplate = restTemplate;
283
		return this;
284
	}
285

    
286
	public String getDatasourceName() {
287
		return datasourceName;
288
	}
289

    
290
	public boolean isStarted() {
291
		return started;
292
	}
293

    
294
	public List<String> getListForClass() {
295
		return listForClass;
296
	}
297

    
298
	public String getCurrentElement() {
299
		return currentElement;
300
	}
301

    
302
	public ExecutorService getExecutor() {
303
		return executor;
304
	}
305

    
306
	public void setDatasourceName(final String datasourceName) {
307
		this.datasourceName = datasourceName;
308
	}
309

    
310
	public void setDatasourceInterface(final String datasourceInterface) {
311
		this.datasourceInterface = datasourceInterface;
312
	}
313

    
314
	public void setVirtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
315
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
316
	}
317

    
318
	public void setStarted(final boolean started) {
319
		this.started = started;
320
	}
321

    
322
	public void setErrors(final Map<String, Integer> errors) {
323
		this.errors = errors;
324
	}
325

    
326
	public void setListForClass(final List<String> listForClass) {
327
		this.listForClass = listForClass;
328
	}
329

    
330
	public void setElements(final BlockingQueue<String> elements) {
331
		this.elements = elements;
332
	}
333

    
334
	public void setCurrentElement(final String currentElement) {
335
		this.currentElement = currentElement;
336
	}
337

    
338
	public void setExecutor(final ExecutorService executor) {
339
		this.executor = executor;
340
	}
341

    
342
	public void setRestTemplate(final RestTemplate restTemplate) {
343
		this.restTemplate = restTemplate;
344
	}
345
}
(13-13/13)