Project

General

Profile

1
package eu.dnetlib.msro.workflows.procs;
2

    
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.HashMap;
6
import java.util.List;
7
import java.util.Map;
8
import java.util.Set;
9
import java.util.concurrent.PriorityBlockingQueue;
10
import java.util.stream.Collectors;
11

    
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.springframework.beans.factory.annotation.Autowired;
15
import org.springframework.stereotype.Component;
16

    
17
import com.google.common.collect.BiMap;
18
import com.google.common.collect.HashBiMap;
19

    
20
import eu.dnetlib.miscutils.datetime.DateUtils;
21
import eu.dnetlib.WorkerProperties;
22
import eu.dnetlib.msro.exceptions.MSROException;
23
import eu.dnetlib.msro.workflows.WorkflowsConstants;
24

    
25
/**
26
 * Created by michele on 20/11/15.
27
 */
28
@Component
29
public class ProcessRegistry {
30

    
31
	private static final Log log = LogFactory.getLog(ProcessRegistry.class);
32
	private final BiMap<String, WorkflowProcess> procs = HashBiMap.create();
33
	private final Map<String, Collection<WorkflowProcess>> byOtherId = new HashMap<String, Collection<WorkflowProcess>>();
34

    
35
	private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<WorkflowProcess>();
36

    
37
	@Autowired
38
	private WorkerProperties props;
39

    
40
	synchronized public int countRunningWfs() {
41
		int count = 0;
42
		for (final Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
43
			final WorkflowProcess proc = e.getValue();
44
			if (!proc.isTerminated()) {
45
				count++;
46
			}
47
		}
48
		return count;
49
	}
50

    
51
	public int countQueuedWfs() {
52
		return pendingProcs.size();
53
	}
54

    
55
	public WorkflowProcess findProcess(final String procId) {
56
		return procs.get(procId);
57
	}
58

    
59
	public Set<WorkflowProcess> listProcesses() {
60
		return procs.values();
61
	}
62

    
63
	public List<WorkflowProcess> listTerminated() {
64
		return procs.values().stream().filter(p -> p.isTerminated()).sorted().collect(Collectors.toList());
65
	}
66

    
67
	public Collection<WorkflowProcess> findProcsByOtherId(final String id) {
68
		synchronized (this) {
69
			final Collection<WorkflowProcess> res = byOtherId.get(id);
70
			return res != null ? res : new ArrayList<WorkflowProcess>();
71
		}
72
	}
73

    
74
	public String registerProcess(final WorkflowProcess process) throws MSROException {
75
		if (procs.containsValue(process) || procs.containsKey(process.getId())) {
76
			log.error("Already registerd process: " + process);
77
			throw new MSROException("Already registerd process: " + process);
78
		}
79

    
80
		if (procs.size() >= props.getMaxSize()) {
81
			removeOldestProcess();
82
		}
83

    
84
		procs.put(process.getId(), process);
85

    
86
		synchronized (this) {
87
			if (!byOtherId.containsKey(process.getProfileId())) {
88
				byOtherId.put(process.getProfileId(), new ArrayList<WorkflowProcess>());
89
			}
90
			byOtherId.get(process.getProfileId()).add(process);
91
		}
92

    
93
		synchronized (pendingProcs) {
94
			if (pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
95
				log.warn("Wf [" + process.getName() + "] not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
96
				throw new MSROException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
97
			}
98
			pendingProcs.put(process);
99

    
100
			log.info("WorkflowProcess [" + process + "] in queue, priority=" + process.getPriority());
101
		}
102

    
103
		return process.getId();
104
	}
105

    
106
	private void removeOldestProcess() {
107
		long oldDate = DateUtils.now();
108
		String oldId = null;
109

    
110
		for (final Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
111
			final WorkflowProcess proc = e.getValue();
112

    
113
			if (proc.isTerminated()) {
114
				final long date = proc.getLastActivityDate();
115
				if (date < oldDate) {
116
					oldDate = date;
117
					oldId = e.getKey();
118
				}
119
			}
120
		}
121

    
122
		if (oldId != null) {
123
			unregisterProcess(oldId);
124
		}
125

    
126
	}
127

    
128
	public void unregisterProcess(final String procId) {
129
		synchronized (this) {
130
			final WorkflowProcess process = procs.remove(procId);
131
			if (process != null) {
132
				for (final Collection<WorkflowProcess> processes : byOtherId.values()) {
133
					processes.remove(process);
134
				}
135
			}
136
		}
137
	}
138

    
139
	public WorkflowProcess nextProcessToStart() {
140
		synchronized (pendingProcs) {
141
			return pendingProcs.poll();
142
		}
143
	}
144

    
145
}
(5-5/8)