Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.blackboard;
2

    
3
import java.util.Map;
4
import java.util.Map.Entry;
5

    
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13

    
14
import eu.dnetlib.enabling.tools.blackboard.AbstractBlackboardJobListener;
15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
17

    
18
public class BlackboardWorkflowJobListener extends AbstractBlackboardJobListener {
19

    
20
	/**
21
	 * logger.
22
	 */
23
	private static final Log log = LogFactory.getLog(BlackboardWorkflowJobListener.class);
24

    
25
	/**
26
	 * workflow engine.
27
	 */
28
	private Engine engine;
29

    
30
	/**
31
	 * workflow node token.
32
	 */
33
	private NodeToken token;
34

    
35
	@Override
36
	protected void onDone(final BlackboardJob job) {
37
		log.debug("Blackboard workflow node DONE");
38
		complete(job, Arc.DEFAULT_ARC);
39
	}
40

    
41
	public BlackboardWorkflowJobListener(final Engine engine, final NodeToken token) {
42
		super();
43
		this.engine = engine;
44
		this.token = token;
45
	}
46

    
47
	@Override
48
	final public void processJob(final BlackboardJob job) {
49
		token.getEnv().setTransientAttribute(WorkflowsConstants.BLACKBOARD_JOB, job);
50
		super.processJob(job);
51
	}
52

    
53
	@Override
54
	protected void onFailed(final BlackboardJob job) {
55
		log.warn("Blackboard workflow node FAILED: " + job.getError());
56
		token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
57
		token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
58
		complete(job, "failed");
59
	}
60

    
61
	final protected void complete(final BlackboardJob job, final String arc) {
62
		final Env env = token.getEnv();
63

    
64
		populateEnv(env, job.getParameters());
65

    
66
		engine.complete(token, arc);
67
		engine.executeQueuedArcTokens(token.getProcess());
68
	}
69

    
70
	protected void populateEnv(final Env env, Map<String, String> responseParams) {
71
		for (Entry<String, String> entry : responseParams.entrySet()) {
72
			env.setAttribute(WorkflowsConstants.BLACKBOARD_PARAM_PREFIX + entry.getKey(), entry.getValue());
73
		}
74
	}
75

    
76
	@Override
77
	protected void onOngoing(final BlackboardJob job) {
78
		token.getEnv().setAttribute(WorkflowsConstants.BLACKBOARD_IS_GOING, true);
79
	}
80

    
81
	public Engine getEngine() {
82
		return engine;
83
	}
84

    
85
	public void setEngine(final Engine engine) {
86
		this.engine = engine;
87
	}
88

    
89
	public NodeToken getToken() {
90
		return token;
91
	}
92

    
93
	public void setToken(final NodeToken token) {
94
		this.token = token;
95
	}
96

    
97
}
(2-2/3)