Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.net.URI;
4
import java.net.URISyntaxException;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.NoSuchElementException;
8
import java.util.concurrent.BlockingQueue;
9
import java.util.concurrent.ExecutorService;
10
import java.util.concurrent.Executors;
11
import java.util.concurrent.TimeUnit;
12

    
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Queues;
16
import eu.dnetlib.data.collector.ThreadSafeIterator;
17
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
18
import org.apache.commons.lang3.StringEscapeUtils;
19
import org.apache.commons.lang3.StringUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.http.client.utils.URIBuilder;
23
import org.springframework.http.*;
24
import org.springframework.web.client.ResourceAccessException;
25
import org.springframework.web.client.RestClientException;
26
import org.springframework.web.client.RestTemplate;
27

    
28
/**
29
 * Created by Alessia Bardi on 31/01/2018.
30
 *
31
 * @author Alessia Bardi
32
 */
33
public class VirtuosoParthenosIterator extends ThreadSafeIterator {
34

    
35
	private static final Log log = LogFactory.getLog(VirtuosoParthenosIterator.class);
36
	protected static final int QUEUE_TIMEOUT_SECONDS = 100;
37
	public final static String TERMINATOR = "ARNOLD";
38
	public final static String ERROR_TERMINATOR = "SCHWARZ";
39
	protected final static int SLEEP_MS = 5000;
40
	protected final static int MAX_RETRIES = 3;
41

    
42
	private String datasourceName;
43
	private String datasourceInterface;
44
	private String virtuosoReaderAPIUrl;
45
	private boolean started = false;
46
	private Map<String, Integer> errors = Maps.newHashMap();
47
	private List<String> listForClass = Lists.newArrayList();
48
	private BlockingQueue<String> elements = Queues.newArrayBlockingQueue(10);
49

    
50
	private String currentElement = null;
51
	private ExecutorService executor = Executors.newSingleThreadExecutor();
52

    
53
	private RestTemplate restTemplate;
54

    
55

    
56
	private synchronized void verifyStarted() throws Exception {
57
		if (!this.started) {
58
			this.started = true;
59
			fillQueue();
60
			getNextElement(MAX_RETRIES);
61
		}
62
	}
63

    
64
	protected void fillQueue(){
65
		log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl());
66
		executor.submit(() -> {
67
			String threadName = Thread.currentThread().getName();
68
			System.out.println("Hello " + threadName);
69
			URIBuilder builder;
70
			try {
71
				builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/graphs");
72
				builder.addParameter("api", getDatasourceInterface());
73
				URI uri = builder.build();
74
				log.debug("fillQueue -- Calling for graph list: " + uri.toString());
75
				List<String> graphList = restTemplate.getForObject(uri, listForClass.getClass());
76
				for (String graph : graphList) {
77
					builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subjects");
78
					builder.addParameter("graph", graph);
79
					uri = builder.build();
80
					log.debug("fillQueue -- Calling for subject list: " + uri.toString());
81
					List<String> subjectList =Lists.newArrayList();
82
					try {
83
						subjectList = restTemplate.getForObject(builder.build(), listForClass.getClass());
84
					}catch(RestClientException rce){
85
						log.warn("Skipping subjects from "+graph+" and sleeping for a while...");
86
						log.warn(rce.getMessage());
87
						Thread.sleep(SLEEP_MS);
88
					}
89
					for (String subject : subjectList) {
90
						//Note: we skip no http resources based on FORTH feedback: URLs are created only for "relevant" stuff
91
						//TODO: is it better to skip them here or in the query run by VirtuosoReadApi?
92
						if (subject.startsWith("http")) {
93
							String xmlFile = tryGetRDF(subject, MAX_RETRIES);
94
							if (StringUtils.isBlank(xmlFile)) {
95
								log.warn("Skipping blank RDF for " + subject);
96
							} else {
97
								elements.offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
98
							}
99
						} else {
100
							log.debug("Skipping " + subject + ": no http resource");
101
						}
102
					}
103
				}
104
				elements.offer(TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
105
			} catch (Exception e) {
106
				log.error(e.getMessage());
107
				try {
108
					elements.offer(ERROR_TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
109
				} catch (InterruptedException e1) {
110
					log.error(e1.getMessage());
111
				}
112
			}
113
		});
114
		executor.shutdown();
115

    
116
	}
117

    
118
	protected String tryGetRDF(final String subjectURL, int attempt) throws URISyntaxException, InterruptedException, VirtuosoParthenosException {
119
		log.debug("Quering Api, remaining attempts: "+attempt);
120
		if (attempt <= 0) throw new VirtuosoParthenosException("Too many retries...stopping. please check virtuoso server status");
121
		ResponseEntity<String> response = null;
122
		try {
123
			response = getRDF(subjectURL);
124
		}catch(ResourceAccessException e){
125
			//request timed out --> let's return null and skip this resource
126
			log.warn("Request timeout");
127
			errors.merge(e.getMessage(), 1, Integer::sum);
128
			return null;
129
		}
130
		HttpStatus responseStatus = response.getStatusCode();
131
		if (responseStatus.is2xxSuccessful()) {
132
			String rdfFile = response.getBody();
133
			if(StringUtils.isBlank(rdfFile)){
134
				log.warn("Got blank RDF for "+subjectURL+" , let's try again...");
135
				Thread.sleep(SLEEP_MS);
136
				return tryGetRDF(subjectURL, --attempt);
137
			}
138
			else {
139
				final String xmlFile = completeXML(rdfFile, subjectURL);
140
				log.debug(xmlFile);
141
				return xmlFile;
142
			}
143
		} else {
144
			if (responseStatus.is5xxServerError()) {
145
				//sleep for a while and re-try
146
				log.warn("HTTP ERROR: "+responseStatus.value() + ": " + responseStatus.getReasonPhrase()+": I'll sleep and then try again");
147
				Thread.sleep(SLEEP_MS);
148
				return tryGetRDF(subjectURL, --attempt);
149
			} else {
150
				throw new VirtuosoParthenosException(
151
						"ERROR: Can't get the RDF for " + subjectURL + " " + responseStatus.value() + ": " + responseStatus.getReasonPhrase());
152
			}
153
		}
154
	}
155

    
156
	protected ResponseEntity<String> getRDF(final String subjectURL) throws URISyntaxException {
157
		HttpHeaders headers = new HttpHeaders();
158
		headers.setContentType(MediaType.APPLICATION_XML);
159
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subjectURL);
160
		URI uri = builder.build();
161
		log.debug("fillQueue -- Calling for subject RDF: " + uri.toString());
162
		HttpEntity<String> entity = new HttpEntity<>("parameters", headers);
163
		return restTemplate.exchange(uri, HttpMethod.GET, entity, String.class);
164
	}
165

    
166

    
167
	public String completeXML(final String rdfFile, final String url) {
168
		String xmlEscapedURL = StringEscapeUtils.escapeXml11(url);
169
		return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"
170
				+ xmlEscapedURL + "</dri:objIdentifier><dri:datasourceapi>" + datasourceInterface + "</dri:datasourceapi><dri:datasourcename>" + datasourceName
171
				+ "</dri:datasourcename></header><metadata>" + rdfFile + "</metadata></record>";
172
	}
173

    
174
	@Override
175
	public boolean doHasNext() {
176
		try {
177
			verifyStarted();
178
		} catch (Exception e) {
179
			executor.shutdownNow();
180
			throw new CollectorServiceRuntimeException(e);
181
		}
182
		switch(currentElement){
183
		case TERMINATOR:
184
			if(!executor.isShutdown()) executor.shutdownNow();
185
			return false;
186
		case ERROR_TERMINATOR:
187
			if(!executor.isShutdown()) executor.shutdownNow();
188
			throw new CollectorServiceRuntimeException("Error getting elements from virtuoso");
189
		default:
190
			return true;
191
		}
192
	}
193

    
194
	@Override
195
	public String doNext() {
196
		try {
197
			verifyStarted();
198
		} catch (Exception e) {
199
			executor.shutdownNow();
200
			throw new CollectorServiceRuntimeException(e);
201
		}
202
		switch(currentElement){
203
		case TERMINATOR:
204
		case ERROR_TERMINATOR:
205
			if(!executor.isShutdown()) executor.shutdownNow();
206
			throw new NoSuchElementException();
207
		default:
208
			String res = currentElement;
209
			getNextElement(MAX_RETRIES);
210
			return res;
211
		}
212
	}
213

    
214
	private void getNextElement(int attempt){
215
		log.debug("polling from queue, remaining attempts: "+attempt);
216
		if(attempt <= 0) currentElement = ERROR_TERMINATOR;
217
		else{
218
			try {
219
				currentElement = elements.poll(QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
220
				if(currentElement == null){
221
					if (executor.isTerminated()) {
222
						//no additional element and we got the timeout...
223
						String errMessage = "Unexpected timeout reading element queue";
224
						log.warn(errMessage);
225
						errors.merge(errMessage, 1, Integer::sum);
226
						currentElement = ERROR_TERMINATOR;
227
					} else {
228
						//executor is not terminated: let's try again
229
						log.warn("Queue timed out, but I'll try again (#"+attempt+")");
230
						getNextElement(--attempt);
231
					}
232
				}
233
			} catch (InterruptedException e) {
234
				//current thread interrupted. Let's end.
235
				currentElement = ERROR_TERMINATOR;
236
				executor.shutdownNow();
237
			}
238
		}
239
	}
240

    
241
	@Override
242
	public void remove() {
243
		throw new UnsupportedOperationException();
244
	}
245

    
246
	public VirtuosoParthenosIterator datasourceInterface(final String datasourceInterface) {
247
		this.datasourceInterface = datasourceInterface;
248
		return this;
249
	}
250

    
251
	public VirtuosoParthenosIterator virtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
252
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
253
		return this;
254
	}
255

    
256
	public VirtuosoParthenosIterator datasourceName(final String datasourceName) {
257
		this.datasourceName = datasourceName;
258
		return this;
259
	}
260

    
261
	public VirtuosoParthenosIterator errors(final Map<String, Integer> errors) {
262
		this.errors = errors;
263
		return this;
264
	}
265

    
266
	public String getDatasourceInterface() {
267
		return datasourceInterface;
268
	}
269

    
270
	public String getVirtuosoReaderAPIUrl() {
271
		return virtuosoReaderAPIUrl;
272
	}
273

    
274
	public Map<String, Integer> getErrors() {
275
		return errors;
276
	}
277

    
278
	public BlockingQueue<String> getElements() {
279
		return elements;
280
	}
281

    
282
	public RestTemplate getRestTemplate() {
283
		return restTemplate;
284
	}
285

    
286
	public VirtuosoParthenosIterator restTemplate(final RestTemplate restTemplate) {
287
		this.restTemplate = restTemplate;
288
		return this;
289
	}
290
}
(12-12/12)