Project

General

Profile

1
package eu.dnetlib.msro.workflows.procs;
2

    
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.HashMap;
6
import java.util.Map;
7
import java.util.Set;
8
import java.util.concurrent.PriorityBlockingQueue;
9

    
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.springframework.beans.factory.annotation.Required;
13

    
14
import com.google.common.collect.BiMap;
15
import com.google.common.collect.HashBiMap;
16

    
17
import eu.dnetlib.miscutils.datetime.DateUtils;
18
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
19
import eu.dnetlib.rmi.manager.MSROException;
20

    
21
/**
22
 * Created by michele on 20/11/15.
23
 */
24
public class ProcessRegistry {
25

    
26
	private static final Log log = LogFactory.getLog(ProcessRegistry.class);
27
	private final BiMap<String, WorkflowProcess> procs = HashBiMap.create();
28
	private final Map<String, Collection<WorkflowProcess>> byOtherId = new HashMap<String, Collection<WorkflowProcess>>();
29

    
30
	private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<WorkflowProcess>();
31

    
32
	private int maxSize;
33

    
34
	synchronized public int countRunningWfs() {
35
		int count = 0;
36
		for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
37
			final WorkflowProcess proc = e.getValue();
38
			if (!proc.isTerminated()) {
39
				count++;
40
			}
41
		}
42
		return count;
43
	}
44

    
45
	public WorkflowProcess findProcess(final String procId) {
46
		return this.procs.get(procId);
47
	}
48

    
49
	public Set<WorkflowProcess> listProcesses() {
50
		return this.procs.values();
51
	}
52

    
53
	public Collection<WorkflowProcess> findProcsByOtherId(final String id) {
54
		synchronized (this) {
55
			final Collection<WorkflowProcess> res = this.byOtherId.get(id);
56
			return res != null ? res : new ArrayList<WorkflowProcess>();
57
		}
58
	}
59

    
60
	public String registerProcess(final WorkflowProcess process, final String... ids) throws MSROException {
61
		if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
62
			log.error("Already registerd process: " + process);
63
			throw new MSROException("Already registerd process: " + process);
64
		}
65

    
66
		if (this.procs.size() >= this.maxSize) {
67
			removeOldestProcess();
68
		}
69

    
70
		this.procs.put(process.getId(), process);
71
		for (final String id : ids) {
72
			synchronized (this) {
73
				if (!this.byOtherId.containsKey(id)) {
74
					this.byOtherId.put(id, new ArrayList<WorkflowProcess>());
75
				}
76
				this.byOtherId.get(id).add(process);
77
			}
78
		}
79

    
80
		synchronized (this.pendingProcs) {
81
			if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
82
				log.warn("Wf [" + process.getName() + "] not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
83
				throw new MSROException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
84
			}
85
			this.pendingProcs.put(process);
86

    
87
			log.info("WorkflowProcess [" + process + "] in queue, priority=" + process.getPriority());
88
		}
89

    
90
		return process.getId();
91
	}
92

    
93
	private void removeOldestProcess() {
94
		long oldDate = DateUtils.now();
95
		String oldId = null;
96

    
97
		for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
98
			final WorkflowProcess proc = e.getValue();
99

    
100
			if (proc.isTerminated()) {
101
				final long date = proc.getLastActivityDate();
102
				if (date < oldDate) {
103
					oldDate = date;
104
					oldId = e.getKey();
105
				}
106
			}
107
		}
108

    
109
		if (oldId != null) {
110
			unregisterProcess(oldId);
111
		}
112

    
113
	}
114

    
115
	public void unregisterProcess(final String procId) {
116
		synchronized (this) {
117
			final WorkflowProcess process = this.procs.remove(procId);
118
			if (process != null) {
119
				for (final Collection<WorkflowProcess> processes : this.byOtherId.values()) {
120
					processes.remove(process);
121
				}
122
			}
123
		}
124
	}
125

    
126
	public WorkflowProcess nextProcessToStart() {
127
		synchronized (this.pendingProcs) {
128
			return this.pendingProcs.poll();
129
		}
130
	}
131

    
132
	public int getMaxSize() {
133
		return this.maxSize;
134
	}
135

    
136
	@Required
137
	public void setMaxSize(final int maxSize) {
138
		this.maxSize = maxSize;
139
	}
140

    
141
}
(5-5/8)