Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import javax.annotation.Resource;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

    
8
import com.googlecode.sarasvati.Engine;
9
import com.googlecode.sarasvati.NodeToken;
10

    
11
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
12
import eu.dnetlib.enabling.tools.ServiceLocator;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardClientHandler;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobImpl;
16
import eu.dnetlib.enabling.tools.blackboard.BlackboardJobRegistry;
17
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
18
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
19

    
20

    
21
public abstract class BlackboardJobNode extends SarasvatiJobNode {
22
	
23
	@Resource(name="lookupLocator")
24
	private ServiceLocator<ISLookUpService> lookupLocator;
25
	
26
	/**
27
	 * logger.
28
	 */
29
	private static final Log log = LogFactory.getLog(BlackboardJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
30

    
31
	/**
32
	 * blackboard handler.
33
	 */
34
	@Resource
35
	private BlackboardClientHandler blackboardClientHandler;
36

    
37
	/**
38
	 * blackboard job registry.
39
	 */
40
	@Resource
41
	private BlackboardJobRegistry jobRegistry;
42

    
43
	@Override
44
	public void execute(final Engine engine, final NodeToken token) {
45
		super.execute(engine, token);
46
		
47
		log.info("executing blackboard node");
48

    
49
		try {
50
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_IS_BLACKBOARD, true);
51

    
52
			final String serviceId = lookupLocator.getService().getResourceProfileByQuery(getXqueryForServiceId(token));
53
			if (serviceId == null) {
54
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
55
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, "cannot locate target service profile");
56
				engine.complete(token, "failed");
57
				return;
58
			}
59

    
60
			final BlackboardJob job = blackboardClientHandler.newJob(serviceId);
61
			
62
			token.getEnv().setTransientAttribute(WorkflowsConstants.BLACKBOARD_JOB, job);
63
			token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_SERVICE_ID, ((BlackboardJobImpl) job).getServiceId());
64
			prepareJob(job, token);
65

    
66
			jobRegistry.registerJobListener(job, generateBlackboardListener(engine, token));
67

    
68
			blackboardClientHandler.assign(job);
69

    
70
		} catch (final Exception e) {
71
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
72
			token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e);
73
			engine.complete(token, "failed");
74
			log.warn("cannot prepare blackboard job", e);
75
		}
76
	}
77

    
78
	abstract protected String getXqueryForServiceId(NodeToken token);
79
	abstract protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception;
80
	
81
	protected  BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
82
		return new BlackboardWorkflowJobListener(engine, token);
83
	}
84

    
85
}
(2-2/7)