Project

General

Profile

« Previous | Next » 

Revision 60357

Removed MessageType from Message

View differences:

DnetMessageManager.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message;
2 2

  
3
import java.util.ArrayList;
4 3
import java.util.Collections;
5 4
import java.util.LinkedHashMap;
6
import java.util.List;
7 5
import java.util.Map;
8 6

  
9 7
import org.apache.commons.logging.Log;
10 8
import org.apache.commons.logging.LogFactory;
11 9
import org.springframework.beans.factory.annotation.Required;
12 10

  
13
import eu.dnetlib.message.Message;
14
import eu.dnetlib.message.MessageType;
11
import eu.dnetlib.dhp.message.Message;
15 12
import eu.dnetlib.msro.rmi.MSROException;
16 13

  
17 14
public class DnetMessageManager {
18 15

  
19 16
	private static final Log log = LogFactory.getLog(DnetMessageManager.class);
20 17

  
21
	private final Map<String, Message> onGonginMessages = Collections.synchronizedMap(new LinkedHashMap<>());
18
	private final Map<String, Message> messages = Collections.synchronizedMap(new LinkedHashMap<>());
22 19

  
23
	private final Map<String, List<Message>> reports = Collections.synchronizedMap(new LinkedHashMap<>());
20
	private long maxNumberOfMessages;
24 21

  
25
	private long maxNumberOfOnGoingMessages;
26

  
27
	private long maxNumberOfReports;
28

  
29 22
	public synchronized void registerMessage(final Message message) throws MSROException {
30
		if (message.getType() == MessageType.ONGOING) {
31
			log.debug("Received a ONGOING message");
32
			onGonginMessages.put(message.getWorkflowId(), message);
33
			if (onGonginMessages.size() > maxNumberOfOnGoingMessages) {
34
				onGonginMessages.remove(onGonginMessages.keySet().iterator().next());
35
			}
36
		} else if (message.getType() == MessageType.REPORT) {
37
			log.debug("Received a REPORT message");
38
			if (!reports.containsKey(message.getWorkflowId())) {
39
				reports.put(message.getWorkflowId(), new ArrayList<>());
40
			}
41
			reports.get(message.getWorkflowId()).add(message);
42
			if (reports.size() > maxNumberOfReports) {
43
				reports.remove(reports.keySet().iterator().next());
44
			}
45
		} else {
46
			throw new MSROException("Invalid message type: " + message.getType());
23
		log.debug("Received a ONGOING message");
24
		messages.put(message.getWorkflowId(), message);
25
		if (messages.size() > maxNumberOfMessages) {
26
			messages.remove(messages.keySet().iterator().next());
47 27
		}
48 28
	}
49 29

  
50
	public List<Message> getReports(final String workflowId, final boolean delete) {
51
		if (!reports.containsKey(workflowId)) {
52
			return new ArrayList<>();
53
		} else if (delete) {
54
			return reports.remove(workflowId);
55
		} else {
56
			return reports.get(workflowId);
57
		}
30
	public Message findMessage(final String workflowId) {
31
		return messages.get(workflowId);
58 32
	}
59 33

  
60
	public Message getOnGoingMessage(final String workflowId) {
61
		return onGonginMessages.get(workflowId);
34
	protected Map<String, Message> getMessages() {
35
		return messages;
62 36
	}
63 37

  
64
	public long getMaxNumberOfOnGoingMessages() {
65
		return maxNumberOfOnGoingMessages;
38
	public long getMaxNumberOfMessages() {
39
		return maxNumberOfMessages;
66 40
	}
67 41

  
68 42
	@Required
69
	public void setMaxNumberOfOnGoingMessages(final long maxNumberOfOnGoingMessages) {
70
		this.maxNumberOfOnGoingMessages = maxNumberOfOnGoingMessages;
43
	public void setMaxNumberOfMessages(final long maxNumberOfMessages) {
44
		this.maxNumberOfMessages = maxNumberOfMessages;
71 45
	}
72

  
73
	public long getMaxNumberOfReports() {
74
		return maxNumberOfReports;
75
	}
76

  
77
	@Required
78
	public void setMaxNumberOfReports(final long maxNumberOfReports) {
79
		this.maxNumberOfReports = maxNumberOfReports;
80
	}
81

  
82
	protected Map<String, Message> getOnGonginMessages() {
83
		return onGonginMessages;
84
	}
85

  
86
	protected Map<String, List<Message>> getReports() {
87
		return reports;
88
	}
89 46
}

Also available in: Unified diff