Revision 53583
Added by Antonis Lempesis over 5 years ago
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/listeners/DnetListener.java | ||
---|---|---|
143 | 143 |
} |
144 | 144 |
} |
145 | 145 |
|
146 |
public void sendToQueueOld(String xmlString) { |
|
147 |
logger.debug("received passed XMLresult"); |
|
148 |
// add records |
|
149 |
outputRSBuffer.add(xmlString); |
|
150 |
if (outputRSBuffer.size() > 30) { |
|
151 |
long time1 = Calendar.getInstance().getTimeInMillis(); |
|
152 |
try { |
|
153 |
resultSetService.populateRS(outputResultSetID, outputRSBuffer); |
|
154 |
outputRSBuffer.clear(); |
|
155 |
} catch (ResultSetServiceException e) { |
|
156 |
logger.error("Error populating ResultSetService.", e); |
|
157 |
} |
|
158 |
long time2 = Calendar.getInstance().getTimeInMillis(); |
|
159 |
logger.debug("Populating RS took " + ((time2 - time1)) |
|
160 |
+ " milli seconds"); |
|
161 |
} |
|
162 |
// logger.debug("XML: " + xmlString); |
|
163 |
} |
|
164 |
|
|
165 |
public void sendToQueue(String xmlString) { |
|
166 |
logger.debug("received passed XMLresult"); |
|
167 |
// add records |
|
168 |
// if (queue.size() == 100000) { |
|
169 |
// queue.clear(); |
|
170 |
// resultSetService. |
|
171 |
// } |
|
172 |
try { |
|
173 |
queue.put(xmlString); |
|
174 |
} catch (InterruptedException e1) { |
|
175 |
logger.error("Error putting in queue.", e1); |
|
176 |
} |
|
177 |
if (queue.size() > 50) { |
|
178 |
long time1 = Calendar.getInstance().getTimeInMillis(); |
|
179 |
RSTask task = new RSTask(resultSetService, outputResultSetID, queue, activeThreads, allThreadsFinished); |
|
180 |
// synchronized (counterLock) { |
|
181 |
// activeThreads ++; |
|
182 |
// } |
|
183 |
activeThreads.getAndIncrement(); |
|
184 |
rsExecutor.execute(task); |
|
185 |
long time2 = Calendar.getInstance().getTimeInMillis(); |
|
186 |
logger.debug("Populating RS took " + ((time2 - time1)) |
|
187 |
+ " milli seconds"); |
|
188 |
} |
|
189 |
} |
|
190 |
|
|
191 | 146 |
@Override |
192 |
public synchronized void currentResults(List<CompletedTask> tasks, |
|
193 |
int jobId, Object record, Map<String, Object> recordContext, |
|
194 |
Throwable t) throws ValidatorException { |
|
147 |
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext, Throwable t) throws ValidatorException { |
|
195 | 148 |
try { |
196 | 149 |
blackboardHandler.getBlackboardHandler().failed(job, t); |
197 | 150 |
} catch (Exception e) { |
198 | 151 |
logger.error("Error while proccessing record results"); |
199 |
throw new ValidatorException( |
|
200 |
"Error while proccessing record results", |
|
201 |
e); |
|
152 |
throw new ValidatorException("Error while proccessing record results", e); |
|
202 | 153 |
} |
203 | 154 |
} |
204 | 155 |
|
205 | 156 |
@Override |
206 |
public synchronized void currentResults(List<CompletedTask> tasks, |
|
207 |
int jobId, Object record, Map<String, Object> recordContext) |
|
208 |
throws ValidatorException { |
|
157 |
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext) throws ValidatorException { |
|
209 | 158 |
try { |
210 | 159 |
if (enableOutputToRS) { |
211 | 160 |
if (!eprSetOk) { |
212 | 161 |
this.initEprOutput(); |
213 |
job.getParameters().put("outputResultSetEpr", |
|
214 |
EPRUtils.eprToXml(outputEpr)); |
|
162 |
job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(outputEpr)); |
|
215 | 163 |
eprSetOk = true; |
216 |
logger.error("output epr id successfully set: " |
|
217 |
+ EPRUtils.eprToXml(outputEpr));
|
|
164 |
|
|
165 |
logger.error("output epr id successfully set: " + EPRUtils.eprToXml(outputEpr));
|
|
218 | 166 |
logger.error("current job status: " + job.getActionStatus()); |
219 | 167 |
} |
220 | 168 |
|
221 | 169 |
long time1 = Calendar.getInstance().getTimeInMillis(); |
222 |
@SuppressWarnings("unchecked") |
|
223 |
String xmlString = xmlBuilder.buildXml( |
|
224 |
(List<Map<String, String>>) recordContext |
|
225 |
.get("veloList"), record, |
|
226 |
(Map<String, String>) recordContext |
|
227 |
.get("recordValidationResult")); |
|
170 |
// @SuppressWarnings("unchecked") |
|
171 |
String xmlString = xmlBuilder.buildXml((List<Map<String, String>>) recordContext.get("veloList"), record, (Map<String, String>) recordContext.get("recordValidationResult")); |
|
228 | 172 |
long time2 = Calendar.getInstance().getTimeInMillis(); |
229 |
logger.debug("Building the xml took " + ((time2 - time1)) |
|
230 |
+ " milli seconds"); |
|
231 |
if ((Integer) recordContext.get("score") > 0) |
|
232 |
this.sendToQueue(xmlString); |
|
233 |
else |
|
234 |
this.sendToQueue(xmlString); |
|
173 |
|
|
174 |
logger.debug("Building the xml took " + ((time2 - time1)) + " milli seconds"); |
|
175 |
this.sendToQueue(xmlString); |
|
235 | 176 |
// logger.debug("XML: " + xmlString); |
236 | 177 |
} |
178 |
|
|
237 | 179 |
if (enableOutputToDisk) { |
238 | 180 |
if (!diskSetOk) { |
239 | 181 |
this.initDiskOutput(); |
... | ... | |
254 | 196 |
// outputDiskBuffer.clear(); |
255 | 197 |
// } |
256 | 198 |
} |
199 |
|
|
257 | 200 |
objsValidated++; |
201 |
|
|
258 | 202 |
if (objsValidated % jobStatusUpdateInterval == 0) { |
259 | 203 |
job.getParameters().put("recordsTested", |
260 | 204 |
Integer.toString(objsValidated)); |
... | ... | |
262 | 206 |
} |
263 | 207 |
} catch (Exception e) { |
264 | 208 |
logger.error("Error while proccessing record results"); |
265 |
throw new ValidatorException( |
|
266 |
"Error while proccessing record results", |
|
267 |
e); |
|
209 |
throw new ValidatorException("Error while proccessing record results", e); |
|
268 | 210 |
} |
269 | 211 |
} |
270 | 212 |
|
271 | 213 |
@Override |
272 | 214 |
public synchronized void finished(int jobId, Map<String, Object> jobContext) { |
273 | 215 |
try { |
216 |
|
|
217 |
logger.debug("Job " + jobId + " finished"); |
|
218 |
|
|
274 | 219 |
internalJobsFinished++; |
275 | 220 |
if (internalJobsFinished == internalJobsSum) { |
221 |
|
|
222 |
logger.debug("internalJobsFinished == internalJobsSum: " + (internalJobsFinished == internalJobsSum)); |
|
276 | 223 |
if (enableOutputToRS && eprSetOk) { |
277 | 224 |
// send remaining xmls from buffers |
278 | 225 |
// if (outputRSBuffer.size() > 0) { |
... | ... | |
386 | 333 |
} |
387 | 334 |
} |
388 | 335 |
|
336 |
private void sendToQueue(String xmlString) { |
|
337 |
logger.debug("received passed XMLresult"); |
|
338 |
// add records |
|
339 |
// if (queue.size() == 100000) { |
|
340 |
// queue.clear(); |
|
341 |
// resultSetService. |
|
342 |
// } |
|
343 |
try { |
|
344 |
queue.put(xmlString); |
|
345 |
} catch (InterruptedException e1) { |
|
346 |
logger.error("Error putting in queue.", e1); |
|
347 |
} |
|
348 |
if (queue.size() > 50) { |
|
349 |
long time1 = Calendar.getInstance().getTimeInMillis(); |
|
350 |
RSTask task = new RSTask(resultSetService, outputResultSetID, queue, activeThreads, allThreadsFinished); |
|
351 |
// synchronized (counterLock) { |
|
352 |
// activeThreads ++; |
|
353 |
// } |
|
354 |
activeThreads.getAndIncrement(); |
|
355 |
rsExecutor.execute(task); |
|
356 |
long time2 = Calendar.getInstance().getTimeInMillis(); |
|
357 |
logger.debug("Populating RS took " + ((time2 - time1)) |
|
358 |
+ " milli seconds"); |
|
359 |
} |
|
360 |
} |
|
361 |
|
|
389 | 362 |
public RecordXMLBuilder getXmlBuilder() { |
390 | 363 |
return xmlBuilder; |
391 | 364 |
} |
Also available in: Unified diff
code cleanup