Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import javax.annotation.Resource;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11

    
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
13
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobImpl;
17
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry;
18
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
19
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
20

    
21
public abstract class BlackboardJobNode extends SarasvatiJobNode {
22

    
23
	@Resource
24
	private UniqueServiceLocator serviceLocator;
25

    
26
	/**
27
	 * logger.
28
	 */
29
	private static final Log log = LogFactory.getLog(BlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
30

    
31
	/**
32
	 * blackboard handler.
33
	 */
34
	@Resource
35
	private BlackboardClientHandler blackboardClientHandler;
36

    
37
	/**
38
	 * blackboard job registry.
39
	 */
40
	@Resource
41
	private BlackboardJobRegistry jobRegistry;
42

    
43
	@Override
44
	public void execute(final Engine engine, final NodeToken token) {
45
		super.execute(engine, token);
46

    
47
		log.info("executing blackboard node");
48

    
49
		try {
50
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_IS_BLACKBOARD, true);
51
			final String query = getXqueryForServiceId(token);
52
			final String serviceId = serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
53
			if (StringUtils.isBlank(serviceId)) {
54
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
55
				final String msg = "cannot locate target service profile, using query: " + query;
56
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, msg);
57
				log.error(msg);
58
				engine.complete(token, "failed");
59
				return;
60
			}
61

    
62
			final BlackboardJob job = blackboardClientHandler.newJob(serviceId);
63

    
64
			token.getEnv().setTransientAttribute(WorkflowsConstants.BLACKBOARD_JOB, job);
65
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_SERVICE_ID, ((BlackboardJobImpl) job).getServiceId());
66
			prepareJob(job, token);
67

    
68
			jobRegistry.registerJobListener(job, generateBlackboardListener(engine, token));
69

    
70
			blackboardClientHandler.assign(job);
71

    
72
		} catch (final Throwable e) {
73
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
74
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e);
75
			engine.complete(token, "failed");
76
			log.error("cannot prepare blackboard job", e);
77
		}
78
	}
79

    
80
	abstract protected String getXqueryForServiceId(NodeToken token);
81

    
82
	abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
83

    
84
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
85
		return new BlackboardWorkflowJobListener(engine, token);
86
	}
87

    
88
}
(2-2/8)