Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.validation;
2

    
3
import javax.annotation.Resource;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11

    
12
import eu.dnetlib.data.mdstore.MDStoreService;
13
import eu.dnetlib.data.mdstore.MDStoreServiceException;
14
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
15
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
16
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
17
import eu.dnetlib.miscutils.datetime.DateUtils;
18
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
19
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
20
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
21
import eu.dnetlib.msro.workflows.util.ProgressProvider;
22

    
23
/**
24
 * This node send the BB message to the ValidatorService to validate records from an mdstore.
25
 * <p>
26
 * See ticket https://issue.openaire.research-infrastructures.eu/issues/538 for specs.
27
 * </p>
28
 * 
29
 * @author andrea.mannocci, alessia
30
 * 
31
 */
32
public class ValidatorServiceBlackboardJobNode extends BlackboardJobNode implements ProgressJobNode, ProgressProvider {
33

    
34
	public static final Log log = LogFactory.getLog(ValidatorServiceBlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
35

    
36
	public static final String VALIDATE_ACTION = "VALIDATE";
37
	public static final String VALIDATION_TYPE = "DNET";
38

    
39
	public static final String VALIDATOR_EPR_PARAM_NAME = "outputResultSetEpr";
40

    
41
	private String xQueryForValidatorServiceId;
42
	private String outputEprName = VALIDATOR_EPR_PARAM_NAME;
43

    
44
	// VALIDATOR JOB PARAM VALUES
45
	/** Where to find the id of the datasource in the env. **/
46
	private String datasourceIDEnvParam;
47
	/** Where to find the name of the datasource in the env. **/
48
	private String datasourceNameEnvParam;
49

    
50
	private String mdstoreId;
51
	private String guidelines;
52
	private String recordsSampleSize;
53
	/** Json key-value map of other parameters that will be stored in the validator back-end as they are. **/
54
	private String extraParams;
55

    
56
	// PROGRESS
57
	@Resource
58
	private UniqueServiceLocator serviceLocator;
59

    
60
	/** Number of records after which the Validator must update its progress status in currentlyValidatedParam **/
61
	private int jobStatusUpdateInterval = 1000;
62
	/** Parameter where the validator tells how many records have been processed sofar **/
63
	private String currentlyValidatedParam = "recordsTested";
64
	/** Number of records validated so far. **/
65
	private int currentlyValidated = 0;
66
	private int total = 0;
67
	private boolean accurate;
68

    
69
	/**
70
	 * <p>
71
	 * Parameters to set for the VALIDATE BB job:
72
	 * 
73
	 * <pre>
74
	 * <PARAMETER name="mdstoreId" value=" "/>
75
	 * <PARAMETER name="guidelines" value="openaire3.0"/> //values: openaire3.0, openaire2.0, openaire2.0_data, driver
76
	 * <PARAMETER name="type" value="DNET"/>
77
	 * <PARAMETER name="datasourceID" value=""/>
78
	 * <PARAMETER name="datasourceNamespacePrefix" value=""/> //OPTIONAL
79
	 * <PARAMETER name="datasourceName" value=""/> //OPTIONAL
80
	 * <PARAMETER name="submissionDate" value="YYYY-MM-DD HH24:MI:SS"/>
81
	 * <PARAMETER name="baseUrl" value=""/> //OPTIONAL
82
	 * <PARAMETER name="records" value="#records"/> //OPTIONAL
83
	 * <PARAMETER name="extraParams" value="json key-value map"/> //OPTIONAL
84
	 * </pre>
85
	 * 
86
	 * </p>
87
	 * {@inheritDoc}
88
	 * 
89
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#prepareJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob,
90
	 *      com.googlecode.sarasvati.NodeToken)
91
	 */
92
	@Override
93
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
94

    
95
		// required params
96
		job.setAction(VALIDATE_ACTION);
97
		job.getParameters().put("datasourceID", token.getFullEnv().getAttribute(datasourceIDEnvParam));
98
		String datasourceName = token.getFullEnv().getAttribute(datasourceNameEnvParam);
99
		if (StringUtils.isNotBlank(datasourceName)) {
100
			job.getParameters().put("datasourceName", datasourceName);
101
		}
102

    
103
		job.getParameters().put("mdstoreId", mdstoreId);
104
		job.getParameters().put("guidelines", guidelines);
105
		job.getParameters().put("type", VALIDATION_TYPE);
106

    
107
		job.getParameters().put("submissionDate", DateUtils.now_ISO8601());
108

    
109
		// optional params
110
		String nsPrefix = token.getEnv().getAttribute("datasourceNamespacePrefix");
111
		if (StringUtils.isNotBlank(nsPrefix)) {
112
			job.getParameters().put("datasourceNamespacePrefix", nsPrefix);
113
		}
114
		String baseUrl = token.getEnv().getAttribute("baseUrl");
115
		if (StringUtils.isNotBlank(baseUrl)) {
116
			job.getParameters().put("baseUrl", baseUrl);
117
		}
118
		if (StringUtils.isNotBlank(getRecordsSampleSize())) {
119
			job.getParameters().put("records", getRecordsSampleSize());
120
		}
121
		if (StringUtils.isBlank(getRecordsSampleSize()) || getRecordsSampleSize().equalsIgnoreCase("ALL")) {
122
			total = getTotal();
123
		} else {
124
			total = Integer.getInteger(getRecordsSampleSize());
125
		}
126

    
127
		if (StringUtils.isNotBlank(extraParams)) {
128
			job.getParameters().put("extraParams", extraParams);
129
		}
130

    
131
		if (jobStatusUpdateInterval > 0) {
132
			job.getParameters().put("jobStatusUpdateInterval", "" + jobStatusUpdateInterval);
133
		}
134

    
135
		log.debug("*****Launching validation with the following parameters:****");
136
		log.debug("Total records: " + total);
137
		log.debug("Number of records to validate: " + getRecordsSampleSize());
138
		log.debug("jobStatusUpdateInterval (set only if > 0, wf default is 1000, validator's default is 100): " + jobStatusUpdateInterval);
139
	}
140

    
141
	private int getTotal() {
142
		int size = -1;
143
		accurate = false;
144
		try {
145
			if (StringUtils.isBlank(getRecordsSampleSize()) || getRecordsSampleSize().equalsIgnoreCase("ALL")) {
146
				size = serviceLocator.getService(MDStoreService.class).size(mdstoreId);
147
				accurate = true;
148
			} else {
149
				total = Integer.parseInt(getRecordsSampleSize());
150
				accurate = true;
151
			}
152
		} catch (MDStoreServiceException e) {
153
			log.warn("Can't get size of mdstore " + mdstoreId + " progress will be INACCURATE");
154
			e.printStackTrace();
155
		} catch (NumberFormatException e) {
156
			log.warn("The parameter recordSampleSize is invalid: progress will be INACCURATE");
157
			e.printStackTrace();
158
		}
159
		return size;
160
	}
161

    
162
	/**
163
	 * <p>
164
	 * Parameters in the VALIDATE BB job response:
165
	 * 
166
	 * <pre>
167
	 * <PARAMETER name="error" value=""/>
168
	 * <PARAMETER name="outputResultSetEpr" value=""/>
169
	 * <PARAMETER name="jobId" value=""/>
170
	 * </pre>
171
	 * 
172
	 * </p>
173
	 * 
174
	 * 
175
	 * {@inheritDoc}
176
	 * 
177
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#generateBlackboardListener(com.googlecode.sarasvati.Engine,
178
	 *      com.googlecode.sarasvati.NodeToken)
179
	 */
180
	@Override
181
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
182
		return new BlackboardWorkflowJobListener(engine, token) {
183

    
184
			@Override
185
			protected void onDone(final BlackboardJob job) {
186

    
187
				token.getEnv().setAttribute("validatorJobId", job.getParameters().get("jobId"));
188
				String error = job.getParameters().get("error");
189
				if (StringUtils.isNotBlank(error)) {
190
					token.getEnv().setAttribute("validatorError", error);
191
				}
192
				super.onDone(job);
193
			}
194

    
195
			@Override
196
			protected void onOngoing(final BlackboardJob job) {
197
				try {
198
					Thread.sleep(5000);
199
				} catch (InterruptedException e) {
200
					log.error("Exception while sleeping..." + e.getMessage());
201
					super.onFailed(job);
202
				}
203
				String epr = job.getParameters().get(VALIDATOR_EPR_PARAM_NAME);
204
				log.fatal(VALIDATOR_EPR_PARAM_NAME + "= " + epr);
205
				token.getEnv().setAttribute(outputEprName, epr);
206
				// currentlyValidated = Integer.parseInt(job.getParameters().get(currentlyValidatedParam));
207
				super.onDone(job);
208
			}
209

    
210
		};
211
	}
212

    
213
	@Override
214
	protected String obtainServiceId(final NodeToken token) {
215
		try {
216
			return getServiceLocator().getService(ISLookUpService.class).getResourceProfileByQuery(xQueryForValidatorServiceId);
217
		} catch (Exception e) {
218
			throw new RuntimeException(e);
219
		}
220
	}
221

    
222
	@Override
223
	public ProgressProvider getProgressProvider() {
224
		return this;
225
	}
226

    
227
	@Override
228
	public int getTotalValue() {
229
		return total;
230
	}
231

    
232
	@Override
233
	public int getCurrentValue() {
234
		return currentlyValidated;
235
	}
236

    
237
	@Override
238
	public boolean isInaccurate() {
239
		return !accurate;
240
	}
241

    
242
	public String getGuidelines() {
243
		return guidelines;
244
	}
245

    
246
	public void setGuidelines(final String guidelines) {
247
		this.guidelines = guidelines;
248
	}
249

    
250
	public String getRecordsSampleSize() {
251
		return recordsSampleSize;
252
	}
253

    
254
	public void setRecordsSampleSize(final String recordsSampleSize) {
255
		this.recordsSampleSize = recordsSampleSize;
256
	}
257

    
258
	public String getMdstoreId() {
259
		return mdstoreId;
260
	}
261

    
262
	public void setMdstoreId(final String mdstoreId) {
263
		this.mdstoreId = mdstoreId;
264
	}
265

    
266
	public String getExtra_param() {
267
		return extraParams;
268
	}
269

    
270
	public void setExtra_param(final String extra_param) {
271
		this.extraParams = extra_param;
272
	}
273

    
274
	public String getxQueryForValidatorServiceId() {
275
		return xQueryForValidatorServiceId;
276
	}
277

    
278
	public void setxQueryForValidatorServiceId(final String xQueryForValidatorServiceId) {
279
		this.xQueryForValidatorServiceId = xQueryForValidatorServiceId;
280
	}
281

    
282
	public String getDatasourceIDEnvParam() {
283
		return datasourceIDEnvParam;
284
	}
285

    
286
	public void setDatasourceIDEnvParam(final String datasourceIDEnvParam) {
287
		this.datasourceIDEnvParam = datasourceIDEnvParam;
288
	}
289

    
290
	public String getDatasourceNameEnvParam() {
291
		return datasourceNameEnvParam;
292
	}
293

    
294
	public void setDatasourceNameEnvParam(final String datasourceNameEnvParam) {
295
		this.datasourceNameEnvParam = datasourceNameEnvParam;
296
	}
297

    
298
	public String getOutputEprName() {
299
		return outputEprName;
300
	}
301

    
302
	public void setOutputEprName(final String outputEprName) {
303
		this.outputEprName = outputEprName;
304
	}
305

    
306
	public String getExtraParams() {
307
		return extraParams;
308
	}
309

    
310
	public void setExtraParams(final String extraParams) {
311
		this.extraParams = extraParams;
312
	}
313

    
314
	public String getCurrentlyValidatedParam() {
315
		return currentlyValidatedParam;
316
	}
317

    
318
	public void setCurrentlyValidatedParam(final String currentlyValidatedParam) {
319
		this.currentlyValidatedParam = currentlyValidatedParam;
320
	}
321

    
322
	public int getJobStatusUpdateInterval() {
323
		return jobStatusUpdateInterval;
324
	}
325

    
326
	public void setJobStatusUpdateInterval(final int jobStatusUpdateInterval) {
327
		this.jobStatusUpdateInterval = jobStatusUpdateInterval;
328
	}
329

    
330
	public int getCurrentlyValidated() {
331
		return currentlyValidated;
332
	}
333

    
334
	public void setCurrentlyValidated(final int currentlyValidated) {
335
		this.currentlyValidated = currentlyValidated;
336
	}
337

    
338
}
(2-2/2)