Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import javax.annotation.Resource;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11

    
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobImpl;
16
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry;
17
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
18
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
19

    
20
public abstract class BlackboardJobNode extends SarasvatiJobNode {
21

    
22
	@Resource
23
	private UniqueServiceLocator serviceLocator;
24

    
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(BlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
29

    
30
	/**
31
	 * blackboard handler.
32
	 */
33
	@Resource
34
	private BlackboardClientHandler blackboardClientHandler;
35

    
36
	/**
37
	 * blackboard job registry.
38
	 */
39
	@Resource
40
	private BlackboardJobRegistry jobRegistry;
41

    
42
	@Override
43
	public void execute(final Engine engine, final NodeToken token) {
44
		super.execute(engine, token);
45

    
46
		log.info("executing blackboard node");
47

    
48
		try {
49
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_IS_BLACKBOARD, true);
50

    
51
			final String serviceId = obtainServiceId(token);
52
			if (StringUtils.isBlank(serviceId)) {
53
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
54
				final String msg = "cannot locate target service profile: " + serviceId;
55
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, msg);
56
				log.error(msg);
57
				engine.complete(token, "failed");
58
				return;
59
			}
60

    
61
			final BlackboardJob job = blackboardClientHandler.newJob(serviceId);
62

    
63
			token.getEnv().setTransientAttribute(WorkflowsConstants.BLACKBOARD_JOB, job);
64
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_SERVICE_ID, ((BlackboardJobImpl) job).getServiceId());
65
			prepareJob(job, token);
66

    
67
			jobRegistry.registerJobListener(job, generateBlackboardListener(engine, token));
68

    
69
			blackboardClientHandler.assign(job);
70

    
71
		} catch (final Throwable e) {
72
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
73
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e);
74
			engine.complete(token, "failed");
75
			log.error("cannot prepare blackboard job", e);
76
		}
77
	}
78

    
79
	abstract protected String obtainServiceId(NodeToken token);
80

    
81
	abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
82

    
83
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
84
		return new BlackboardWorkflowJobListener(engine, token);
85
	}
86

    
87
	public UniqueServiceLocator getServiceLocator() {
88
		return serviceLocator;
89
	}
90

    
91
}
(2-2/9)