Project

General

Profile

1
package eu.dnetlib.parthenos.workflows.nodes;
2

    
3
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Queues;
import eu.dnetlib.data.collector.ThreadSafeIterator;
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
import org.apache.commons.lang3.StringEscapeUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.client.utils.URIBuilder;
import org.springframework.http.*;
import org.springframework.web.client.HttpServerErrorException;
import org.springframework.web.client.RestTemplate;
25

    
26
/**
 * Created by Alessia Bardi on 31/01/2018.
 *
 * @author Alessia Bardi
 */
31
public class VirtuosoParthenosIterator extends ThreadSafeIterator {
32

    
33
	private static final Log log = LogFactory.getLog(VirtuosoParthenosIterator.class);
34
	protected static final int QUEUE_TIMEOUT_SECONDS = 30;
35
	public final static String TERMINATOR = "ARNOLD";
36
	public final static String ERROR_TERMINATOR = "SCHWARZ";
37
	protected final static int SLEEP_MS = 5000;
38
	protected final static int MAX_RETRIES = 3;
39

    
40
	private String datasourceName;
41
	private String datasourceInterface;
42
	private String virtuosoReaderAPIUrl;
43
	private boolean started = false;
44
	private Map<String, Integer> errors = Maps.newHashMap();
45
	private List<String> listForClass = Lists.newArrayList();
46
	private BlockingQueue<String> elements = Queues.newArrayBlockingQueue(10);
47

    
48
	private String currentElement = null;
49
	private RestTemplate restTemplate = new RestTemplate();
50
	private ExecutorService executor = Executors.newSingleThreadExecutor();
51

    
52

    
53
	private synchronized void verifyStarted() throws Exception {
54
		if (!this.started) {
55
			this.started = true;
56
			fillQueue();
57
			getNextElement(MAX_RETRIES);
58
		}
59
	}
60

    
61
	protected void fillQueue(){
62
		log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl());
63
		executor.submit(() -> {
64
			String threadName = Thread.currentThread().getName();
65
			System.out.println("Hello " + threadName);
66
			URIBuilder builder;
67
			try {
68
				builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/graphs");
69
				builder.addParameter("api", getDatasourceInterface());
70
				URI uri = builder.build();
71
				log.debug("fillQueue -- Calling for graph list: " + uri.toString());
72
				List<String> graphList = restTemplate.getForObject(uri, listForClass.getClass());
73
				for (String graph : graphList) {
74
					builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subjects");
75
					builder.addParameter("graph", graph);
76
					uri = builder.build();
77
					log.debug("fillQueue -- Calling for subject list: " + uri.toString());
78
					List<String> subjectList = restTemplate.getForObject(builder.build(), listForClass.getClass());
79
					for (String subject : subjectList) {
80
						//Note: we skip no http resources based on FORTH feedback: URLs are created only for "relevant" stuff
81
						if(subject.startsWith("http")) {
82
							String xmlFile = tryGetRDF(subject, MAX_RETRIES);
83
							elements.offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
84
						}
85
						else{
86
							log.debug("Skipping "+subject+": no http resource");
87
						}
88
					}
89
				}
90
				elements.offer(TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
91
			} catch (Exception e) {
92
				log.error(e.getMessage());
93
				try {
94
					elements.offer(ERROR_TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
95
				} catch (InterruptedException e1) {
96
					log.error(e1.getMessage());
97
				}
98
			}
99
		});
100
		executor.shutdown();
101

    
102
	}
103

    
104
	protected String tryGetRDF(final String subjectURL, int attempt) throws URISyntaxException, InterruptedException, VirtuosoParthenosException {
105
		log.debug("Quering Api, remaining attempts: "+attempt);
106
		if (attempt <= 0) throw new VirtuosoParthenosException("Too many retries...stopping. please check virtuoso server status");
107
		ResponseEntity<String> response = getRDF(subjectURL);
108
		HttpStatus responseStatus = response.getStatusCode();
109
		if (responseStatus.is2xxSuccessful()) {
110
			String rdfFile = response.getBody();
111
			if(StringUtils.isBlank(rdfFile)){
112
				log.warn("Got blank RDF for "+subjectURL+" , let's try again...");
113
				Thread.sleep(SLEEP_MS);
114
				return tryGetRDF(subjectURL, --attempt);
115
			}
116
			else {
117
				final String xmlFile = completeXML(rdfFile, subjectURL);
118
				log.debug(xmlFile);
119
				return xmlFile;
120
			}
121
		} else {
122
			if (responseStatus.is5xxServerError()) {
123
				//sleep for a while and re-try
124
				log.warn("HTTP ERROR: "+responseStatus.value() + ": " + responseStatus.getReasonPhrase()+": I'll sleep and then try again");
125
				Thread.sleep(SLEEP_MS);
126
				return tryGetRDF(subjectURL, --attempt);
127
			} else {
128
				throw new VirtuosoParthenosException(
129
						"ERROR: Can't get the RDF for " + subjectURL + " " + responseStatus.value() + ": " + responseStatus.getReasonPhrase());
130
			}
131
		}
132
	}
133

    
134
	protected ResponseEntity<String> getRDF(final String subjectURL) throws URISyntaxException {
135
		HttpHeaders headers = new HttpHeaders();
136
		headers.setContentType(MediaType.APPLICATION_XML);
137
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subjectURL);
138
		URI uri = builder.build();
139
		log.debug("fillQueue -- Calling for subject RDF: " + uri.toString());
140
		HttpEntity<String> entity = new HttpEntity<>("parameters", headers);
141
		return restTemplate.exchange(uri, HttpMethod.GET, entity, String.class);
142
	}
143

    
144

    
145
	public String completeXML(final String rdfFile, final String url) {
146
		String xmlEscapedURL = StringEscapeUtils.escapeXml11(url);
147
		return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"
148
				+ xmlEscapedURL + "</dri:objIdentifier><dri:datasourceapi>" + datasourceInterface + "</dri:datasourceapi><dri:datasourcename>" + datasourceName
149
				+ "</dri:datasourcename></header><metadata>" + rdfFile + "</metadata></record>";
150
	}
151

    
152
	@Override
153
	public boolean doHasNext() {
154
		try {
155
			verifyStarted();
156
		} catch (Exception e) {
157
			executor.shutdownNow();
158
			throw new CollectorServiceRuntimeException(e);
159
		}
160
		switch(currentElement){
161
		case TERMINATOR:
162
		case ERROR_TERMINATOR:
163
			if(!executor.isShutdown()) executor.shutdownNow();
164
			return false;
165
		default:
166
			return true;
167
		}
168

    
169
	}
170

    
171
	@Override
172
	public String doNext() {
173
		try {
174
			verifyStarted();
175
		} catch (Exception e) {
176
			executor.shutdownNow();
177
			throw new CollectorServiceRuntimeException(e);
178
		}
179
		switch(currentElement){
180
		case TERMINATOR:
181
		case ERROR_TERMINATOR:
182
			if(!executor.isShutdown()) executor.shutdownNow();
183
			throw new NoSuchElementException();
184
		default:
185
			String res = currentElement;
186
			getNextElement(MAX_RETRIES);
187
			return res;
188
		}
189
	}
190

    
191
	private void getNextElement(int attempt){
192
		log.debug("polling from queue, remaining attempts: "+attempt);
193
		if(attempt <= 0) currentElement = ERROR_TERMINATOR;
194
		else{
195
			try {
196
				currentElement = elements.poll(QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
197
				if(currentElement == null){
198
					if (executor.isTerminated()) {
199
						//no additional element and we got the timeout...
200
						String errMessage = "Unexpected timeout reading element queue";
201
						log.warn(errMessage);
202
						errors.merge(errMessage, 1, Integer::sum);
203
						currentElement = ERROR_TERMINATOR;
204
					} else {
205
						//executor is not terminated: let's try again
206
						log.warn("Queue timed out, but I'll try again (#"+attempt+")");
207
						getNextElement(--attempt);
208
					}
209
				}
210
			} catch (InterruptedException e) {
211
				//current thread interrupted. Let's end.
212
				currentElement = ERROR_TERMINATOR;
213
				executor.shutdownNow();
214
			}
215
		}
216
	}
217

    
218
	@Override
219
	public void remove() {
220
		throw new UnsupportedOperationException();
221
	}
222

    
223
	public VirtuosoParthenosIterator datasourceInterface(final String datasourceInterface) {
224
		this.datasourceInterface = datasourceInterface;
225
		return this;
226
	}
227

    
228
	public VirtuosoParthenosIterator virtuosoReaderAPIUrl(final String virtuosoReaderAPIUrl) {
229
		this.virtuosoReaderAPIUrl = virtuosoReaderAPIUrl;
230
		return this;
231
	}
232

    
233
	public VirtuosoParthenosIterator datasourceName(final String datasourceName) {
234
		this.datasourceName = datasourceName;
235
		return this;
236
	}
237

    
238
	public VirtuosoParthenosIterator errors(final Map<String, Integer> errors) {
239
		this.errors = errors;
240
		return this;
241
	}
242

    
243
	public String getDatasourceInterface() {
244
		return datasourceInterface;
245
	}
246

    
247
	public String getVirtuosoReaderAPIUrl() {
248
		return virtuosoReaderAPIUrl;
249
	}
250

    
251
	public Map<String, Integer> getErrors() {
252
		return errors;
253
	}
254

    
255
	public BlockingQueue<String> getElements() {
256
		return elements;
257
	}
258
}
(12-12/12)