Revision 41550
Added by Michele Artini about 8 years ago
ProcessRegistry.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.workflows.procs; |
2 | 2 |
|
3 |
import java.util.*; |
|
3 |
import java.util.ArrayList; |
|
4 |
import java.util.Collection; |
|
5 |
import java.util.HashMap; |
|
6 |
import java.util.Map; |
|
7 |
import java.util.Set; |
|
4 | 8 |
import java.util.concurrent.PriorityBlockingQueue; |
5 | 9 |
|
10 |
import org.apache.commons.logging.Log; |
|
11 |
import org.apache.commons.logging.LogFactory; |
|
12 |
import org.springframework.beans.factory.annotation.Required; |
|
13 |
|
|
6 | 14 |
import com.google.common.collect.BiMap; |
7 | 15 |
import com.google.common.collect.HashBiMap; |
16 |
|
|
8 | 17 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
9 | 18 |
import eu.dnetlib.msro.rmi.MSROException; |
10 |
import eu.dnetlib.msro.workflows.util.ProcessUtils; |
|
11 | 19 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
12 |
import org.apache.commons.logging.Log; |
|
13 |
import org.apache.commons.logging.LogFactory; |
|
14 |
import org.springframework.beans.factory.annotation.Required; |
|
15 | 20 |
|
16 | 21 |
/** |
17 | 22 |
* Created by michele on 20/11/15. |
... | ... | |
19 | 24 |
public class ProcessRegistry { |
20 | 25 |
|
21 | 26 |
private static final Log log = LogFactory.getLog(ProcessRegistry.class); |
22 |
private BiMap<String, WorkflowProcess> procs = HashBiMap.create(); |
|
23 |
private Map<String, Collection<WorkflowProcess>> byOtherId = new HashMap<String, Collection<WorkflowProcess>>(); |
|
27 |
private final BiMap<String, WorkflowProcess> procs = HashBiMap.create();
|
|
28 |
private final Map<String, Collection<WorkflowProcess>> byOtherId = new HashMap<String, Collection<WorkflowProcess>>();
|
|
24 | 29 |
|
25 |
private PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<WorkflowProcess>(); |
|
30 |
private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<WorkflowProcess>();
|
|
26 | 31 |
|
27 | 32 |
private int maxSize; |
28 | 33 |
|
29 | 34 |
synchronized public int countRunningWfs() { |
30 | 35 |
int count = 0; |
31 |
for (Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
|
|
36 |
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
|
|
32 | 37 |
final WorkflowProcess proc = e.getValue(); |
33 | 38 |
if (!proc.isTerminated()) { |
34 | 39 |
count++; |
... | ... | |
38 | 43 |
} |
39 | 44 |
|
40 | 45 |
public WorkflowProcess findProcess(final String procId) { |
41 |
return procs.get(procId); |
|
46 |
return this.procs.get(procId);
|
|
42 | 47 |
} |
43 | 48 |
|
44 | 49 |
public Set<WorkflowProcess> listProcesses() { |
45 |
return procs.values(); |
|
50 |
return this.procs.values();
|
|
46 | 51 |
} |
47 | 52 |
|
48 | 53 |
public Collection<WorkflowProcess> findProcsByOtherId(final String id) { |
49 | 54 |
synchronized (this) { |
50 |
final Collection<WorkflowProcess> res = byOtherId.get(id); |
|
55 |
final Collection<WorkflowProcess> res = this.byOtherId.get(id);
|
|
51 | 56 |
return res != null ? res : new ArrayList<WorkflowProcess>(); |
52 | 57 |
} |
53 | 58 |
} |
54 | 59 |
|
55 | 60 |
public String registerProcess(final WorkflowProcess process, final String... ids) throws MSROException { |
56 |
if (procs.containsValue(process) || procs.containsKey(process.getId())) {
|
|
61 |
if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
|
|
57 | 62 |
log.error("Already registerd process: " + process); |
58 | 63 |
throw new MSROException("Already registerd process: " + process); |
59 | 64 |
} |
60 | 65 |
|
61 |
if (procs.size() >= maxSize) {
|
|
66 |
if (this.procs.size() >= this.maxSize) {
|
|
62 | 67 |
removeOldestProcess(); |
63 | 68 |
} |
64 | 69 |
|
65 |
procs.put(process.getId(), process); |
|
66 |
for (String id : ids) { |
|
70 |
this.procs.put(process.getId(), process);
|
|
71 |
for (final String id : ids) {
|
|
67 | 72 |
synchronized (this) { |
68 |
if (!byOtherId.containsKey(id)) { |
|
69 |
byOtherId.put(id, new ArrayList<WorkflowProcess>()); |
|
73 |
if (!this.byOtherId.containsKey(id)) {
|
|
74 |
this.byOtherId.put(id, new ArrayList<WorkflowProcess>());
|
|
70 | 75 |
} |
71 |
byOtherId.get(id).add(process); |
|
76 |
this.byOtherId.get(id).add(process);
|
|
72 | 77 |
} |
73 | 78 |
} |
74 | 79 |
|
75 |
synchronized (pendingProcs) { |
|
76 |
if (pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) { |
|
80 |
synchronized (this.pendingProcs) {
|
|
81 |
if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
|
|
77 | 82 |
log.warn("Wf [" + process.getName() + "] not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE); |
78 | 83 |
throw new MSROException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE); |
79 | 84 |
} |
80 |
pendingProcs.put(process); |
|
85 |
this.pendingProcs.put(process);
|
|
81 | 86 |
|
82 | 87 |
log.info("WorkflowProcess [" + process + "] in queue, priority=" + process.getPriority()); |
83 | 88 |
} |
... | ... | |
89 | 94 |
long oldDate = DateUtils.now(); |
90 | 95 |
String oldId = null; |
91 | 96 |
|
92 |
for (Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
|
|
97 |
for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
|
|
93 | 98 |
final WorkflowProcess proc = e.getValue(); |
94 | 99 |
|
95 | 100 |
if (proc.isTerminated()) { |
96 |
final long date = ProcessUtils.calculateLastActivityDate(proc);
|
|
101 |
final long date = proc.getLastActivityDate();
|
|
97 | 102 |
if (date < oldDate) { |
98 | 103 |
oldDate = date; |
99 | 104 |
oldId = e.getKey(); |
... | ... | |
109 | 114 |
|
110 | 115 |
public void unregisterProcess(final String procId) { |
111 | 116 |
synchronized (this) { |
112 |
final WorkflowProcess process = procs.remove(procId); |
|
117 |
final WorkflowProcess process = this.procs.remove(procId);
|
|
113 | 118 |
if (process != null) { |
114 |
for (final Collection<WorkflowProcess> processes : byOtherId.values()) { |
|
119 |
for (final Collection<WorkflowProcess> processes : this.byOtherId.values()) {
|
|
115 | 120 |
processes.remove(process); |
116 | 121 |
} |
117 | 122 |
} |
... | ... | |
119 | 124 |
} |
120 | 125 |
|
121 | 126 |
public WorkflowProcess nextProcessToStart() { |
122 |
synchronized (pendingProcs) { |
|
123 |
return pendingProcs.poll(); |
|
127 |
synchronized (this.pendingProcs) {
|
|
128 |
return this.pendingProcs.poll();
|
|
124 | 129 |
} |
125 | 130 |
} |
126 | 131 |
|
127 | 132 |
public int getMaxSize() { |
128 |
return maxSize; |
|
133 |
return this.maxSize;
|
|
129 | 134 |
} |
130 | 135 |
|
131 | 136 |
@Required |
Also available in: Unified diff
refactoring