1
|
package eu.dnetlib.msro.workflows.procs;
|
2
|
|
3
|
import java.util.concurrent.Executors;
|
4
|
import java.util.concurrent.TimeUnit;
|
5
|
|
6
|
import javax.annotation.PostConstruct;
|
7
|
|
8
|
import org.apache.commons.logging.Log;
|
9
|
import org.apache.commons.logging.LogFactory;
|
10
|
import org.springframework.beans.factory.annotation.Autowired;
|
11
|
import org.springframework.stereotype.Component;
|
12
|
|
13
|
import eu.dnetlib.enabling.common.Stoppable;
|
14
|
import eu.dnetlib.enabling.common.StoppableDetails;
|
15
|
import eu.dnetlib.msro.exceptions.MSROException;
|
16
|
import eu.dnetlib.msro.workflows.WorkflowsConstants;
|
17
|
|
18
|
@Component
|
19
|
public class ProcessExecutor implements Stoppable {
|
20
|
|
21
|
@Autowired
|
22
|
private ProcessEngine processEngine;
|
23
|
@Autowired
|
24
|
private ProcessRegistry processRegistry;
|
25
|
|
26
|
private boolean paused = false;
|
27
|
|
28
|
private static final Log log = LogFactory.getLog(ProcessExecutor.class);
|
29
|
|
30
|
@PostConstruct
|
31
|
public void init() {
|
32
|
|
33
|
Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
|
34
|
if (isPaused() || (processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE)) { return; }
|
35
|
|
36
|
final WorkflowProcess process = processRegistry.nextProcessToStart();
|
37
|
if (process != null) {
|
38
|
processEngine.startProcess(process);
|
39
|
} else {
|
40
|
log.debug("WorkflowProcess queue is empty");
|
41
|
}
|
42
|
}, 10, 10, TimeUnit.SECONDS);
|
43
|
}
|
44
|
|
45
|
public void execute(final WorkflowProcess proc) throws MSROException {
|
46
|
processRegistry.registerProcess(proc);
|
47
|
}
|
48
|
|
49
|
@Override
|
50
|
public void stop() {
|
51
|
paused = true;
|
52
|
}
|
53
|
|
54
|
@Override
|
55
|
public void resume() {
|
56
|
paused = false;
|
57
|
}
|
58
|
|
59
|
@Override
|
60
|
public StoppableDetails getStopDetails() {
|
61
|
final int count = processRegistry.countRunningWfs();
|
62
|
|
63
|
final StoppableDetails.StopStatus status =
|
64
|
isPaused() ? (count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING) : StoppableDetails.StopStatus.RUNNING;
|
65
|
|
66
|
return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
|
67
|
}
|
68
|
|
69
|
public boolean isPaused() {
|
70
|
return paused;
|
71
|
}
|
72
|
|
73
|
public void setPaused(final boolean paused) {
|
74
|
this.paused = paused;
|
75
|
}
|
76
|
}
|