Project

General

Profile

1
package eu.dnetlib.msro.workflows.procs;
2

    
3
import java.util.concurrent.Executors;
4
import java.util.concurrent.TimeUnit;
5

    
6
import javax.annotation.PostConstruct;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.springframework.beans.factory.annotation.Autowired;
11
import org.springframework.stereotype.Component;
12

    
13
import eu.dnetlib.clients.is.InformationServiceClient;
14
import eu.dnetlib.enabling.common.Stoppable;
15
import eu.dnetlib.enabling.common.StoppableDetails;
16
import eu.dnetlib.msro.exceptions.MSROException;
17
import eu.dnetlib.msro.workflows.WorkflowInstance;
18
import eu.dnetlib.msro.workflows.WorkflowsConstants;
19
import eu.dnetlib.services.async.AsyncServerCallback;
20

    
21
@Component
22
public class WorkflowExecutor implements Stoppable {
23

    
24
	private static final Log log = LogFactory.getLog(WorkflowExecutor.class);
25

    
26
	@Autowired
27
	private InformationServiceClient isLookup;
28
	@Autowired
29
	private ProcessRegistry processRegistry;
30
	@Autowired
31
	private ProcessFactory processFactory;
32
	@Autowired
33
	private ProcessEngine processEngine;
34

    
35
	private boolean paused = false;
36

    
37
	@PostConstruct
38
	public void init() {
39

    
40
		Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
41
			if (isPaused() || (processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE)) { return; }
42

    
43
			final WorkflowProcess process = processRegistry.nextProcessToStart();
44
			if (process != null) {
45
				processEngine.startProcess(process);
46
			} else {
47
				log.debug("WorkflowProcess queue is empty");
48
			}
49
		}, 10, 10, TimeUnit.SECONDS);
50
	}
51

    
52
	public String startWorkflow(final WorkflowInstance wf, final AsyncServerCallback callback) throws Exception {
53

    
54
		if (isPaused()) {
55
			log.warn("Wf " + wf.getProfileId() + " not launched, because WorkflowExecutor is preparing for shutdown");
56
			throw new MSROException("WorkflowExecutor is preparing for shutdown");
57
		}
58

    
59
		try {
60
			final WorkflowProcess process = processFactory.newProcess(wf, callback);
61
			return processRegistry.registerProcess(process);
62
		} catch (final Exception e) {
63
			log.error("Error ppreparing workflow: " + wf.getProfileId(), e);
64
			throw new MSROException("Error parsing workflow");
65
		}
66
	}
67

    
68
	@Override
69
	public void stop() {
70
		paused = true;
71
	}
72

    
73
	@Override
74
	public void resume() {
75
		paused = false;
76
	}
77

    
78
	@Override
79
	public StoppableDetails getStopDetails() {
80
		final int count = processRegistry.countRunningWfs();
81

    
82
		final StoppableDetails.StopStatus status =
83
				isPaused() ? (count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING) : StoppableDetails.StopStatus.RUNNING;
84

    
85
		return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
86
	}
87

    
88
	public boolean isPaused() {
89
		return paused;
90
	}
91

    
92
	public void setPaused(final boolean paused) {
93
		this.paused = paused;
94
	}
95

    
96
}
(7-7/8)