Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5

    
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

    
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11

    
12
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
13

    
14
public abstract class AsyncJobNode extends SarasvatiJobNode {
15

    
16
	/**
17
	 * logger.
18
	 */
19
	private static final Log log = LogFactory.getLog(AsyncJobNode.class);
20

    
21
	private final ExecutorService executor = Executors.newCachedThreadPool();
22

    
23
	@Override
24
	public void execute(final Engine engine, final NodeToken token) {
25
		super.execute(engine, token);
26

    
27
		log.info("executing async node");
28

    
29
		executor.execute(new Runnable() {
30

    
31
			@Override
32
			public void run() {
33
				try {
34
					log.debug("START NODE: " + getBeanName());
35
					beforeStart(token);
36
					String arc = execute(token);
37
					beforeCompleted(token);
38
					log.debug("END NODE (SUCCESS): " + getBeanName());
39
					engine.complete(token, arc);
40
				} catch (Throwable e) {
41
					log.error("got exception while executing workflow node", e);
42
					log.debug("END NODE (FAILED): " + getBeanName());
43
					beforeFailed(token);
44
					token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
45
					token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, e.getMessage());
46
					engine.complete(token, "failed");
47
				}
48
			}
49
		});
50
	}
51

    
52
	abstract protected String execute(final NodeToken token) throws Exception;
53

    
54
	protected void beforeStart(final NodeToken token) {}
55

    
56
	protected void beforeCompleted(final NodeToken token) {}
57

    
58
	protected void beforeFailed(final NodeToken token) {}
59
}
(1-1/9)