Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.validation;
2

    
3
import javax.annotation.Resource;
4

    
5
import eu.dnetlib.enabling.datasources.common.Api;
6
import eu.dnetlib.enabling.datasources.common.Datasource;
7
import eu.dnetlib.enabling.datasources.common.DsmException;
8
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
9
import org.apache.commons.lang.StringUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12

    
13
import com.googlecode.sarasvati.Engine;
14
import com.googlecode.sarasvati.NodeToken;
15

    
16
import eu.dnetlib.data.mdstore.MDStoreService;
17
import eu.dnetlib.data.mdstore.MDStoreServiceException;
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
19
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
20
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
21
import eu.dnetlib.miscutils.datetime.DateUtils;
22
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
23
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
24
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
25
import eu.dnetlib.msro.workflows.util.ProgressProvider;
26
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
27
import org.springframework.beans.factory.annotation.Autowired;
28

    
29
/**
30
 * This node send the BB message to the ValidatorService to validate records from an mdstore.
31
 * <p>
32
 * See ticket https://issue.openaire.research-infrastructures.eu/issues/538 for specs.
33
 * </p>
34
 *
35
 * @author andrea.mannocci, alessia
36
 *
37
 */
38
public class ValidatorServiceBlackboardJobNode extends BlackboardJobNode implements ProgressJobNode, ProgressProvider {
39

    
40
	public static final Log log = LogFactory.getLog(ValidatorServiceBlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
41

    
42
	public static final String VALIDATE_ACTION = "VALIDATE";
43
	public static final String VALIDATION_TYPE = "DNET";
44

    
45
	public static final String VALIDATOR_EPR_PARAM_NAME = "outputResultSetEpr";
46

    
47
	private String xQueryForValidatorServiceId;
48
	private String outputEprName = VALIDATOR_EPR_PARAM_NAME;
49

    
50
	// VALIDATOR JOB PARAM VALUES
51
	private String mdstoreId;
52
	private String guidelines;
53
	private String blacklistRuleName;
54
	private String recordsSampleSize;
55
	/** Json key-value map of other parameters that will be stored in the validator back-end as they are. **/
56
	private String extraParams;
57

    
58
	/** True to ask the validator to generate the output EPR. **/
59
	private boolean shouldOutputRecords = false;
60
	/** True to ask the validator to update the blacklist. **/
61
	private boolean shouldUpdateBlacklist = true;
62

    
63
	@Resource
64
	private UniqueServiceLocator serviceLocator;
65

    
66
	@Autowired
67
	private LocalDatasourceManager<Datasource<?, ?>, Api<?>> dsManager;
68

    
69
	/** Number of records after which the Validator must update its progress status in currentlyValidatedParam **/
70
	private int jobStatusUpdateInterval = 1000;
71
	/** Parameter where the validator tells how many records have been processed sofar **/
72
	private String currentlyValidatedParam = "recordsTested";
73
	/** Number of records validated so far. **/
74
	private int currentlyValidated = 0;
75
	private int total = 0;
76
	private boolean accurate;
77

    
78
	/**
79
	 * <p>
80
	 * Parameters to set for the VALIDATE BB job:
81
	 *
82
	 * <pre>
83
	 *     <PARAMETER name="type" value="DNET"/>
84
	 *     <PARAMETER name="guidelines" value="openaire3.0"/> //values: openaire3.0, openaire2.0, openaire2.0_data, driver
85
	 *     <PARAMETER name="records" value="#records"/> //OPTIONAL
86
	 *     <PARAMETER name="blacklistedRecords" value="true"/> //enables the feature to keep blacklisted records. Default is true.
87
	 *     <PARAMETER name="blacklistGuidelines" value="blacklist_ruleset"/> //optional. values: name of the ruleset (as created from the validator admin panel) to use to generate the blacklist. Default value is the value in "guidelines".
88
	 *     <PARAMETER name="datasourceID" value=""/>
89
	 *     <PARAMETER name="datasourceName" value=""/> //OPTIONAL
90
	 *     <PARAMETER name="datasourceNamespacePrefix" value=""/> //OPTIONAL
91
	 *     <PARAMETER name="interfaceId" value=" "/>
92
	 *     <PARAMETER name="baseUrl" value=""/> //OPTIONAL
93
	 *     <PARAMETER name="mdstoreId" value=" "/>
94
	 *     <PARAMETER name="outputEpr" value="boolean"/> //TRUE to get the EPR of validated records. Default is false.
95
	 *     <PARAMETER name="extraParams" value="json key-value map"/> //OPTIONAL
96
	 *     <PARAMETER name="submissionDate" value="YYYY-MM-DD HH24:MI:SS"/>
97
	 * </pre>
98
	 *
99
	 * </p>
100
	 * {@inheritDoc}
101
	 *
102
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#prepareJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob,
103
	 *      com.googlecode.sarasvati.NodeToken)
104
	 */
105
	@Override
106
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
107

    
108
		job.setAction(VALIDATE_ACTION);
109
		job.getParameters().put("type", VALIDATION_TYPE);
110
		job.getParameters().put("guidelines", guidelines);
111
		job.getParameters().put("records", getRecordsSampleSize());
112
		// params for blacklist
113
		job.getParameters().put("blacklistedRecords", isShouldUpdateBlacklist() + "");
114
		if (isShouldUpdateBlacklist()) {
115
			job.getParameters().put("blacklistGuidelines", blacklistRuleName);
116
		}
117

    
118
		// datasource params
119
		final String datasourceId = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ORIGINALID);
120
		final String api = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
121
		job.getParameters().put("datasourceId", datasourceId);
122
		job.getParameters().put("datasourceName", token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_NAME));
123
		job.getParameters().put("datasourceNamespacePrefix", token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_NAMESPACE_PREFIX));
124

    
125
		job.getParameters().put("interfaceId", api);
126
		job.getParameters().put("baseUrl", token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL));
127
		job.getParameters().put("mdstoreId", mdstoreId);
128

    
129
		// output params
130
		job.getParameters().put("outputEpr", isShouldOutputRecords() + "");
131
		// extra params
132
		if (StringUtils.isNotBlank(extraParams)) {
133
			job.getParameters().put("extraParams", extraParams);
134
		}
135

    
136
		// Setting properties for the progress bar
137
		if (StringUtils.isBlank(getRecordsSampleSize()) || getRecordsSampleSize().equalsIgnoreCase("ALL")) {
138
			total = getTotal();
139
		} else {
140
			total = Integer.getInteger(getRecordsSampleSize());
141
		}
142
		if (jobStatusUpdateInterval > 0) {
143
			job.getParameters().put("jobStatusUpdateInterval", "" + jobStatusUpdateInterval);
144
		}
145

    
146
		log.debug("*****Launching validation with the following parameters:****");
147
		log.debug("Datasource id: " + datasourceId);
148
		log.debug("API: " + api);
149
		log.debug("Total records: " + total);
150
		log.debug("Number of records to validate: " + getRecordsSampleSize());
151
		log.debug("jobStatusUpdateInterval (set only if > 0, wf default is 1000, validator's default is 100): " + jobStatusUpdateInterval);
152

    
153
		job.getParameters().put("submissionDate", DateUtils.now_ISO8601());
154
	}
155

    
156
	private int getTotal() {
157
		int size = -1;
158
		accurate = false;
159
		try {
160
			if (StringUtils.isBlank(getRecordsSampleSize()) || getRecordsSampleSize().equalsIgnoreCase("ALL")) {
161
				size = serviceLocator.getService(MDStoreService.class).size(mdstoreId);
162
				accurate = true;
163
			} else {
164
				total = Integer.parseInt(getRecordsSampleSize());
165
				accurate = true;
166
			}
167
		} catch (final MDStoreServiceException e) {
168
			log.warn("Can't get size of mdstore " + mdstoreId + " progress will be INACCURATE");
169
			e.printStackTrace();
170
		} catch (final NumberFormatException e) {
171
			log.warn("The parameter recordSampleSize is invalid: progress will be INACCURATE");
172
			e.printStackTrace();
173

    
174
		}
175
		return size;
176
	}
177

    
178
	/**
179
	 * <p>
180
	 * Parameters in the VALIDATE BB job response:
181
	 *
182
	 * <pre>
183
	 * <PARAMETER name="error" value=""/>
184
	 * <PARAMETER name="outputResultSetEpr" value=""/> //only if isShouldOutputRecords() == true
185
	 * <PARAMETER name="jobId" value=""/>
186
	 * </pre>
187
	 *
188
	 * </p>
189
	 *
190
	 *
191
	 * {@inheritDoc}
192
	 *
193
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#generateBlackboardListener(com.googlecode.sarasvati.Engine,
194
	 *      com.googlecode.sarasvati.NodeToken)
195
	 */
196
	@Override
197
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
198
		return new BlackboardWorkflowJobListener(engine, token) {
199

    
200
			boolean done = false;
201

    
202
			@Override
203
			protected void onDone(final BlackboardJob job) {
204
				if (!isShouldOutputRecords()) {
205
					log.debug("If you see this log you are not asking to get the validated EPR...");
206

    
207
					final String jobId = job.getParameters().get("jobId");
208

    
209
					token.getEnv().setAttribute("validatorJobId", jobId);
210
					final String error = job.getParameters().get("error");
211
					if (StringUtils.isNotBlank(error)) {
212
						token.getEnv().setAttribute("validatorError", error);
213
					}
214

    
215
					try {
216
						final String dsId = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ORIGINALID);
217
						final String ifaceId = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
218

    
219
						dsManager.setLastValidationJob(dsId, ifaceId, jobId);
220

    
221
					} catch (final DsmException e) {
222
						log.error("Error setting last_validation_job field");
223
						throw new RuntimeException("Error setting last_validation_job field", e);
224
					}
225

    
226
					super.onDone(job);
227
				}
228
			}
229

    
230
			@Override
231
			protected void onOngoing(final BlackboardJob job) {
232
				currentlyValidated = Integer.parseInt(job.getParameters().get(currentlyValidatedParam));
233
				if (isShouldOutputRecords()) {
234
					onGoingWithEPRResults(job);
235
				} else {
236
					onGoingWithoutEPRResults(job);
237
				}
238
			}
239

    
240
			private void onGoingWithEPRResults(final BlackboardJob job) {
241
				if (!done) {
242
					final String epr = job.getParameters().get(VALIDATOR_EPR_PARAM_NAME);
243
					log.fatal(VALIDATOR_EPR_PARAM_NAME + "= " + epr);
244
					token.getEnv().setAttribute(outputEprName, epr);
245
					done = true;
246
					//super.onDone(job);
247
				}
248
			}
249

    
250
			private void onGoingWithoutEPRResults(final BlackboardJob job) {
251
				super.onOngoing(job);
252
			}
253
		};
254

    
255
	}
256

    
257
	@Override
258
	protected String obtainServiceId(final NodeToken token) {
259
		try {
260
			return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xQueryForValidatorServiceId);
261
		} catch (final Exception e) {
262
			throw new RuntimeException(e);
263
		}
264
	}
265

    
266
	@Override
267
	public ProgressProvider getProgressProvider() {
268
		return this;
269
	}
270

    
271
	@Override
272
	public int getTotalValue() {
273
		return total;
274
	}
275

    
276
	@Override
277
	public int getCurrentValue() {
278
		return currentlyValidated;
279
	}
280

    
281
	@Override
282
	public boolean isInaccurate() {
283
		return !accurate;
284
	}
285

    
286
	public String getGuidelines() {
287
		return guidelines;
288
	}
289

    
290
	public void setGuidelines(final String guidelines) {
291
		this.guidelines = guidelines;
292
	}
293

    
294
	public String getRecordsSampleSize() {
295
		return recordsSampleSize;
296
	}
297

    
298
	public void setRecordsSampleSize(final String recordsSampleSize) {
299
		this.recordsSampleSize = recordsSampleSize;
300
	}
301

    
302
	public String getMdstoreId() {
303
		return mdstoreId;
304
	}
305

    
306
	public void setMdstoreId(final String mdstoreId) {
307
		this.mdstoreId = mdstoreId;
308
	}
309

    
310
	public String getExtra_param() {
311
		return extraParams;
312
	}
313

    
314
	public void setExtra_param(final String extra_param) {
315
		this.extraParams = extra_param;
316
	}
317

    
318
	public String getxQueryForValidatorServiceId() {
319
		return xQueryForValidatorServiceId;
320
	}
321

    
322
	public void setxQueryForValidatorServiceId(final String xQueryForValidatorServiceId) {
323
		this.xQueryForValidatorServiceId = xQueryForValidatorServiceId;
324
	}
325

    
326
	public String getOutputEprName() {
327
		return outputEprName;
328
	}
329

    
330
	public void setOutputEprName(final String outputEprName) {
331
		this.outputEprName = outputEprName;
332
	}
333

    
334
	public String getExtraParams() {
335
		return extraParams;
336
	}
337

    
338
	public void setExtraParams(final String extraParams) {
339
		this.extraParams = extraParams;
340
	}
341

    
342
	public String getCurrentlyValidatedParam() {
343
		return currentlyValidatedParam;
344
	}
345

    
346
	public void setCurrentlyValidatedParam(final String currentlyValidatedParam) {
347
		this.currentlyValidatedParam = currentlyValidatedParam;
348
	}
349

    
350
	public int getJobStatusUpdateInterval() {
351
		return jobStatusUpdateInterval;
352
	}
353

    
354
	public void setJobStatusUpdateInterval(final int jobStatusUpdateInterval) {
355
		this.jobStatusUpdateInterval = jobStatusUpdateInterval;
356
	}
357

    
358
	public int getCurrentlyValidated() {
359
		return currentlyValidated;
360
	}
361

    
362
	public void setCurrentlyValidated(final int currentlyValidated) {
363
		this.currentlyValidated = currentlyValidated;
364
	}
365

    
366
	public boolean isShouldOutputRecords() {
367
		return shouldOutputRecords;
368
	}
369

    
370
	public void setShouldOutputRecords(final boolean shouldOutputRecords) {
371
		this.shouldOutputRecords = shouldOutputRecords;
372
	}
373

    
374
	public boolean isShouldUpdateBlacklist() {
375
		return shouldUpdateBlacklist;
376
	}
377

    
378
	public void setShouldUpdateBlacklist(final boolean shouldUpdateBlacklist) {
379
		this.shouldUpdateBlacklist = shouldUpdateBlacklist;
380
	}
381

    
382
	public String getBlacklistRuleName() {
383
		return blacklistRuleName;
384
	}
385

    
386
	public void setBlacklistRuleName(final String blacklistRuleName) {
387
		this.blacklistRuleName = blacklistRuleName;
388
	}
389

    
390
}
(2-2/2)