Project

General

Profile

1
package eu.dnetlib.msro.worker.nodes;
2

    
3
import javax.annotation.Resource;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Arc;
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13

    
14
import eu.dnetlib.common.ifaces.BlackboardCallback;
15
import eu.dnetlib.common.services.locators.DnetServiceLocator;
16
import eu.dnetlib.enabling.blackboard.BlackboardDispatcher;
17
import eu.dnetlib.msro.worker.WorkflowConstants;
18
import eu.dnetlib.rmi.blackboard.AbstractBlackboardMessage;
19

    
20
public abstract class BlackboardJobNode<T> extends SarasvatiJobNode implements BlackboardCallback<T> {
21

    
22
	/**
23
	 * logger.
24
	 */
25
	private static final Log log = LogFactory.getLog(BlackboardJobNode.class);
26

    
27
	@Resource
28
	private DnetServiceLocator serviceLocator;
29

    
30
	@Resource
31
	private BlackboardDispatcher blackboardDispatcher;
32

    
33
	private Engine engine;
34

    
35
	private NodeToken token;
36

    
37
	@Override
38
	public void execute(final Engine engine, final NodeToken token) {
39
		super.execute(engine, token);
40

    
41
		this.engine = engine;
42
		this.token = token;
43

    
44
		log.info("executing blackboard node");
45

    
46
		try {
47
			token.getEnv().setAttribute(WorkflowConstants.BLACKBOARD_IS_BLACKBOARD, true);
48

    
49
			final String serviceId = obtainServiceId(token);
50
			if (StringUtils.isBlank(serviceId)) {
51
				token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
52
				final String msg = "cannot locate target service profile: " + serviceId;
53
				token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, msg);
54
				log.error(msg);
55
				engine.complete(token, "failed");
56
				return;
57
			}
58
			blackboardDispatcher.createDispatcher(serviceId, prepareMessage(token), this);
59
		} catch (final Throwable e) {
60
			token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
61
			token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, "cannot prepare blackboard job: " + e);
62
			engine.complete(token, "failed");
63
			log.error("cannot prepare blackboard job", e);
64
		}
65
	}
66

    
67
	abstract protected String obtainServiceId(final NodeToken token) throws Exception;
68

    
69
	abstract protected T prepareMessage(final NodeToken token) throws Exception;
70

    
71
	abstract protected void saveResponseInEnv(final T t, final Env env);
72

    
73
	@Override
74
	public void onSuccess(final T t) {
75
		log.debug("Blackboard workflow node DONE");
76
		saveResponseInEnv(t, token.getEnv());
77
		engine.complete(token, Arc.DEFAULT_ARC);
78
		engine.executeQueuedArcTokens(token.getProcess());
79
	}
80

    
81
	@Override
82
	public void onFail(final T t) {
83
		final String error = t instanceof AbstractBlackboardMessage ? ((AbstractBlackboardMessage) t).getError() : "Unknown Error";
84
		log.warn("Blackboard workflow node FAILED: " + error);
85
		saveResponseInEnv(t, token.getEnv());
86
		token.getEnv().setAttribute(WorkflowConstants.SYSTEM_HAS_FAILED, true);
87
		token.getEnv().setAttribute(WorkflowConstants.SYSTEM_ERROR, error);
88
		engine.complete(token, "failed");
89
	}
90

    
91
	@Override
92
	public void onOngoing(final T t) {
93
		getToken().getEnv().setAttribute(WorkflowConstants.BLACKBOARD_IS_GOING, true);
94
	}
95

    
96
	protected Engine getEngine() {
97
		return engine;
98
	}
99

    
100
	protected NodeToken getToken() {
101
		return token;
102
	}
103

    
104
	protected DnetServiceLocator getServiceLocator() {
105
		return serviceLocator;
106
	}
107

    
108
}
(2-2/9)