Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.msro.workflows.nodes;
2
3
import javax.annotation.Resource;
4
5 30782 claudio.at
import org.apache.commons.lang.StringUtils;
6 26600 sandro.lab
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11
12 32639 michele.ar
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13 26600 sandro.lab
import eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobImpl;
16
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry;
17
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
18
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
19
20 30782 claudio.at
public abstract class BlackboardJobNode extends SarasvatiJobNode {
21 26600 sandro.lab
22 32639 michele.ar
	@Resource
23
	private UniqueServiceLocator serviceLocator;
24 30782 claudio.at
25 26600 sandro.lab
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(BlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
29
30
	/**
31
	 * blackboard handler.
32
	 */
33
	@Resource
34
	private BlackboardClientHandler blackboardClientHandler;
35
36
	/**
37
	 * blackboard job registry.
38
	 */
39
	@Resource
40
	private BlackboardJobRegistry jobRegistry;
41
42
	@Override
43
	public void execute(final Engine engine, final NodeToken token) {
44
		super.execute(engine, token);
45 30782 claudio.at
46 26600 sandro.lab
		log.info("executing blackboard node");
47
48
		try {
49
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_IS_BLACKBOARD, true);
50 32877 michele.ar
51
			final String serviceId = obtainServiceId(token);
52 30787 claudio.at
			if (StringUtils.isBlank(serviceId)) {
53 26600 sandro.lab
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
54 32877 michele.ar
				final String msg = "cannot locate target service profile: " + serviceId;
55 30787 claudio.at
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, msg);
56
				log.error(msg);
57 26600 sandro.lab
				engine.complete(token, "failed");
58
				return;
59
			}
60
61
			final BlackboardJob job = blackboardClientHandler.newJob(serviceId);
62 30782 claudio.at
63 26600 sandro.lab
			token.getEnv().setTransientAttribute(WorkflowsConstants.BLACKBOARD_JOB, job);
64
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_SERVICE_ID, ((BlackboardJobImpl) job).getServiceId());
65
			prepareJob(job, token);
66
67
			jobRegistry.registerJobListener(job, generateBlackboardListener(engine, token));
68
69
			blackboardClientHandler.assign(job);
70
71 30782 claudio.at
		} catch (final Throwable e) {
72 26600 sandro.lab
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
73
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e);
74
			engine.complete(token, "failed");
75 30787 claudio.at
			log.error("cannot prepare blackboard job", e);
76 26600 sandro.lab
		}
77
	}
78
79 32877 michele.ar
	abstract protected String obtainServiceId(NodeToken token);
80 30782 claudio.at
81 26600 sandro.lab
	abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
82 30782 claudio.at
83
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
84 26600 sandro.lab
		return new BlackboardWorkflowJobListener(engine, token);
85
	}
86
87 32877 michele.ar
	public UniqueServiceLocator getServiceLocator() {
88
		return serviceLocator;
89
	}
90
91 26600 sandro.lab
}