Project

General

Profile

« Previous | Next » 

Revision 41550

refactoring

View differences:

ProcessRegistry.java
1 1
package eu.dnetlib.msro.workflows.procs;
2 2

  
3
import java.util.*;
3
import java.util.ArrayList;
4
import java.util.Collection;
5
import java.util.HashMap;
6
import java.util.Map;
7
import java.util.Set;
4 8
import java.util.concurrent.PriorityBlockingQueue;
5 9

  
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.springframework.beans.factory.annotation.Required;
13

  
6 14
import com.google.common.collect.BiMap;
7 15
import com.google.common.collect.HashBiMap;
16

  
8 17
import eu.dnetlib.miscutils.datetime.DateUtils;
9 18
import eu.dnetlib.msro.rmi.MSROException;
10
import eu.dnetlib.msro.workflows.util.ProcessUtils;
11 19
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.springframework.beans.factory.annotation.Required;
15 20

  
16 21
/**
17 22
 * Created by michele on 20/11/15.
......
19 24
public class ProcessRegistry {
20 25

  
21 26
	private static final Log log = LogFactory.getLog(ProcessRegistry.class);
22
	private BiMap<String, WorkflowProcess> procs = HashBiMap.create();
23
	private Map<String, Collection<WorkflowProcess>> byOtherId = new HashMap<String, Collection<WorkflowProcess>>();
27
	private final BiMap<String, WorkflowProcess> procs = HashBiMap.create();
28
	private final Map<String, Collection<WorkflowProcess>> byOtherId = new HashMap<String, Collection<WorkflowProcess>>();
24 29

  
25
	private PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<WorkflowProcess>();
30
	private final PriorityBlockingQueue<WorkflowProcess> pendingProcs = new PriorityBlockingQueue<WorkflowProcess>();
26 31

  
27 32
	private int maxSize;
28 33

  
29 34
	synchronized public int countRunningWfs() {
30 35
		int count = 0;
31
		for (Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
36
		for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
32 37
			final WorkflowProcess proc = e.getValue();
33 38
			if (!proc.isTerminated()) {
34 39
				count++;
......
38 43
	}
39 44

  
40 45
	public WorkflowProcess findProcess(final String procId) {
41
		return procs.get(procId);
46
		return this.procs.get(procId);
42 47
	}
43 48

  
44 49
	public Set<WorkflowProcess> listProcesses() {
45
		return procs.values();
50
		return this.procs.values();
46 51
	}
47 52

  
48 53
	public Collection<WorkflowProcess> findProcsByOtherId(final String id) {
49 54
		synchronized (this) {
50
			final Collection<WorkflowProcess> res = byOtherId.get(id);
55
			final Collection<WorkflowProcess> res = this.byOtherId.get(id);
51 56
			return res != null ? res : new ArrayList<WorkflowProcess>();
52 57
		}
53 58
	}
54 59

  
55 60
	public String registerProcess(final WorkflowProcess process, final String... ids) throws MSROException {
56
		if (procs.containsValue(process) || procs.containsKey(process.getId())) {
61
		if (this.procs.containsValue(process) || this.procs.containsKey(process.getId())) {
57 62
			log.error("Already registerd process: " + process);
58 63
			throw new MSROException("Already registerd process: " + process);
59 64
		}
60 65

  
61
		if (procs.size() >= maxSize) {
66
		if (this.procs.size() >= this.maxSize) {
62 67
			removeOldestProcess();
63 68
		}
64 69

  
65
		procs.put(process.getId(), process);
66
		for (String id : ids) {
70
		this.procs.put(process.getId(), process);
71
		for (final String id : ids) {
67 72
			synchronized (this) {
68
				if (!byOtherId.containsKey(id)) {
69
					byOtherId.put(id, new ArrayList<WorkflowProcess>());
73
				if (!this.byOtherId.containsKey(id)) {
74
					this.byOtherId.put(id, new ArrayList<WorkflowProcess>());
70 75
				}
71
				byOtherId.get(id).add(process);
76
				this.byOtherId.get(id).add(process);
72 77
			}
73 78
		}
74 79

  
75
		synchronized (pendingProcs) {
76
			if (pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
80
		synchronized (this.pendingProcs) {
81
			if (this.pendingProcs.size() > WorkflowsConstants.MAX_PENDING_PROCS_SIZE) {
77 82
				log.warn("Wf [" + process.getName() + "] not launched, Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
78 83
				throw new MSROException("Max number of pending procs reached: " + WorkflowsConstants.MAX_PENDING_PROCS_SIZE);
79 84
			}
80
			pendingProcs.put(process);
85
			this.pendingProcs.put(process);
81 86

  
82 87
			log.info("WorkflowProcess [" + process + "] in queue, priority=" + process.getPriority());
83 88
		}
......
89 94
		long oldDate = DateUtils.now();
90 95
		String oldId = null;
91 96

  
92
		for (Map.Entry<String, WorkflowProcess> e : procs.entrySet()) {
97
		for (final Map.Entry<String, WorkflowProcess> e : this.procs.entrySet()) {
93 98
			final WorkflowProcess proc = e.getValue();
94 99

  
95 100
			if (proc.isTerminated()) {
96
				final long date = ProcessUtils.calculateLastActivityDate(proc);
101
				final long date = proc.getLastActivityDate();
97 102
				if (date < oldDate) {
98 103
					oldDate = date;
99 104
					oldId = e.getKey();
......
109 114

  
110 115
	public void unregisterProcess(final String procId) {
111 116
		synchronized (this) {
112
			final WorkflowProcess process = procs.remove(procId);
117
			final WorkflowProcess process = this.procs.remove(procId);
113 118
			if (process != null) {
114
				for (final Collection<WorkflowProcess> processes : byOtherId.values()) {
119
				for (final Collection<WorkflowProcess> processes : this.byOtherId.values()) {
115 120
					processes.remove(process);
116 121
				}
117 122
			}
......
119 124
	}
120 125

  
121 126
	public WorkflowProcess nextProcessToStart() {
122
		synchronized (pendingProcs) {
123
			return pendingProcs.poll();
127
		synchronized (this.pendingProcs) {
128
			return this.pendingProcs.poll();
124 129
		}
125 130
	}
126 131

  
127 132
	public int getMaxSize() {
128
		return maxSize;
133
		return this.maxSize;
129 134
	}
130 135

  
131 136
	@Required

Also available in: Unified diff