Revision 60423
Added by Claudio Atzori about 3 years ago
DnetMessageManager.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message; |
2 | 2 |
|
3 |
import java.util.Collections; |
|
4 |
import java.util.LinkedHashMap; |
|
5 |
import java.util.Map; |
|
3 |
import java.util.*; |
|
6 | 4 |
|
7 | 5 |
import org.apache.commons.logging.Log; |
8 | 6 |
import org.apache.commons.logging.LogFactory; |
... | ... | |
15 | 13 |
|
16 | 14 |
private static final Log log = LogFactory.getLog(DnetMessageManager.class); |
17 | 15 |
|
18 |
private final Map<String, Message> messages = Collections.synchronizedMap(new LinkedHashMap<>());
|
|
16 |
private final Map<String, Message> ongoingMessages = Collections.synchronizedMap(new LinkedHashMap<>());
|
|
19 | 17 |
|
18 |
private final Map<String, Message> reportMessages = Collections.synchronizedMap(new LinkedHashMap<>()); |
|
19 |
|
|
20 | 20 |
private long maxNumberOfMessages; |
21 | 21 |
|
22 | 22 |
public synchronized void registerMessage(final Message message) throws MSROException { |
23 | 23 |
log.debug("Received a ONGOING message"); |
24 |
messages.put(message.getWorkflowId(), message); |
|
25 |
if (messages.size() > maxNumberOfMessages) {
|
|
26 |
messages.remove(messages.keySet().iterator().next());
|
|
24 |
|
|
25 |
if (Objects.isNull(message.getMessageType())) {
|
|
26 |
throw new MSROException("missing message type in: " + message.toString());
|
|
27 | 27 |
} |
28 |
switch (message.getMessageType()) { |
|
29 |
case ONGOING: |
|
30 |
ongoingMessages.put(message.getWorkflowId(), message); |
|
31 |
if (ongoingMessages.size() > maxNumberOfMessages) { |
|
32 |
ongoingMessages.remove(ongoingMessages.keySet().iterator().next()); |
|
33 |
} |
|
34 |
break; |
|
35 |
case REPORT: |
|
36 |
reportMessages.put(message.getWorkflowId(), message); |
|
37 |
if (reportMessages.size() > maxNumberOfMessages) { |
|
38 |
reportMessages.remove(reportMessages.keySet().iterator().next()); |
|
39 |
} |
|
40 |
break; |
|
41 |
default: |
|
42 |
throw new MSROException("message type not managed: " + message.getMessageType()); |
|
43 |
} |
|
28 | 44 |
} |
29 | 45 |
|
30 |
public Message findMessage(final String workflowId) { |
|
31 |
return messages.get(workflowId);
|
|
46 |
public Message findOngoingMessage(final String workflowId) {
|
|
47 |
return ongoingMessages.get(workflowId);
|
|
32 | 48 |
} |
33 | 49 |
|
34 |
protected Map<String, Message> getMessages() {
|
|
35 |
return messages;
|
|
50 |
public Message findReportMessage(final String workflowId) {
|
|
51 |
return reportMessages.get(workflowId);
|
|
36 | 52 |
} |
37 | 53 |
|
54 |
protected Map<String, Message> getOngoingMessages() { |
|
55 |
return ongoingMessages; |
|
56 |
} |
|
57 |
|
|
58 |
protected Map<String, Message> getReportMessages() { |
|
59 |
return reportMessages; |
|
60 |
} |
|
61 |
|
|
38 | 62 |
public long getMaxNumberOfMessages() { |
39 | 63 |
return maxNumberOfMessages; |
40 | 64 |
} |
Also available in: Unified diff
updated hadoop aggregation workflow reporting