Revision 50796
Added by Alessia Bardi over 6 years ago
VirtuosoParthenosIterator.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.parthenos.workflows.nodes; |
2 | 2 |
|
3 | 3 |
import java.net.URI; |
4 |
import java.net.URISyntaxException; |
|
4 | 5 |
import java.util.List; |
5 | 6 |
import java.util.Map; |
6 | 7 |
import java.util.NoSuchElementException; |
7 |
import java.util.concurrent.BlockingQueue; |
|
8 |
import java.util.concurrent.TimeUnit; |
|
8 |
import java.util.concurrent.*; |
|
9 | 9 |
|
10 | 10 |
import com.google.common.collect.Lists; |
11 | 11 |
import com.google.common.collect.Maps; |
... | ... | |
29 | 29 |
private static final Log log = LogFactory.getLog(VirtuosoParthenosIterator.class); |
30 | 30 |
protected static final int QUEUE_TIMEOUT_SECONDS = 5; |
31 | 31 |
public final static String TERMINATOR = "ARNOLD"; |
32 |
public final static String ERROR_TERMINATOR = "SCHWARZ"; |
|
33 |
protected final static int SLEEP_MS = 1000; |
|
34 |
protected final static int MAX_RETRIES = 3; |
|
32 | 35 |
|
33 | 36 |
private String datasourceName; |
34 | 37 |
private String datasourceInterface; |
... | ... | |
36 | 39 |
private boolean started = false; |
37 | 40 |
private Map<String, Integer> errors = Maps.newHashMap(); |
38 | 41 |
private List<String> listForClass = Lists.newArrayList(); |
39 |
private BlockingQueue<String> graphs = Queues.newArrayBlockingQueue(10); |
|
40 |
private BlockingQueue<String> subjects = Queues.newArrayBlockingQueue(10); |
|
41 | 42 |
private BlockingQueue<String> elements = Queues.newArrayBlockingQueue(10); |
42 | 43 |
|
43 | 44 |
private String currentElement = null; |
44 | 45 |
private RestTemplate restTemplate = new RestTemplate(); |
46 |
private ExecutorService executor = Executors.newSingleThreadExecutor(); |
|
45 | 47 |
|
46 | 48 |
|
47 |
private synchronized void verifyStarted() throws InterruptedException {
|
|
49 |
private synchronized void verifyStarted() throws Exception { |
|
48 | 50 |
if (!this.started) { |
49 | 51 |
this.started = true; |
50 |
fillGraphQueue(); |
|
51 |
fillSubjectsQueue(); |
|
52 |
fillElementsQueue(); |
|
53 |
currentElement = elements.take(); |
|
52 |
fillQueue(); |
|
53 |
getNextElement(MAX_RETRIES); |
|
54 | 54 |
} |
55 | 55 |
} |
56 | 56 |
|
57 |
protected void fillGraphQueue(){ |
|
58 |
new Thread(() -> { |
|
59 |
log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl()); |
|
57 |
protected void fillQueue() throws Exception { |
|
58 |
log.info("Virtuoso reader at : " + getVirtuosoReaderAPIUrl()); |
|
59 |
executor.submit(() -> { |
|
60 |
String threadName = Thread.currentThread().getName(); |
|
61 |
System.out.println("Hello " + threadName); |
|
62 |
URIBuilder builder; |
|
60 | 63 |
try { |
61 |
URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/graphs");
|
|
64 |
builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/graphs"); |
|
62 | 65 |
builder.addParameter("api", getDatasourceInterface()); |
63 | 66 |
URI uri = builder.build(); |
64 |
log.debug("fillGraphQueue -- Calling: "+uri.toString()); |
|
65 |
|
|
67 |
log.debug("fillQueue -- Calling for graph list: " + uri.toString()); |
|
66 | 68 |
List<String> graphList = restTemplate.getForObject(uri, listForClass.getClass()); |
67 |
for (String g : graphList) { |
|
68 |
graphs.offer(g, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
|
69 |
for (String graph : graphList) { |
|
70 |
builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subjects"); |
|
71 |
builder.addParameter("graph", graph); |
|
72 |
uri = builder.build(); |
|
73 |
log.debug("fillQueue -- Calling for subject list: " + uri.toString()); |
|
74 |
List<String> subjectList = restTemplate.getForObject(builder.build(), listForClass.getClass()); |
|
75 |
for (String subject : subjectList) { |
|
76 |
String xmlFile = tryGetRDF(subject, MAX_RETRIES); |
|
77 |
elements.offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
|
78 |
} |
|
69 | 79 |
} |
70 |
} catch (Throwable t) { |
|
71 |
log.error("Exception in fillGraphQueue:\n"+t); |
|
72 |
errors.merge(t.getMessage(), 1, Integer::sum); |
|
73 |
} |
|
74 |
finally { |
|
80 |
elements.offer(TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
|
81 |
} catch (Exception e) { |
|
82 |
log.error(e.getMessage()); |
|
75 | 83 |
try { |
76 |
graphs.offer(TERMINATOR,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
|
77 |
} catch (InterruptedException e) { |
|
78 |
log.error("Exception in fillGraphQueue:\n"+e);
|
|
84 |
elements.offer(ERROR_TERMINATOR, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
|
85 |
} catch (InterruptedException e1) {
|
|
86 |
log.error(e1.getMessage());
|
|
79 | 87 |
} |
80 | 88 |
} |
81 |
}).start(); |
|
89 |
}); |
|
90 |
executor.shutdown(); |
|
82 | 91 |
|
83 |
|
|
84 | 92 |
} |
85 | 93 |
|
86 |
protected void fillSubjectsQueue(){ |
|
87 |
new Thread(() -> { |
|
88 |
try { |
|
89 |
String graph = graphs.take(); |
|
90 |
while(!graph.equals(TERMINATOR)){ |
|
91 |
URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subjects"); |
|
92 |
builder.addParameter("graph", graph); |
|
93 |
URI uri = builder.build(); |
|
94 |
log.debug("fillSubjectsQueue -- Calling: "+uri.toString()); |
|
95 |
List<String> subjectList = restTemplate.getForObject(builder.build(), listForClass.getClass()); |
|
96 |
for (String s : subjectList) { |
|
97 |
subjects.offer(s,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
|
98 |
} |
|
99 |
graph = graphs.take(); |
|
100 |
} |
|
101 |
} catch (Throwable t) { |
|
102 |
log.error("Exception in fillSubjectsQueue:\n"+t); |
|
103 |
errors.merge(t.getMessage(), 1, Integer::sum); |
|
94 |
protected String tryGetRDF(final String subjectURL, int attempt) throws URISyntaxException, InterruptedException, VirtuosoParthenosException { |
|
95 |
if (attempt <= 0) throw new VirtuosoParthenosException("Too many retries...stopping. please check virtuoso server status"); |
|
96 |
ResponseEntity<String> response = getRDF(subjectURL); |
|
97 |
HttpStatus responseStatus = response.getStatusCode(); |
|
98 |
if (responseStatus.is2xxSuccessful()) { |
|
99 |
String rdfFile = response.getBody(); |
|
100 |
final String xmlFile = completeXML(rdfFile, subjectURL); |
|
101 |
log.debug(xmlFile); |
|
102 |
return xmlFile; |
|
103 |
} else { |
|
104 |
if (responseStatus.is5xxServerError()) { |
|
105 |
//sleep for a while and re-try |
|
106 |
Thread.sleep(SLEEP_MS); |
|
107 |
return tryGetRDF(subjectURL, --attempt); |
|
108 |
} else { |
|
109 |
throw new VirtuosoParthenosException( |
|
110 |
"ERROR: Can't get the RDF for " + subjectURL + " " + responseStatus.value() + ": " + responseStatus.getReasonPhrase()); |
|
104 | 111 |
} |
105 |
finally { |
|
106 |
try { |
|
107 |
subjects.offer(TERMINATOR,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
|
108 |
} catch (InterruptedException e) { |
|
109 |
log.error("Exception in fillSubjectsQueue:\n"+e); |
|
110 |
} |
|
111 |
} |
|
112 |
}).start(); |
|
112 |
} |
|
113 | 113 |
} |
114 | 114 |
|
115 |
protected void fillElementsQueue(){
|
|
115 |
protected ResponseEntity<String> getRDF(final String subjectURL) throws URISyntaxException {
|
|
116 | 116 |
HttpHeaders headers = new HttpHeaders(); |
117 | 117 |
headers.setContentType(MediaType.APPLICATION_XML); |
118 |
new Thread(() -> { |
|
119 |
try { |
|
120 |
String subject = subjects.take(); |
|
121 |
while(!subject.equals(TERMINATOR)) { |
|
122 |
URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subject); |
|
123 |
URI uri = builder.build(); |
|
124 |
log.debug("fillElementsQueue -- Calling: "+uri.toString()); |
|
125 |
HttpEntity<String> entity = new HttpEntity<>("parameters", headers); |
|
126 |
ResponseEntity<String> response = restTemplate.exchange(uri, HttpMethod.GET, entity, String.class); |
|
127 |
log.debug("Result - status ("+ response.getStatusCode() + ") has body: " + response.hasBody()); |
|
128 |
String rdfFile = response.getBody(); |
|
129 |
final String xmlFile = completeXML(rdfFile, subject); |
|
130 |
elements.offer(xmlFile, QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
|
131 |
log.debug(xmlFile); |
|
132 |
subject = subjects.take(); |
|
133 |
} |
|
134 |
} catch (Throwable t) { |
|
135 |
log.error("Exception in fillElementsQueue:\n"+t); |
|
136 |
errors.merge(t.getMessage(), 1, Integer::sum); |
|
137 |
} |
|
138 |
finally { |
|
139 |
try { |
|
140 |
elements.offer(TERMINATOR,QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS); |
|
141 |
} catch (InterruptedException e) { |
|
142 |
log.error("Exception in fillElementsQueue:\n"+e); |
|
143 |
} |
|
144 |
} |
|
145 |
}).start(); |
|
118 |
URIBuilder builder = new URIBuilder(getVirtuosoReaderAPIUrl() + "/subject").addParameter("subjectURL", subjectURL); |
|
119 |
URI uri = builder.build(); |
|
120 |
log.debug("fillQueue -- Calling for subject RDF: " + uri.toString()); |
|
121 |
HttpEntity<String> entity = new HttpEntity<>("parameters", headers); |
|
122 |
return restTemplate.exchange(uri, HttpMethod.GET, entity, String.class); |
|
146 | 123 |
} |
147 | 124 |
|
148 | 125 |
|
149 |
public String completeXML(final String rdfFile, final String url){ |
|
126 |
public String completeXML(final String rdfFile, final String url) {
|
|
150 | 127 |
String xmlEscapedURL = StringEscapeUtils.escapeXml11(url); |
151 |
return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>"+xmlEscapedURL+"</dri:objIdentifier><dri:datasourceapi>"+datasourceInterface+"</dri:datasourceapi><dri:datasourcename>"+datasourceName+"</dri:datasourcename></header><metadata>"+rdfFile+"</metadata></record>"; |
|
128 |
return "<?xml version=\"1.0\" encoding=\"UTF-8\"?><record><header xmlns:dri=\"http://www.driver-repository.eu/namespace/dri\"><dri:objIdentifier>" |
|
129 |
+ xmlEscapedURL + "</dri:objIdentifier><dri:datasourceapi>" + datasourceInterface + "</dri:datasourceapi><dri:datasourcename>" + datasourceName |
|
130 |
+ "</dri:datasourcename></header><metadata>" + rdfFile + "</metadata></record>"; |
|
152 | 131 |
} |
153 | 132 |
|
154 |
|
|
155 | 133 |
@Override |
156 | 134 |
public boolean doHasNext() { |
157 | 135 |
try { |
158 | 136 |
verifyStarted(); |
159 |
} catch (InterruptedException e) { |
|
137 |
} catch (Exception e) { |
|
138 |
executor.shutdownNow(); |
|
160 | 139 |
throw new CollectorServiceRuntimeException(e); |
161 | 140 |
} |
162 |
return !currentElement.equals(TERMINATOR); |
|
141 |
switch(currentElement){ |
|
142 |
case TERMINATOR: |
|
143 |
case ERROR_TERMINATOR: |
|
144 |
if(!executor.isShutdown()) executor.shutdownNow(); |
|
145 |
return false; |
|
146 |
default: |
|
147 |
return true; |
|
148 |
} |
|
149 |
|
|
163 | 150 |
} |
164 | 151 |
|
165 | 152 |
@Override |
166 | 153 |
public String doNext() { |
167 | 154 |
try { |
168 | 155 |
verifyStarted(); |
169 |
} catch (InterruptedException e) { |
|
156 |
} catch (Exception e) { |
|
157 |
executor.shutdownNow(); |
|
170 | 158 |
throw new CollectorServiceRuntimeException(e); |
171 | 159 |
} |
172 |
if(currentElement.equals(TERMINATOR)) |
|
160 |
switch(currentElement){ |
|
161 |
case TERMINATOR: |
|
162 |
case ERROR_TERMINATOR: |
|
163 |
if(!executor.isShutdown()) executor.shutdownNow(); |
|
173 | 164 |
throw new NoSuchElementException(); |
165 |
default: |
|
166 |
String res = currentElement; |
|
167 |
getNextElement(MAX_RETRIES); |
|
168 |
return res; |
|
169 |
} |
|
170 |
} |
|
171 |
|
|
172 |
private void getNextElement(int attempt){ |
|
173 |
log.warn("polling from queue, remaining attempts: "+attempt); |
|
174 |
if(attempt <= 0) currentElement = ERROR_TERMINATOR; |
|
174 | 175 |
else{ |
175 |
String res = currentElement; |
|
176 | 176 |
try { |
177 |
currentElement = elements.take();
|
|
177 |
currentElement = elements.poll(QUEUE_TIMEOUT_SECONDS, TimeUnit.SECONDS);
|
|
178 | 178 |
} catch (InterruptedException e) { |
179 |
log.error("Exception in doNext:\n"+e); |
|
180 |
errors.merge(e.getMessage(), 1, Integer::sum); |
|
179 |
if (executor.isTerminated()) { |
|
180 |
//no additional element and we got the timeout... |
|
181 |
String errMessage = "Unexpected timeout reading element queue"; |
|
182 |
log.warn(errMessage); |
|
183 |
errors.merge(errMessage, 1, Integer::sum); |
|
184 |
currentElement = ERROR_TERMINATOR; |
|
185 |
} else { |
|
186 |
//executor is not terminated: let's try again |
|
187 |
log.warn("Queue timed out, but I'll try again"); |
|
188 |
getNextElement(--attempt); |
|
189 |
} |
|
181 | 190 |
} |
182 |
return res; |
|
183 | 191 |
} |
184 |
|
|
185 | 192 |
} |
186 | 193 |
|
187 | 194 |
@Override |
... | ... | |
221 | 228 |
return errors; |
222 | 229 |
} |
223 | 230 |
|
224 |
public BlockingQueue<String> getGraphs() { |
|
225 |
return graphs; |
|
226 |
} |
|
227 |
|
|
228 |
public BlockingQueue<String> getSubjects() { |
|
229 |
return subjects; |
|
230 |
} |
|
231 |
|
|
232 | 231 |
public BlockingQueue<String> getElements() { |
233 | 232 |
return elements; |
234 | 233 |
} |
Also available in: Unified diff
Simplified code of VirtuosoIterator. Only one queue and not too many parallel calls: let's see if virtuoso is more happy and if debugging is easier.