Project

General

Profile

1
package eu.dnetlib.msro.workflows.procs;
2

    
3
import java.util.concurrent.Executors;
4
import java.util.concurrent.TimeUnit;
5

    
6
import javax.annotation.PostConstruct;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.springframework.beans.factory.annotation.Autowired;
11
import org.springframework.stereotype.Component;
12

    
13
import eu.dnetlib.enabling.common.Stoppable;
14
import eu.dnetlib.enabling.common.StoppableDetails;
15
import eu.dnetlib.msro.exceptions.MSROException;
16
import eu.dnetlib.msro.workflows.WorkflowsConstants;
17

    
18
@Component
19
public class ProcessExecutor implements Stoppable {
20

    
21
	@Autowired
22
	private ProcessEngine processEngine;
23
	@Autowired
24
	private ProcessRegistry processRegistry;
25

    
26
	private boolean paused = false;
27

    
28
	private static final Log log = LogFactory.getLog(ProcessExecutor.class);
29

    
30
	@PostConstruct
31
	public void init() {
32

    
33
		Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {
34
			if (isPaused() || (processRegistry.countRunningWfs() >= WorkflowsConstants.MAX_RUNNING_PROCS_SIZE)) { return; }
35

    
36
			final WorkflowProcess process = processRegistry.nextProcessToStart();
37
			if (process != null) {
38
				processEngine.startProcess(process);
39
			} else {
40
				log.debug("WorkflowProcess queue is empty");
41
			}
42
		}, 10, 10, TimeUnit.SECONDS);
43
	}
44

    
45
	public void execute(final WorkflowProcess proc) throws MSROException {
46
		processRegistry.registerProcess(proc);
47
	}
48

    
49
	@Override
50
	public void stop() {
51
		paused = true;
52
	}
53

    
54
	@Override
55
	public void resume() {
56
		paused = false;
57
	}
58

    
59
	@Override
60
	public StoppableDetails getStopDetails() {
61
		final int count = processRegistry.countRunningWfs();
62

    
63
		final StoppableDetails.StopStatus status =
64
				isPaused() ? (count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING) : StoppableDetails.StopStatus.RUNNING;
65

    
66
		return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
67
	}
68

    
69
	public boolean isPaused() {
70
		return paused;
71
	}
72

    
73
	public void setPaused(final boolean paused) {
74
		this.paused = paused;
75
	}
76
}
(4-4/8)