Project

General

Profile

« Previous | Next » 

Revision 50796

Simplified code of VirtuosoIterator. Only one queue and not too many parallel calls: let's see if virtuoso is more happy and if debugging is easier.

View differences:

VirtuosoParthenosIterator.java
1 1
package eu.dnetlib.parthenos.workflows.nodes;
2 2

  
3 3
import java.net.URI;
4
import java.net.URISyntaxException;
4 5
import java.util.List;
5 6
import java.util.Map;
6 7
import java.util.NoSuchElementException;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.TimeUnit;
8
import java.util.concurrent.*;
9 9

  
10 10
import com.google.common.collect.Lists;
11 11
import com.google.common.collect.Maps;
......
29 29
	private static final Log log = LogFactory.getLog(VirtuosoParthenosIterator.class);
30 30
	protected static final int QUEUE_TIMEOUT_SECONDS = 5;
31 31
	public final static String TERMINATOR = "ARNOLD";
32
	public final static String ERROR_TERMINATOR = "SCHWARZ";
33
	protected final static int SLEEP_MS = 1000;
34
	protected final static int MAX_RETRIES = 3;
32 35

  
33 36
	private String datasourceName;
34 37
	private String datasourceInterface;
......
36 39
	private boolean started = false;
37 40
	private Map<String, Integer> errors = Maps.newHashMap();
38 41
	private List<String> listForClass = Lists.newArrayList();
39
	private BlockingQueue<String> graphs = Queues.newArrayBlockingQueue(10);
40
	private BlockingQueue<String> subjects = Queues.newArrayBlockingQueue(10);
41 42
	private BlockingQueue<String> elements = Queues.newArrayBlockingQueue(10);
42 43

  
43 44
	private String currentElement = null;
44 45
	private RestTemplate restTemplate = new RestTemplate();
46
	private ExecutorService executor = Executors.newSingleThreadExecutor();
45 47

  
46 48

  
47
	private synchronized void verifyStarted() throws InterruptedException {
49
	private synchronized void verifyStarted() throws Exception {
48 50
		if (!this.started) {
49 51
			this.started = true;
50
			fillGraphQueue();
51
			fillSubjectsQueue();
52
			fillElementsQueue();
53
			currentElement = elements.take();
52
			fillQueue();
53
			getNextElement(MAX_RETRIES);
54 54
		}
55 55
	}
56 56

  
57
	protected void fillGraphQueue(){
58
		new Thread(() -> {
59
			log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl());
57
	protected void fillQueue() throws Exception {
58
		log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl());
59
		executor.submit(() -> {
60
			String threadName = Thread.currentThread().getName();
61
			System.out.println("Hello " + threadName);
62
			URIBuilder builder;
60 63
			try {
61
				URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/graphs");
64
				builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/graphs");
62 65
				builder.addParameter("api", getDatasourceInterface());
63 66
				URI uri = builder.build();
64
				log.debug("fillGraphQueue -- Calling: "+uri.toString());
65

  
67
				log.debug("fillQueue -- Calling for graph list: " + uri.toString());
66 68
				List<String> graphList = restTemplate.getForObject(uri, listForClass.getClass());
67
				for (String g : graphList) {
68
					graphs.offer(g, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
69
				for (String graph : graphList) {
70
					builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subjects");
71
					builder.addParameter("graph", graph);
72
					uri = builder.build();
73
					log.debug("fillQueue -- Calling for subject list: " + uri.toString());
74
					List<String> subjectList = restTemplate.getForObject(builder.build(), listForClass.getClass());
75
					for (String subject : subjectList) {
76
						String xmlFile = tryGetRDF(subject, MAX_RETRIES);
77
						elements.offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
78
					}
69 79
				}
70
			} catch (Throwable t) {
71
				log.error("Exception in fillGraphQueue:\n"+t);
72
				errors.merge(t.getMessage(), 1, Integer::sum);
73
			}
74
			finally {
80
				elements.offer(TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
81
			} catch (Exception e) {
82
				log.error(e.getMessage());
75 83
				try {
76
					graphs.offer(TERMINATOR,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
77
				} catch (InterruptedException e) {
78
					log.error("Exception in fillGraphQueue:\n"+e);
84
					elements.offer(ERROR_TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
85
				} catch (InterruptedException e1) {
86
					log.error(e1.getMessage());
79 87
				}
80 88
			}
81
		}).start();
89
		});
90
		executor.shutdown();
82 91

  
83

  
84 92
	}
85 93

  
86
	protected void fillSubjectsQueue(){
87
		new Thread(() -> {
88
			try {
89
				String graph = graphs.take();
90
				while(!graph.equals(TERMINATOR)){
91
					URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subjects");
92
					builder.addParameter("graph", graph);
93
					URI uri = builder.build();
94
					log.debug("fillSubjectsQueue -- Calling: "+uri.toString());
95
					List<String> subjectList = restTemplate.getForObject(builder.build(), listForClass.getClass());
96
					for (String s : subjectList) {
97
						subjects.offer(s,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
98
					}
99
					graph = graphs.take();
100
				}
101
			} catch (Throwable t) {
102
				log.error("Exception in fillSubjectsQueue:\n"+t);
103
				errors.merge(t.getMessage(), 1, Integer::sum);
94
	protected String tryGetRDF(final String subjectURL, int attempt) throws URISyntaxException, InterruptedException, VirtuosoParthenosException {
95
		if (attempt <= 0) throw new VirtuosoParthenosException("Too many retries...stopping. please check virtuoso server status");
96
		ResponseEntity<String> response = getRDF(subjectURL);
97
		HttpStatus responseStatus = response.getStatusCode();
98
		if (responseStatus.is2xxSuccessful()) {
99
			String rdfFile = response.getBody();
100
			final String xmlFile = completeXML(rdfFile, subjectURL);
101
			log.debug(xmlFile);
102
			return xmlFile;
103
		} else {
104
			if (responseStatus.is5xxServerError()) {
105
				//sleep for a while and re-try
106
				Thread.sleep(SLEEP_MS);
107
				return tryGetRDF(subjectURL, --attempt);
108
			} else {
109
				throw new VirtuosoParthenosException(
110
						"ERROR: Can't get the RDF for " + subjectURL + " " + responseStatus.value() + ": " + responseStatus.getReasonPhrase());
104 111
			}
105
			finally {
106
				try {
107
					subjects.offer(TERMINATOR,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
108
				} catch (InterruptedException e) {
109
					log.error("Exception in fillSubjectsQueue:\n"+e);
110
				}
111
			}
112
		}).start();
112
		}
113 113
	}
114 114

  
115
	protected void fillElementsQueue(){
115
	protected ResponseEntity<String> getRDF(final String subjectURL) throws URISyntaxException {
116 116
		HttpHeaders headers = new HttpHeaders();
117 117
		headers.setContentType(MediaType.APPLICATION_XML);
118
		new Thread(() -> {
119
			try {
120
				String subject = subjects.take();
121
				while(!subject.equals(TERMINATOR)) {
122
					URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subject);
123
					URI uri = builder.build();
124
					log.debug("fillElementsQueue -- Calling: "+uri.toString());
125
					HttpEntity<String> entity = new HttpEntity<>("parameters", headers);
126
					ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.GET, entity, String.class);
127
					log.debug("Result - status ("+ response.getStatusCode() + ") has body: " + response.hasBody());
128
					String rdfFile = response.getBody();
129
					final String xmlFile = completeXML(rdfFile, subject);
130
					elements.offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
131
					log.debug(xmlFile);
132
					subject = subjects.take();
133
				}
134
			} catch (Throwable t) {
135
				log.error("Exception in fillElementsQueue:\n"+t);
136
				errors.merge(t.getMessage(), 1, Integer::sum);
137
			}
138
			finally {
139
				try {
140
					elements.offer(TERMINATOR,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
141
				} catch (InterruptedException e) {
142
					log.error("Exception in fillElementsQueue:\n"+e);
143
				}
144
			}
145
		}).start();
118
		URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subjectURL);
119
		URI uri = builder.build();
120
		log.debug("fillQueue -- Calling for subject RDF: " + uri.toString());
121
		HttpEntity<String> entity = new HttpEntity<>("parameters", headers);
122
		return restTemplate.exchange(uri, HttpMethod.GET, entity, String.class);
146 123
	}
147 124

  
148 125

  
149
	public String completeXML(final String rdfFile, final String url){
126
	public String completeXML(final String rdfFile, final String url) {
150 127
		String xmlEscapedURL = StringEscapeUtils.escapeXml11(url);
151
		return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"+xmlEscapedURL+"</dri:objIdentifier><dri:datasourceapi>"+datasourceInterface+"</dri:datasourceapi><dri:datasourcename>"+datasourceName+"</dri:datasourcename></header><metadata>"+rdfFile+"</metadata></record>";
128
		return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"
129
				+ xmlEscapedURL + "</dri:objIdentifier><dri:datasourceapi>" + datasourceInterface + "</dri:datasourceapi><dri:datasourcename>" + datasourceName
130
				+ "</dri:datasourcename></header><metadata>" + rdfFile + "</metadata></record>";
152 131
	}
153 132

  
154

  
155 133
	@Override
156 134
	public boolean doHasNext() {
157 135
		try {
158 136
			verifyStarted();
159
		} catch (InterruptedException e) {
137
		} catch (Exception e) {
138
			executor.shutdownNow();
160 139
			throw new CollectorServiceRuntimeException(e);
161 140
		}
162
		return !currentElement.equals(TERMINATOR);
141
		switch(currentElement){
142
		case TERMINATOR:
143
		case ERROR_TERMINATOR:
144
			if(!executor.isShutdown()) executor.shutdownNow();
145
			return false;
146
		default:
147
			return true;
148
		}
149

  
163 150
	}
164 151

  
165 152
	@Override
166 153
	public String doNext() {
167 154
		try {
168 155
			verifyStarted();
169
		} catch (InterruptedException e) {
156
		} catch (Exception e) {
157
			executor.shutdownNow();
170 158
			throw new CollectorServiceRuntimeException(e);
171 159
		}
172
		if(currentElement.equals(TERMINATOR))
160
		switch(currentElement){
161
		case TERMINATOR:
162
		case ERROR_TERMINATOR:
163
			if(!executor.isShutdown()) executor.shutdownNow();
173 164
			throw new NoSuchElementException();
165
		default:
166
			String res = currentElement;
167
			getNextElement(MAX_RETRIES);
168
			return res;
169
		}
170
	}
171

  
172
	private void getNextElement(int attempt){
173
		log.warn("polling from queue, remaining attempts: "+attempt);
174
		if(attempt <= 0) currentElement = ERROR_TERMINATOR;
174 175
		else{
175
			String res = currentElement;
176 176
			try {
177
				currentElement = elements.take();
177
				currentElement = elements.poll(QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
178 178
			} catch (InterruptedException e) {
179
				log.error("Exception in doNext:\n"+e);
180
				errors.merge(e.getMessage(), 1, Integer::sum);
179
				if (executor.isTerminated()) {
180
					//no additional element and we got the timeout...
181
					String errMessage = "Unexpected timeout reading element queue";
182
					log.warn(errMessage);
183
					errors.merge(errMessage, 1, Integer::sum);
184
					currentElement = ERROR_TERMINATOR;
185
				} else {
186
					//executor is not terminated: let's try again
187
					log.warn("Queue timed out, but I'll try again");
188
					getNextElement(--attempt);
189
				}
181 190
			}
182
			return res;
183 191
		}
184

  
185 192
	}
186 193

  
187 194
	@Override
......
221 228
		return errors;
222 229
	}
223 230

  
224
	public BlockingQueue<String> getGraphs() {
225
		return graphs;
226
	}
227

  
228
	public BlockingQueue<String> getSubjects() {
229
		return subjects;
230
	}
231

  
232 231
	public BlockingQueue<String> getElements() {
233 232
		return elements;
234 233
	}

Also available in: Unified diff