Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.validation;
2

    
3
import javax.annotation.Resource;
4

    
5
import eu.dnetlib.enabling.datasources.common.Api;
6
import eu.dnetlib.enabling.datasources.common.Datasource;
7
import eu.dnetlib.enabling.datasources.common.DsmException;
8
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
9
import org.apache.commons.lang.StringUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12

    
13
import com.googlecode.sarasvati.Engine;
14
import com.googlecode.sarasvati.NodeToken;
15

    
16
import eu.dnetlib.data.mdstore.MDStoreService;
17
import eu.dnetlib.data.mdstore.MDStoreServiceException;
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
19
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
20
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
21
import eu.dnetlib.miscutils.datetime.DateUtils;
22
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
23
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
24
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
25
import eu.dnetlib.msro.workflows.util.ProgressProvider;
26
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
27
import org.springframework.beans.factory.annotation.Autowired;
28

    
29
/**
30
 * This node send the BB message to the ValidatorService to validate records from an mdstore.
31
 * <p>
32
 * See ticket https://issue.openaire.research-infrastructures.eu/issues/538 for specs.
33
 * </p>
34
 *
35
 * @author andrea.mannocci, alessia
36
 *
37
 */
38
public class ValidatorServiceBlackboardJobNode extends BlackboardJobNode implements ProgressJobNode, ProgressProvider {
39

    
40
	public static final Log log = LogFactory.getLog(ValidatorServiceBlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
41

    
42
	public static final String VALIDATE_ACTION = "VALIDATE";
43
	public static final String VALIDATION_TYPE = "DNET";
44

    
45
	public static final String VALIDATOR_EPR_PARAM_NAME = "outputResultSetEpr";
46

    
47
	private String xQueryForValidatorServiceId;
48
	private String outputEprName = VALIDATOR_EPR_PARAM_NAME;
49

    
50
	// VALIDATOR JOB PARAM VALUES
51
	private String mdstoreId;
52
	private String guidelines;
53
	private String blacklistRuleName;
54
	private String recordsSampleSize;
55
	/** Json key-value map of other parameters that will be stored in the validator back-end as they are. **/
56
	private String extraParams;
57

    
58
	/** True to ask the validator to generate the output EPR. **/
59
	private boolean shouldOutputRecords = false;
60
	/** True to ask the validator to update the blacklist. **/
61
	private boolean shouldUpdateBlacklist = true;
62

    
63
	@Resource
64
	private UniqueServiceLocator serviceLocator;
65

    
66
	@Autowired
67
	private LocalDatasourceManager<Datasource<?, ?>, Api<?>> dsManager;
68

    
69
	/** Number of records after which the Validator must update its progress status in currentlyValidatedParam **/
70
	private int jobStatusUpdateInterval = 1000;
71
	/** Parameter where the validator tells how many records have been processed sofar **/
72
	private String currentlyValidatedParam = "recordsTested";
73
	/** Number of records validated so far. **/
74
	private int currentlyValidated = 0;
75
	private int total = 0;
76
	private boolean accurate;
77

    
78
	/**
79
	 * <p>
80
	 * Parameters to set for the VALIDATE BB job:
81
	 *
82
	 * <pre>
83
	 *     <PARAMETER name="type" value="DNET"/>
84
	 *     <PARAMETER name="guidelines" value="openaire3.0"/> //values: openaire3.0, openaire2.0, openaire2.0_data, driver
85
	 *     <PARAMETER name="records" value="#records"/> //OPTIONAL
86
	 *     <PARAMETER name="blacklistedRecords" value="true"/> //enables the feature to keep blacklisted records. Default is true.
87
	 *     <PARAMETER name="blacklistGuidelines" value="blacklist_ruleset"/> //optional. values: name of the ruleset (as created from the validator admin panel) to use to generate the blacklist. Default value is the value in "guidelines".
88
	 *     <PARAMETER name="datasourceID" value=""/>
89
	 *     <PARAMETER name="datasourceName" value=""/> //OPTIONAL
90
	 *     <PARAMETER name="datasourceNamespacePrefix" value=""/> //OPTIONAL
91
	 *     <PARAMETER name="interfaceId" value=" "/>
92
	 *     <PARAMETER name="baseUrl" value=""/> //OPTIONAL
93
	 *     <PARAMETER name="mdstoreId" value=" "/>
94
	 *     <PARAMETER name="outputEpr" value="boolean"/> //TRUE to get the EPR of validated records. Default is false.
95
	 *     <PARAMETER name="extraParams" value="json key-value map"/> //OPTIONAL
96
	 *     <PARAMETER name="submissionDate" value="YYYY-MM-DD HH24:MI:SS"/>
97
	 * </pre>
98
	 *
99
	 * </p>
100
	 * {@inheritDoc}
101
	 *
102
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#prepareJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob,
103
	 *      com.googlecode.sarasvati.NodeToken)
104
	 */
105
	@Override
106
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
107

    
108
		job.setAction(VALIDATE_ACTION);
109
		job.getParameters().put("type", VALIDATION_TYPE);
110
		job.getParameters().put("guidelines", guidelines);
111
		job.getParameters().put("records", getRecordsSampleSize());
112
		// params for blacklist
113
		job.getParameters().put("blacklistedRecords", isShouldUpdateBlacklist() + "");
114
		if (isShouldUpdateBlacklist()) {
115
			job.getParameters().put("blacklistGuidelines", blacklistRuleName);
116
		}
117

    
118
		// datasource params
119
		final String datasourceId = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ORIGINALID);
120
		final String api = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
121
		job.getParameters().put("datasourceId", datasourceId);
122
		job.getParameters().put("datasourceName", token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_NAME));
123
		job.getParameters().put("datasourceNamespacePrefix", token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_NAMESPACE_PREFIX));
124

    
125
		job.getParameters().put("interfaceId", api);
126
		job.getParameters().put("baseUrl", token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE_BASEURL));
127
		job.getParameters().put("mdstoreId", mdstoreId);
128

    
129
		// output params
130
		job.getParameters().put("outputEpr", isShouldOutputRecords() + "");
131
		// extra params
132
		if (StringUtils.isNotBlank(extraParams)) {
133
			job.getParameters().put("extraParams", extraParams);
134
		}
135

    
136
		// Setting properties for the progress bar
137
		if (StringUtils.isBlank(getRecordsSampleSize()) || getRecordsSampleSize().equalsIgnoreCase("ALL")) {
138
			total = getTotal();
139
		} else {
140
			total = Integer.getInteger(getRecordsSampleSize());
141
		}
142
		if (jobStatusUpdateInterval > 0) {
143
			job.getParameters().put("jobStatusUpdateInterval", "" + jobStatusUpdateInterval);
144
		}
145

    
146
		log.debug("*****Launching validation with the following parameters:****");
147
		log.debug("Datasource id: " + datasourceId);
148
		log.debug("API: " + api);
149
		log.debug("Total records: " + total);
150
		log.debug("Number of records to validate: " + getRecordsSampleSize());
151
		log.debug("jobStatusUpdateInterval (set only if > 0, wf default is 1000, validator's default is 100): " + jobStatusUpdateInterval);
152

    
153
		job.getParameters().put("submissionDate", DateUtils.now_ISO8601());
154
	}
155

    
156
	private int getTotal() {
157
		int size = -1;
158
		accurate = false;
159
		try {
160
			if (StringUtils.isBlank(getRecordsSampleSize()) || getRecordsSampleSize().equalsIgnoreCase("ALL")) {
161
				size = serviceLocator.getService(MDStoreService.class).size(mdstoreId);
162
				accurate = true;
163
			} else {
164
				total = Integer.parseInt(getRecordsSampleSize());
165
				accurate = true;
166
			}
167
		} catch (final MDStoreServiceException e) {
168
			log.warn("Can't get size of mdstore " + mdstoreId + " progress will be INACCURATE");
169
			e.printStackTrace();
170
		} catch (final NumberFormatException e) {
171
			log.warn("The parameter recordSampleSize is invalid: progress will be INACCURATE");
172
			e.printStackTrace();
173

    
174
		}
175
		return size;
176
	}
177

    
178
	/**
179
	 * <p>
180
	 * Parameters in the VALIDATE BB job response:
181
	 *
182
	 * <pre>
183
	 * <PARAMETER name="error" value=""/>
184
	 * <PARAMETER name="outputResultSetEpr" value=""/> //only if isShouldOutputRecords() == true
185
	 * <PARAMETER name="jobId" value=""/>
186
	 * </pre>
187
	 *
188
	 * </p>
189
	 *
190
	 *
191
	 * {@inheritDoc}
192
	 *
193
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#generateBlackboardListener(com.googlecode.sarasvati.Engine,
194
	 *      com.googlecode.sarasvati.NodeToken)
195
	 */
196
	@Override
197
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
198
		return new BlackboardWorkflowJobListener(engine, token) {
199

    
200
			boolean eprFound = false;
201

    
202
			@Override
203
			protected void onDone(final BlackboardJob job) {
204

    
205
				log.debug("If you see this log you are not asking to get the validated EPR...");
206

    
207
				final String jobId = job.getParameters().get("jobId");
208

    
209
				token.getEnv().setAttribute("validatorJobId", jobId);
210
				final String error = job.getParameters().get("error");
211
				if (StringUtils.isNotBlank(error)) {
212
					token.getEnv().setAttribute("validatorError", error);
213
				}
214

    
215
				try {
216
					final String dsId = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ORIGINALID);
217
					final String ifaceId = token.getFullEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
218

    
219
					dsManager.setLastValidationJob(dsId, ifaceId, jobId);
220

    
221
				} catch (final DsmException e) {
222
					log.error("Error setting last_validation_job field");
223
					throw new RuntimeException("Error setting last_validation_job field", e);
224
				}
225
				
226
				super.onDone(job);
227
			}
228

    
229
			@Override
230
			protected void onOngoing(final BlackboardJob job) {
231
				currentlyValidated = Integer.parseInt(job.getParameters().get(currentlyValidatedParam));
232
				if (isShouldOutputRecords() & !eprFound) {
233
					final String epr = job.getParameters().get(VALIDATOR_EPR_PARAM_NAME);
234
					log.fatal(VALIDATOR_EPR_PARAM_NAME + "= " + epr);
235
					token.getEnv().setAttribute(outputEprName, epr);
236
					eprFound = true;
237
				}
238
				super.onOngoing(job);
239
			}
240

    
241
		};
242

    
243
	}
244

    
245
	@Override
246
	protected String obtainServiceId(final NodeToken token) {
247
		try {
248
			return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xQueryForValidatorServiceId);
249
		} catch (final Exception e) {
250
			throw new RuntimeException(e);
251
		}
252
	}
253

    
254
	@Override
255
	public ProgressProvider getProgressProvider() {
256
		return this;
257
	}
258

    
259
	@Override
260
	public int getTotalValue() {
261
		return total;
262
	}
263

    
264
	@Override
265
	public int getCurrentValue() {
266
		return currentlyValidated;
267
	}
268

    
269
	@Override
270
	public boolean isInaccurate() {
271
		return !accurate;
272
	}
273

    
274
	public String getGuidelines() {
275
		return guidelines;
276
	}
277

    
278
	public void setGuidelines(final String guidelines) {
279
		this.guidelines = guidelines;
280
	}
281

    
282
	public String getRecordsSampleSize() {
283
		return recordsSampleSize;
284
	}
285

    
286
	public void setRecordsSampleSize(final String recordsSampleSize) {
287
		this.recordsSampleSize = recordsSampleSize;
288
	}
289

    
290
	public String getMdstoreId() {
291
		return mdstoreId;
292
	}
293

    
294
	public void setMdstoreId(final String mdstoreId) {
295
		this.mdstoreId = mdstoreId;
296
	}
297

    
298
	public String getExtra_param() {
299
		return extraParams;
300
	}
301

    
302
	public void setExtra_param(final String extra_param) {
303
		this.extraParams = extra_param;
304
	}
305

    
306
	public String getxQueryForValidatorServiceId() {
307
		return xQueryForValidatorServiceId;
308
	}
309

    
310
	public void setxQueryForValidatorServiceId(final String xQueryForValidatorServiceId) {
311
		this.xQueryForValidatorServiceId = xQueryForValidatorServiceId;
312
	}
313

    
314
	public String getOutputEprName() {
315
		return outputEprName;
316
	}
317

    
318
	public void setOutputEprName(final String outputEprName) {
319
		this.outputEprName = outputEprName;
320
	}
321

    
322
	public String getExtraParams() {
323
		return extraParams;
324
	}
325

    
326
	public void setExtraParams(final String extraParams) {
327
		this.extraParams = extraParams;
328
	}
329

    
330
	public String getCurrentlyValidatedParam() {
331
		return currentlyValidatedParam;
332
	}
333

    
334
	public void setCurrentlyValidatedParam(final String currentlyValidatedParam) {
335
		this.currentlyValidatedParam = currentlyValidatedParam;
336
	}
337

    
338
	public int getJobStatusUpdateInterval() {
339
		return jobStatusUpdateInterval;
340
	}
341

    
342
	public void setJobStatusUpdateInterval(final int jobStatusUpdateInterval) {
343
		this.jobStatusUpdateInterval = jobStatusUpdateInterval;
344
	}
345

    
346
	public int getCurrentlyValidated() {
347
		return currentlyValidated;
348
	}
349

    
350
	public void setCurrentlyValidated(final int currentlyValidated) {
351
		this.currentlyValidated = currentlyValidated;
352
	}
353

    
354
	public boolean isShouldOutputRecords() {
355
		return shouldOutputRecords;
356
	}
357

    
358
	public void setShouldOutputRecords(final boolean shouldOutputRecords) {
359
		this.shouldOutputRecords = shouldOutputRecords;
360
	}
361

    
362
	public boolean isShouldUpdateBlacklist() {
363
		return shouldUpdateBlacklist;
364
	}
365

    
366
	public void setShouldUpdateBlacklist(final boolean shouldUpdateBlacklist) {
367
		this.shouldUpdateBlacklist = shouldUpdateBlacklist;
368
	}
369

    
370
	public String getBlacklistRuleName() {
371
		return blacklistRuleName;
372
	}
373

    
374
	public void setBlacklistRuleName(final String blacklistRuleName) {
375
		this.blacklistRuleName = blacklistRuleName;
376
	}
377

    
378
}
(2-2/2)