Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message;
2

    
3
import java.util.*;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Required;
8

    
9
import eu.dnetlib.dhp.message.Message;
10
import eu.dnetlib.msro.rmi.MSROException;
11

    
12
public class DnetMessageManager {
13

    
14
	private static final Log log = LogFactory.getLog(DnetMessageManager.class);
15

    
16
	private final Map<String, Message> ongoingMessages = Collections.synchronizedMap(new LinkedHashMap<>());
17

    
18
	private final Map<String, Message> reportMessages = Collections.synchronizedMap(new LinkedHashMap<>());
19

    
20
	private long maxNumberOfMessages;
21

    
22
	public synchronized void registerMessage(final Message message) throws MSROException {
23
		log.debug(String.format("Received %s message", message.getMessageType()));
24

    
25
		if (Objects.isNull(message.getMessageType())) {
26
			throw new MSROException("missing message type in: " + message.toString());
27
		}
28
		switch (message.getMessageType()) {
29
			case ONGOING:
30
				ongoingMessages.put(message.getWorkflowId(), message);
31
				if (ongoingMessages.size() > maxNumberOfMessages) {
32
					ongoingMessages.remove(ongoingMessages.keySet().iterator().next());
33
				}
34
				break;
35
			case REPORT:
36
				reportMessages.put(message.getWorkflowId(), message);
37
				if (reportMessages.size() > maxNumberOfMessages) {
38
					reportMessages.remove(reportMessages.keySet().iterator().next());
39
				}
40
				break;
41
			default:
42
				throw new MSROException("message type not managed: " + message.getMessageType());
43
		}
44
	}
45

    
46
	public Message findOngoingMessage(final String workflowId) {
47
		return ongoingMessages.get(workflowId);
48
	}
49

    
50
	public Message findReportMessage(final String workflowId) {
51
		return reportMessages.get(workflowId);
52
	}
53

    
54
	protected Map<String, Message> getOngoingMessages() {
55
		return ongoingMessages;
56
	}
57

    
58
	protected Map<String, Message> getReportMessages() {
59
		return reportMessages;
60
	}
61

    
62
	public long getMaxNumberOfMessages() {
63
		return maxNumberOfMessages;
64
	}
65

    
66
	@Required
67
	public void setMaxNumberOfMessages(final long maxNumberOfMessages) {
68
		this.maxNumberOfMessages = maxNumberOfMessages;
69
	}
70
}
(1-1/2)