Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.msro.workflows.nodes;
2
3
import javax.annotation.Resource;
4
5 30782 claudio.at
import org.apache.commons.lang.StringUtils;
6 26600 sandro.lab
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
13 32639 michele.ar
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
14 26600 sandro.lab
import eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobImpl;
17
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry;
18
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
19
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
20
21 30782 claudio.at
public abstract class BlackboardJobNode extends SarasvatiJobNode {
22 26600 sandro.lab
23 32639 michele.ar
	@Resource
24
	private UniqueServiceLocator serviceLocator;
25 30782 claudio.at
26 26600 sandro.lab
	/**
27
	 * logger.
28
	 */
29
	private static final Log log = LogFactory.getLog(BlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
30
31
	/**
32
	 * blackboard handler.
33
	 */
34
	@Resource
35
	private BlackboardClientHandler blackboardClientHandler;
36
37
	/**
38
	 * blackboard job registry.
39
	 */
40
	@Resource
41
	private BlackboardJobRegistry jobRegistry;
42
43
	@Override
44
	public void execute(final Engine engine, final NodeToken token) {
45
		super.execute(engine, token);
46 30782 claudio.at
47 26600 sandro.lab
		log.info("executing blackboard node");
48
49
		try {
50
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_IS_BLACKBOARD, true);
51 30819 michele.ar
			final String query = getXqueryForServiceId(token);
52 32639 michele.ar
			final String serviceId = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
53 30787 claudio.at
			if (StringUtils.isBlank(serviceId)) {
54 26600 sandro.lab
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
55 30819 michele.ar
				final String msg = "cannot locate target service profile, using query: " + query;
56 30787 claudio.at
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, msg);
57
				log.error(msg);
58 26600 sandro.lab
				engine.complete(token, "failed");
59
				return;
60
			}
61
62
			final BlackboardJob job = blackboardClientHandler.newJob(serviceId);
63 30782 claudio.at
64 26600 sandro.lab
			token.getEnv().setTransientAttribute(WorkflowsConstants.BLACKBOARD_JOB, job);
65
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_SERVICE_ID, ((BlackboardJobImpl) job).getServiceId());
66
			prepareJob(job, token);
67
68
			jobRegistry.registerJobListener(job, generateBlackboardListener(engine, token));
69
70
			blackboardClientHandler.assign(job);
71
72 30782 claudio.at
		} catch (final Throwable e) {
73 26600 sandro.lab
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
74
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e);
75
			engine.complete(token, "failed");
76 30787 claudio.at
			log.error("cannot prepare blackboard job", e);
77 26600 sandro.lab
		}
78
	}
79
80
	abstract protected String getXqueryForServiceId(NodeToken token);
81 30782 claudio.at
82 26600 sandro.lab
	abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
83 30782 claudio.at
84
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
85 26600 sandro.lab
		return new BlackboardWorkflowJobListener(engine, token);
86
	}
87
88
}