12 |
12 |
import java.util.concurrent.LinkedBlockingQueue;
|
13 |
13 |
import java.util.concurrent.atomic.AtomicInteger;
|
14 |
14 |
|
|
15 |
import eu.dnetlib.domain.functionality.validator.StoredJob;
|
|
16 |
import eu.dnetlib.validator.service.impl.ValidatorManager;
|
15 |
17 |
import org.apache.log4j.Logger;
|
16 |
18 |
import org.springframework.core.task.TaskExecutor;
|
17 |
19 |
|
... | ... | |
33 |
35 |
|
34 |
36 |
private static Logger logger = Logger.getLogger(DnetListener.class);
|
35 |
37 |
private final AtomicInteger activeThreads = new AtomicInteger(0);
|
36 |
|
public Object allThreadsFinished = new Object();
|
|
38 |
private Object allThreadsFinished = new Object();
|
37 |
39 |
private RecordXMLBuilder xmlBuilder;
|
38 |
40 |
private BlackboardJob job;
|
39 |
41 |
private BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler;
|
... | ... | |
41 |
43 |
private ServiceLocator<ResultSetService> resultSetServiceLocator = null;
|
42 |
44 |
private ResultSetService resultSetService;
|
43 |
45 |
private String outputResultSetID;
|
44 |
|
private List<String> outputRSBuffer, outputDiskBuffer;
|
|
46 |
private List<String> outputDiskBuffer;
|
45 |
47 |
private EPR outputEpr;
|
46 |
48 |
private int internalJobsFinished = 0;
|
47 |
49 |
private int internalJobsSum = 1;
|
48 |
50 |
private int objsValidated = 0;
|
49 |
51 |
private int jobStatusUpdateInterval;
|
50 |
|
private boolean eprSetOk = false;
|
51 |
52 |
private boolean enableOutputToRS = false;
|
52 |
53 |
private boolean enableOutputToDisk = false;
|
53 |
|
private boolean diskSetOk = false;
|
54 |
54 |
private BufferedWriter bw = null;
|
55 |
55 |
private BlockingQueue<String> queue;
|
56 |
56 |
private int validationJobId;
|
57 |
57 |
|
58 |
|
@PostConstruct
|
|
58 |
private ValidatorManager validatorManager;
|
|
59 |
private String groupBy;
|
|
60 |
|
59 |
61 |
public void initOutputs() {
|
60 |
62 |
logger.debug("initializing outputs");
|
61 |
63 |
if (enableOutputToRS) {
|
62 |
|
if (!eprSetOk) {
|
63 |
|
this.initEprOutput();
|
64 |
|
job.getParameters().put("outputResultSetEpr",
|
65 |
|
EPRUtils.eprToXml(outputEpr));
|
66 |
|
eprSetOk = true;
|
67 |
|
logger.debug("epr output ok");
|
68 |
|
logger.debug("output epr id successfully set: "
|
69 |
|
+ EPRUtils.eprToXml(outputEpr));
|
70 |
|
logger.debug("current job status: " + job.getActionStatus());
|
71 |
|
}
|
|
64 |
this.initEprOutput();
|
|
65 |
job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(outputEpr));
|
|
66 |
|
|
67 |
logger.debug("output epr id successfully set: " + EPRUtils.eprToXml(outputEpr));
|
72 |
68 |
}
|
73 |
69 |
|
74 |
70 |
if (enableOutputToDisk) {
|
75 |
|
logger.debug("initializing disk 0");
|
76 |
|
if (!diskSetOk) {
|
77 |
|
logger.debug("initializing disk 1");
|
78 |
|
this.initDiskOutput();
|
79 |
|
diskSetOk = true;
|
80 |
|
logger.debug("disk output ok");
|
81 |
|
}
|
|
71 |
this.initDiskOutput();
|
|
72 |
logger.debug("disk output ok");
|
82 |
73 |
} else {
|
83 |
74 |
logger.debug("initializing disk disabled");
|
84 |
75 |
}
|
85 |
76 |
}
|
86 |
77 |
|
87 |
|
public void initEprOutput() {
|
88 |
|
try {
|
89 |
|
logger.debug("Initializing ResultSetService.");
|
90 |
|
// Get reference to service
|
91 |
|
resultSetService = resultSetServiceLocator.getService();
|
92 |
|
// Create a new result set
|
93 |
|
outputEpr = resultSetService.createPushRS(86400, 0);
|
94 |
|
// get result set ids
|
95 |
|
outputResultSetID = outputEpr.getParameter("ResourceIdentifier");
|
96 |
|
|
97 |
|
// initializing buffers
|
98 |
|
outputRSBuffer = new ArrayList<String>();
|
99 |
|
|
100 |
|
queue = new LinkedBlockingQueue<String>();
|
101 |
|
|
102 |
|
} catch (Exception e) {
|
103 |
|
logger.error("Error initializing ResultSetService.", e);
|
104 |
|
blackboardHandler.getBlackboardHandler().failed(job, e);
|
105 |
|
}
|
106 |
|
}
|
107 |
|
|
108 |
|
public void initDiskOutput() {
|
109 |
|
try {
|
110 |
|
|
111 |
|
logger.debug("Initializing FileOutputStream.");
|
112 |
|
// File file = new File("/var/lib/dnet/validator/workflow_blacklists/"
|
113 |
|
// + validationJobId);
|
114 |
|
String datasourceId;
|
115 |
|
if (job.getParameters().get("datasourceId") != null) {
|
116 |
|
datasourceId = job.getParameters().get("datasourceId");
|
117 |
|
} else if (job.getParameters().get("datasourceID") != null) {
|
118 |
|
datasourceId = job.getParameters().get("datasourceID");
|
119 |
|
} else {
|
120 |
|
datasourceId = "unknownId";
|
121 |
|
}
|
122 |
|
|
123 |
|
File file = new File("/var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
|
124 |
|
logger.debug("File: + /var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
|
125 |
|
if (file.exists()) {
|
126 |
|
logger.debug("File will be replaced");
|
127 |
|
file.delete();
|
128 |
|
}
|
129 |
|
file.createNewFile();
|
130 |
|
|
131 |
|
bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()));
|
132 |
|
|
133 |
|
// GZIPOutputStream zip = new GZIPOutputStream(new FileOutputStream(file.getAbsoluteFile()));
|
134 |
|
|
135 |
|
// bw = new BufferedWriter(new OutputStreamWriter(zip, "UTF-8"));
|
136 |
|
|
137 |
|
outputDiskBuffer = new ArrayList<String>();
|
138 |
|
|
139 |
|
} catch (IOException e) {
|
140 |
|
logger.error("Error initializing FileOutputStream.", e);
|
141 |
|
blackboardHandler.getBlackboardHandler().failed(job, e);
|
142 |
|
|
143 |
|
}
|
144 |
|
}
|
145 |
|
|
146 |
78 |
@Override
|
147 |
79 |
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext, Throwable t) throws ValidatorException {
|
148 |
80 |
try {
|
... | ... | |
157 |
89 |
public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext) throws ValidatorException {
|
158 |
90 |
try {
|
159 |
91 |
if (enableOutputToRS) {
|
160 |
|
if (!eprSetOk) {
|
161 |
|
this.initEprOutput();
|
162 |
|
job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(outputEpr));
|
163 |
|
eprSetOk = true;
|
164 |
|
|
165 |
|
logger.error("output epr id successfully set: " + EPRUtils.eprToXml(outputEpr));
|
166 |
|
logger.error("current job status: " + job.getActionStatus());
|
167 |
|
}
|
168 |
|
|
169 |
|
long time1 = Calendar.getInstance().getTimeInMillis();
|
170 |
|
// @SuppressWarnings("unchecked")
|
|
92 |
@SuppressWarnings("unchecked")
|
171 |
93 |
String xmlString = xmlBuilder.buildXml((List<Map<String, String>>) recordContext.get("veloList"), record, (Map<String, String>) recordContext.get("recordValidationResult"));
|
172 |
|
long time2 = Calendar.getInstance().getTimeInMillis();
|
173 |
|
|
174 |
|
logger.debug("Building the xml took " + ((time2 - time1)) + " milli seconds");
|
175 |
94 |
this.sendToQueue(xmlString);
|
176 |
|
// logger.debug("XML: " + xmlString);
|
177 |
95 |
}
|
178 |
96 |
|
179 |
97 |
if (enableOutputToDisk) {
|
180 |
|
if (!diskSetOk) {
|
181 |
|
this.initDiskOutput();
|
182 |
|
diskSetOk = true;
|
183 |
|
}
|
184 |
|
|
185 |
98 |
if ((Integer) recordContext.get("recordBlacklistScore") < 100) {
|
186 |
99 |
bw.write(tasks.get(0).valobjId);
|
187 |
100 |
bw.newLine();
|
... | ... | |
200 |
113 |
objsValidated++;
|
201 |
114 |
|
202 |
115 |
if (objsValidated % jobStatusUpdateInterval == 0) {
|
203 |
|
job.getParameters().put("recordsTested",
|
204 |
|
Integer.toString(objsValidated));
|
|
116 |
job.getParameters().put("recordsTested", Integer.toString(objsValidated));
|
205 |
117 |
blackboardHandler.getBlackboardHandler().ongoing(job);
|
206 |
118 |
}
|
207 |
119 |
} catch (Exception e) {
|
208 |
120 |
logger.error("Error while proccessing record results");
|
209 |
|
throw new ValidatorException("Error while proccessing record results", e);
|
|
121 |
throw new ValidatorException("Error while proccessing record results", e);
|
210 |
122 |
}
|
211 |
123 |
}
|
212 |
124 |
|
... | ... | |
219 |
131 |
internalJobsFinished++;
|
220 |
132 |
if (internalJobsFinished == internalJobsSum) {
|
221 |
133 |
|
222 |
|
logger.debug("internalJobsFinished == internalJobsSum: " + (internalJobsFinished == internalJobsSum));
|
223 |
|
if (enableOutputToRS && eprSetOk) {
|
|
134 |
logger.debug("internalJobsFinished == internalJobsSum");
|
|
135 |
if (enableOutputToRS) {
|
224 |
136 |
// send remaining xmls from buffers
|
225 |
137 |
// if (outputRSBuffer.size() > 0) {
|
226 |
138 |
// try {
|
... | ... | |
262 |
174 |
logger.debug("closing result set");
|
263 |
175 |
resultSetService.closeRS(outputResultSetID);
|
264 |
176 |
}
|
265 |
|
if (enableOutputToDisk && diskSetOk) {
|
|
177 |
if (enableOutputToDisk) {
|
266 |
178 |
bw.write("FINISHED");
|
267 |
179 |
// bw.newLine();
|
268 |
180 |
// logger.debug("output disk buffer size: " + outputDiskBuffer.size());
|
... | ... | |
277 |
189 |
}
|
278 |
190 |
// job.getParameters().put("outputResultSetEpr",
|
279 |
191 |
// EPRUtils.eprToXml(outputEpr));
|
280 |
|
job.getParameters().put("jobId",
|
281 |
|
Integer.toString(validationJobId));
|
|
192 |
job.getParameters().put("jobId", Integer.toString(validationJobId));
|
282 |
193 |
|
283 |
194 |
job.getParameters().put("recordsTested", Integer.toString(objsValidated));
|
284 |
195 |
// job.getParameters().put(
|
... | ... | |
286 |
197 |
// Integer.toString(resultSetService
|
287 |
198 |
// .getNumberOfElements(outputResultSetID)));
|
288 |
199 |
// update job status
|
|
200 |
|
|
201 |
logger.info("Getting stored job");
|
|
202 |
StoredJob storedJob = validatorManager.getStoredJob(jobId, this.groupBy);
|
|
203 |
logger.info("Score: " + storedJob.getContentJobScore());
|
|
204 |
job.getParameters().put("score", storedJob.getContentJobScore() + "");
|
|
205 |
|
289 |
206 |
blackboardHandler.getBlackboardHandler().done(job);
|
290 |
207 |
|
291 |
208 |
} else {
|
... | ... | |
304 |
221 |
try {
|
305 |
222 |
internalJobsFinished++;
|
306 |
223 |
if (internalJobsFinished == internalJobsSum) {
|
307 |
|
if (enableOutputToRS && eprSetOk) {
|
|
224 |
if (enableOutputToRS) {
|
308 |
225 |
resultSetService.closeRS(outputResultSetID);
|
309 |
226 |
}
|
310 |
|
if (enableOutputToDisk && diskSetOk) {
|
|
227 |
if (enableOutputToDisk) {
|
311 |
228 |
outputDiskBuffer.clear();
|
312 |
229 |
bw.close();
|
313 |
|
File file = new File("/tmp/validator-wf/"
|
314 |
|
+ validationJobId);
|
|
230 |
File file = new File("/tmp/validator-wf/" + validationJobId);
|
315 |
231 |
// File file = new File("/tmp/validator-wf/"
|
316 |
232 |
// + job.getParameters().get("datasourceId"));
|
317 |
233 |
if (file.exists()) {
|
... | ... | |
335 |
251 |
|
336 |
252 |
private void sendToQueue(String xmlString) {
|
337 |
253 |
logger.debug("received passed XMLresult");
|
338 |
|
// add records
|
339 |
|
// if (queue.size() == 100000) {
|
340 |
|
// queue.clear();
|
341 |
|
// resultSetService.
|
342 |
|
// }
|
|
254 |
|
343 |
255 |
try {
|
344 |
256 |
queue.put(xmlString);
|
345 |
257 |
} catch (InterruptedException e1) {
|
... | ... | |
359 |
271 |
}
|
360 |
272 |
}
|
361 |
273 |
|
|
274 |
private void initEprOutput() {
|
|
275 |
try {
|
|
276 |
logger.debug("Initializing ResultSetService.");
|
|
277 |
// Get reference to service
|
|
278 |
resultSetService = resultSetServiceLocator.getService();
|
|
279 |
// Create a new result set
|
|
280 |
outputEpr = resultSetService.createPushRS(86400, 0);
|
|
281 |
// get result set ids
|
|
282 |
outputResultSetID = outputEpr.getParameter("ResourceIdentifier");
|
|
283 |
|
|
284 |
// initializing buffers
|
|
285 |
// outputRSBuffer = new ArrayList<String>();
|
|
286 |
|
|
287 |
queue = new LinkedBlockingQueue<String>();
|
|
288 |
|
|
289 |
} catch (Exception e) {
|
|
290 |
logger.error("Error initializing ResultSetService.", e);
|
|
291 |
blackboardHandler.getBlackboardHandler().failed(job, e);
|
|
292 |
}
|
|
293 |
}
|
|
294 |
|
|
295 |
private void initDiskOutput() {
|
|
296 |
try {
|
|
297 |
|
|
298 |
logger.debug("Initializing FileOutputStream.");
|
|
299 |
// File file = new File("/var/lib/dnet/validator/workflow_blacklists/"
|
|
300 |
// + validationJobId);
|
|
301 |
String datasourceId;
|
|
302 |
if (job.getParameters().get("datasourceId") != null) {
|
|
303 |
datasourceId = job.getParameters().get("datasourceId");
|
|
304 |
} else if (job.getParameters().get("datasourceID") != null) {
|
|
305 |
datasourceId = job.getParameters().get("datasourceID");
|
|
306 |
} else {
|
|
307 |
datasourceId = "unknownId";
|
|
308 |
}
|
|
309 |
|
|
310 |
File file = new File("/var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
|
|
311 |
logger.debug("File: + /var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
|
|
312 |
if (file.exists()) {
|
|
313 |
logger.debug("File will be replaced");
|
|
314 |
file.delete();
|
|
315 |
}
|
|
316 |
file.createNewFile();
|
|
317 |
|
|
318 |
bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()));
|
|
319 |
|
|
320 |
// GZIPOutputStream zip = new GZIPOutputStream(new FileOutputStream(file.getAbsoluteFile()));
|
|
321 |
|
|
322 |
// bw = new BufferedWriter(new OutputStreamWriter(zip, "UTF-8"));
|
|
323 |
|
|
324 |
outputDiskBuffer = new ArrayList<String>();
|
|
325 |
|
|
326 |
} catch (IOException e) {
|
|
327 |
logger.error("Error initializing FileOutputStream.", e);
|
|
328 |
blackboardHandler.getBlackboardHandler().failed(job, e);
|
|
329 |
|
|
330 |
}
|
|
331 |
}
|
|
332 |
|
362 |
333 |
public RecordXMLBuilder getXmlBuilder() {
|
363 |
334 |
return xmlBuilder;
|
364 |
335 |
}
|
... | ... | |
441 |
412 |
this.enableOutputToRS = enableOutputToRS;
|
442 |
413 |
}
|
443 |
414 |
|
|
415 |
public ValidatorManager getValidatorManager() {
|
|
416 |
return validatorManager;
|
|
417 |
}
|
|
418 |
|
|
419 |
public void setValidatorManager(ValidatorManager validatorManager) {
|
|
420 |
this.validatorManager = validatorManager;
|
|
421 |
}
|
|
422 |
|
|
423 |
public String getGroupBy() {
|
|
424 |
return groupBy;
|
|
425 |
}
|
|
426 |
|
|
427 |
public void setGroupBy(String groupBy) {
|
|
428 |
this.groupBy = groupBy;
|
|
429 |
}
|
444 |
430 |
}
|
- Returning the validation score in the bb message
- Fixed the metadataPrefix passed to the OAI-PMH Harvester