Project

General

Profile

« Previous | Next » 

Revision 55394

- Returning the validation score in the blackboard (bb) job message
- Fixed the metadataPrefix passed to the OAI-PMH Harvester

View differences:

modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impl/ValidatorManagerImpl.java
187 187
			dnetListener.setInternalJobsSum(workers);
188 188
			dnetListener.setValidationJobId(jobIdStored);
189 189
			dnetListener.setJobStatusUpdateInterval(jobStatusUpdateInterval);
190

  
191
			dnetListener.setValidatorManager(this);
192
			dnetListener.setGroupBy(groupBy);
193

  
190 194
			if (blacklistedRecords) {
191 195
				dnetListener.setEnableOutputToDisk(true);
192 196
				
......
194 198
			if (outputEpr)
195 199
				dnetListener.setEnableOutputToRS(true);
196 200

  
201
			dnetListener.initOutputs();
202

  
197 203
			validator.submitJob(jobContent, workers, listenerContent, dnetListener);
198 204
				
199 205
			return newJob;
......
221 227
				}	
222 228
			}
223 229
			
224
			if ( job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire2.0") || 
225
					job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire3.0") || 
230
			if (
231
//					job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire2.0") ||
232
					job.getDesiredCompatibilityLevel().toLowerCase().matches("^openaire\\d.\\d$") ||
233
//					job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire3.0") ||
234
//					job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire4.0") ||
226 235
					job.getDesiredCompatibilityLevel().equalsIgnoreCase("driver") ) {
227 236
				logger.debug("Chosen set: OpenAIRE For Literature Repositories");
228 237
				logger.debug("Setting METADATA_PREFIX to: oai_dc");
229 238
				job.setMetadataPrefix("oai_dc");
230
			} else if ( job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire2.0_data")) {
239
			} else if ( job.getDesiredCompatibilityLevel().toLowerCase().matches("^openaire\\d.\\d_data$")) {
240
					// job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire2.0_data")) {
231 241
				logger.debug("Chosen set: OpenAIRE For Data Archives");
232 242
				logger.debug("Setting METADATA_PREFIX to: oai_datacite");
233 243
				job.setMetadataPrefix("oai_datacite");			
234
			} else if ( job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire3.0_cris")) {
244
			} else if ( job.getDesiredCompatibilityLevel().toLowerCase().matches("^openaire\\d.\\d(.\\d)?_cris$")) {
245
					// job.getDesiredCompatibilityLevel().equalsIgnoreCase("openaire3.0_cris")) {
235 246
				logger.debug("Chosen set: OpenAIRE For Cris");
236 247
				logger.debug("Setting METADATA_PREFIX to: oai_CERIF_openaire");
237 248
				job.setMetadataPrefix("oai_CERIF_openaire");			
modules/uoa-validator-service/trunk/src/main/java/eu/dnetlib/validator/service/impls/listeners/DnetListener.java
12 12
import java.util.concurrent.LinkedBlockingQueue;
13 13
import java.util.concurrent.atomic.AtomicInteger;
14 14

  
15
import eu.dnetlib.domain.functionality.validator.StoredJob;
16
import eu.dnetlib.validator.service.impl.ValidatorManager;
15 17
import org.apache.log4j.Logger;
16 18
import org.springframework.core.task.TaskExecutor;
17 19

  
......
33 35

  
34 36
    private static Logger logger = Logger.getLogger(DnetListener.class);
35 37
    private final AtomicInteger activeThreads = new AtomicInteger(0);
36
    public Object allThreadsFinished = new Object();
38
    private Object allThreadsFinished = new Object();
37 39
    private RecordXMLBuilder xmlBuilder;
38 40
    private BlackboardJob job;
39 41
    private BlackboardNotificationHandler<BlackboardServerHandler> blackboardHandler;
......
41 43
    private ServiceLocator<ResultSetService> resultSetServiceLocator = null;
42 44
    private ResultSetService resultSetService;
43 45
    private String outputResultSetID;
44
    private List<String> outputRSBuffer, outputDiskBuffer;
46
    private List<String> outputDiskBuffer;
45 47
    private EPR outputEpr;
46 48
    private int internalJobsFinished = 0;
47 49
    private int internalJobsSum = 1;
48 50
    private int objsValidated = 0;
49 51
    private int jobStatusUpdateInterval;
50
    private boolean eprSetOk = false;
51 52
    private boolean enableOutputToRS = false;
52 53
    private boolean enableOutputToDisk = false;
53
    private boolean diskSetOk = false;
54 54
    private BufferedWriter bw = null;
55 55
    private BlockingQueue<String> queue;
56 56
    private int validationJobId;
57 57

  
58
    @PostConstruct
58
    private ValidatorManager validatorManager;
59
    private String groupBy;
60

  
59 61
    public void initOutputs() {
60 62
        logger.debug("initializing outputs");
61 63
        if (enableOutputToRS) {
62
            if (!eprSetOk) {
63
                this.initEprOutput();
64
                job.getParameters().put("outputResultSetEpr",
65
                        EPRUtils.eprToXml(outputEpr));
66
                eprSetOk = true;
67
                logger.debug("epr output ok");
68
                logger.debug("output epr id successfully set: "
69
                        + EPRUtils.eprToXml(outputEpr));
70
                logger.debug("current job status: " + job.getActionStatus());
71
            }
64
            this.initEprOutput();
65
            job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(outputEpr));
66

  
67
            logger.debug("output epr id successfully set: " + EPRUtils.eprToXml(outputEpr));
72 68
        }
73 69

  
74 70
        if (enableOutputToDisk) {
75
            logger.debug("initializing disk 0");
76
            if (!diskSetOk) {
77
                logger.debug("initializing disk 1");
78
                this.initDiskOutput();
79
                diskSetOk = true;
80
                logger.debug("disk output ok");
81
            }
71
            this.initDiskOutput();
72
            logger.debug("disk output ok");
82 73
        } else {
83 74
            logger.debug("initializing disk disabled");
84 75
        }
85 76
    }
86 77

  
87
    public void initEprOutput() {
88
        try {
89
            logger.debug("Initializing ResultSetService.");
90
            // Get reference to service
91
            resultSetService = resultSetServiceLocator.getService();
92
            // Create a new result set
93
            outputEpr = resultSetService.createPushRS(86400, 0);
94
            // get result set ids
95
            outputResultSetID = outputEpr.getParameter("ResourceIdentifier");
96

  
97
            // initializing buffers
98
            outputRSBuffer = new ArrayList<String>();
99

  
100
            queue = new LinkedBlockingQueue<String>();
101

  
102
        } catch (Exception e) {
103
            logger.error("Error initializing ResultSetService.", e);
104
            blackboardHandler.getBlackboardHandler().failed(job, e);
105
        }
106
    }
107

  
108
    public void initDiskOutput() {
109
        try {
110

  
111
            logger.debug("Initializing FileOutputStream.");
112
//			File file = new File("/var/lib/dnet/validator/workflow_blacklists/"
113
//					+ validationJobId);
114
            String datasourceId;
115
            if (job.getParameters().get("datasourceId") != null) {
116
                datasourceId = job.getParameters().get("datasourceId");
117
            } else if (job.getParameters().get("datasourceID") != null) {
118
                datasourceId = job.getParameters().get("datasourceID");
119
            } else {
120
                datasourceId = "unknownId";
121
            }
122

  
123
            File file = new File("/var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
124
            logger.debug("File: + /var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
125
            if (file.exists()) {
126
                logger.debug("File will be replaced");
127
                file.delete();
128
            }
129
            file.createNewFile();
130

  
131
            bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()));
132

  
133
//			GZIPOutputStream zip = new GZIPOutputStream(new FileOutputStream(file.getAbsoluteFile()));
134

  
135
//			bw = new BufferedWriter(new OutputStreamWriter(zip, "UTF-8"));
136

  
137
            outputDiskBuffer = new ArrayList<String>();
138

  
139
        } catch (IOException e) {
140
            logger.error("Error initializing FileOutputStream.", e);
141
            blackboardHandler.getBlackboardHandler().failed(job, e);
142

  
143
        }
144
    }
145

  
146 78
    @Override
147 79
    public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext, Throwable t) throws ValidatorException {
148 80
        try {
......
157 89
    public synchronized void currentResults(List<CompletedTask> tasks, int jobId, Object record, Map<String, Object> recordContext) throws ValidatorException {
158 90
        try {
159 91
            if (enableOutputToRS) {
160
                if (!eprSetOk) {
161
                    this.initEprOutput();
162
                    job.getParameters().put("outputResultSetEpr", EPRUtils.eprToXml(outputEpr));
163
                    eprSetOk = true;
164

  
165
                    logger.error("output epr id successfully set: " + EPRUtils.eprToXml(outputEpr));
166
                    logger.error("current job status: " + job.getActionStatus());
167
                }
168

  
169
                long time1 = Calendar.getInstance().getTimeInMillis();
170
//                @SuppressWarnings("unchecked")
92
                @SuppressWarnings("unchecked")
171 93
                String xmlString = xmlBuilder.buildXml((List<Map<String, String>>) recordContext.get("veloList"), record, (Map<String, String>) recordContext.get("recordValidationResult"));
172
                long time2 = Calendar.getInstance().getTimeInMillis();
173

  
174
                logger.debug("Building the xml took " + ((time2 - time1)) + " milli seconds");
175 94
                this.sendToQueue(xmlString);
176
                // logger.debug("XML: " + xmlString);
177 95
            }
178 96

  
179 97
            if (enableOutputToDisk) {
180
                if (!diskSetOk) {
181
                    this.initDiskOutput();
182
                    diskSetOk = true;
183
                }
184

  
185 98
                if ((Integer) recordContext.get("recordBlacklistScore") < 100) {
186 99
                    bw.write(tasks.get(0).valobjId);
187 100
                    bw.newLine();
......
200 113
            objsValidated++;
201 114

  
202 115
            if (objsValidated % jobStatusUpdateInterval == 0) {
203
                job.getParameters().put("recordsTested",
204
                        Integer.toString(objsValidated));
116
                job.getParameters().put("recordsTested", Integer.toString(objsValidated));
205 117
                blackboardHandler.getBlackboardHandler().ongoing(job);
206 118
            }
207 119
        } catch (Exception e) {
208 120
            logger.error("Error while proccessing record results");
209
            throw new ValidatorException("Error while proccessing record results",                    e);
121
            throw new ValidatorException("Error while proccessing record results", e);
210 122
        }
211 123
    }
212 124

  
......
219 131
            internalJobsFinished++;
220 132
            if (internalJobsFinished == internalJobsSum) {
221 133

  
222
                logger.debug("internalJobsFinished == internalJobsSum: " + (internalJobsFinished == internalJobsSum));
223
                if (enableOutputToRS && eprSetOk) {
134
                logger.debug("internalJobsFinished == internalJobsSum");
135
                if (enableOutputToRS) {
224 136
                    // send remaining xmls from buffers
225 137
//					if (outputRSBuffer.size() > 0) {
226 138
//						try {
......
262 174
                    logger.debug("closing result set");
263 175
                    resultSetService.closeRS(outputResultSetID);
264 176
                }
265
                if (enableOutputToDisk && diskSetOk) {
177
                if (enableOutputToDisk) {
266 178
                    bw.write("FINISHED");
267 179
//					bw.newLine();
268 180
//					logger.debug("output disk buffer size: " + outputDiskBuffer.size());
......
277 189
                }
278 190
                // job.getParameters().put("outputResultSetEpr",
279 191
                // EPRUtils.eprToXml(outputEpr));
280
                job.getParameters().put("jobId",
281
                        Integer.toString(validationJobId));
192
                job.getParameters().put("jobId", Integer.toString(validationJobId));
282 193

  
283 194
                job.getParameters().put("recordsTested", Integer.toString(objsValidated));
284 195
//				 job.getParameters().put(
......
286 197
//				 Integer.toString(resultSetService
287 198
//				 .getNumberOfElements(outputResultSetID)));
288 199
                // update job status
200

  
201
                logger.info("Getting stored job");
202
                StoredJob storedJob = validatorManager.getStoredJob(jobId, this.groupBy);
203
                logger.info("Score: " + storedJob.getContentJobScore());
204
                job.getParameters().put("score", storedJob.getContentJobScore() + "");
205

  
289 206
                blackboardHandler.getBlackboardHandler().done(job);
290 207

  
291 208
            } else {
......
304 221
        try {
305 222
            internalJobsFinished++;
306 223
            if (internalJobsFinished == internalJobsSum) {
307
                if (enableOutputToRS && eprSetOk) {
224
                if (enableOutputToRS) {
308 225
                    resultSetService.closeRS(outputResultSetID);
309 226
                }
310
                if (enableOutputToDisk && diskSetOk) {
227
                if (enableOutputToDisk) {
311 228
                    outputDiskBuffer.clear();
312 229
                    bw.close();
313
                    File file = new File("/tmp/validator-wf/"
314
                            + validationJobId);
230
                    File file = new File("/tmp/validator-wf/" + validationJobId);
315 231
//					File file = new File("/tmp/validator-wf/"
316 232
//							+ job.getParameters().get("datasourceId"));
317 233
                    if (file.exists()) {
......
335 251

  
336 252
    private void sendToQueue(String xmlString) {
337 253
        logger.debug("received passed XMLresult");
338
        // add records
339
//		if (queue.size() == 100000) {
340
//			queue.clear();
341
//			resultSetService.
342
//		}
254

  
343 255
        try {
344 256
            queue.put(xmlString);
345 257
        } catch (InterruptedException e1) {
......
359 271
        }
360 272
    }
361 273

  
274
    private void initEprOutput() {
275
        try {
276
            logger.debug("Initializing ResultSetService.");
277
            // Get reference to service
278
            resultSetService = resultSetServiceLocator.getService();
279
            // Create a new result set
280
            outputEpr = resultSetService.createPushRS(86400, 0);
281
            // get result set ids
282
            outputResultSetID = outputEpr.getParameter("ResourceIdentifier");
283

  
284
            // initializing buffers
285
//            outputRSBuffer = new ArrayList<String>();
286

  
287
            queue = new LinkedBlockingQueue<String>();
288

  
289
        } catch (Exception e) {
290
            logger.error("Error initializing ResultSetService.", e);
291
            blackboardHandler.getBlackboardHandler().failed(job, e);
292
        }
293
    }
294

  
295
    private void initDiskOutput() {
296
        try {
297

  
298
            logger.debug("Initializing FileOutputStream.");
299
//			File file = new File("/var/lib/dnet/validator/workflow_blacklists/"
300
//					+ validationJobId);
301
            String datasourceId;
302
            if (job.getParameters().get("datasourceId") != null) {
303
                datasourceId = job.getParameters().get("datasourceId");
304
            } else if (job.getParameters().get("datasourceID") != null) {
305
                datasourceId = job.getParameters().get("datasourceID");
306
            } else {
307
                datasourceId = "unknownId";
308
            }
309

  
310
            File file = new File("/var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
311
            logger.debug("File: + /var/lib/dnet/validator/workflow_blacklists/" + datasourceId);
312
            if (file.exists()) {
313
                logger.debug("File will be replaced");
314
                file.delete();
315
            }
316
            file.createNewFile();
317

  
318
            bw = new BufferedWriter(new FileWriter(file.getAbsoluteFile()));
319

  
320
//			GZIPOutputStream zip = new GZIPOutputStream(new FileOutputStream(file.getAbsoluteFile()));
321

  
322
//			bw = new BufferedWriter(new OutputStreamWriter(zip, "UTF-8"));
323

  
324
            outputDiskBuffer = new ArrayList<String>();
325

  
326
        } catch (IOException e) {
327
            logger.error("Error initializing FileOutputStream.", e);
328
            blackboardHandler.getBlackboardHandler().failed(job, e);
329

  
330
        }
331
    }
332

  
362 333
    public RecordXMLBuilder getXmlBuilder() {
363 334
        return xmlBuilder;
364 335
    }
......
441 412
        this.enableOutputToRS = enableOutputToRS;
442 413
    }
443 414

  
415
    public ValidatorManager getValidatorManager() {
416
        return validatorManager;
417
    }
418

  
419
    public void setValidatorManager(ValidatorManager validatorManager) {
420
        this.validatorManager = validatorManager;
421
    }
422

  
423
    public String getGroupBy() {
424
        return groupBy;
425
    }
426

  
427
    public void setGroupBy(String groupBy) {
428
        this.groupBy = groupBy;
429
    }
444 430
}
modules/uoa-validator-service/trunk/src/main/resources/eu/dnetlib/validator/service/listeners/springContext-validator-listeners.xml
46 46
		<property name="valBaseUrl" value="${services.validator.webBaseUrl}" />
47 47
	</bean>
48 48
		
49
	<bean id ="openaireDnetListener" class="eu.dnetlib.validator.service.impls.listeners.DnetListener" scope="prototype">	
49
	<bean id ="openaireDnetListener" class="eu.dnetlib.validator.service.impls.listeners.DnetListener" scope="prototype" init-method="initOutputs">
50 50
		<property name="xmlBuilder" ref="recordXMLBuilder" />
51 51
		<property name="resultSetServiceLocator" ref="resultSetServiceLocator"/>
52 52
		<property name="rsExecutor" ref="jobExecutor" />

Also available in: Unified diff