Project

General

Profile

« Previous | Next » 

Revision 60351

implematation of dhp messages with a MVC controller

View differences:

DnetMessageManager.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message;
2 2

  
3
import java.util.ArrayList;
4
import java.util.Collections;
5
import java.util.LinkedHashMap;
6
import java.util.List;
7
import java.util.Map;
3 8

  
4
import eu.dnetlib.message.Message;
5
import eu.dnetlib.message.MessageManager;
6
import eu.dnetlib.message.MessageType;
7
import org.apache.commons.lang3.StringUtils;
8 9
import org.apache.commons.logging.Log;
9 10
import org.apache.commons.logging.LogFactory;
10 11
import org.springframework.beans.factory.annotation.Required;
11
import org.springframework.beans.factory.annotation.Value;
12 12

  
13
import java.util.ArrayList;
14
import java.util.HashMap;
15
import java.util.List;
16
import java.util.Map;
17
import java.util.concurrent.LinkedBlockingQueue;
13
import eu.dnetlib.message.Message;
14
import eu.dnetlib.message.MessageType;
15
import eu.dnetlib.msro.rmi.MSROException;
18 16

  
19 17
public class DnetMessageManager {
20
    private static final Log log = LogFactory.getLog(DnetMessageManager.class);
21 18

  
22
    private MessageManager manager;
19
	private static final Log log = LogFactory.getLog(DnetMessageManager.class);
23 20

  
24
    private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
21
	private final Map<String, Message> onGonginMessages = Collections.synchronizedMap(new LinkedHashMap<>());
25 22

  
23
	private final Map<String, List<Message>> reports = Collections.synchronizedMap(new LinkedHashMap<>());
26 24

  
27
    private final  Map<String, Message> onGonginMessages = new HashMap<>();
25
	private long maxNumberOfOnGoingMessages;
28 26

  
29
    private final Map<String, List<Message>> reportMessages = new HashMap<>();
27
	private long maxNumberOfReports;
30 28

  
31
    private String messageQueueServer;
29
	public synchronized void registerMessage(final Message message) throws MSROException {
30
		if (message.getType() == MessageType.ONGOING) {
31
			log.debug("Received a ONGOING message");
32
			onGonginMessages.put(message.getWorkflowId(), message);
33
			if (onGonginMessages.size() > maxNumberOfOnGoingMessages) {
34
				onGonginMessages.remove(onGonginMessages.keySet().iterator().next());
35
			}
36
		} else if (message.getType() == MessageType.REPORT) {
37
			log.debug("Received a REPORT message");
38
			if (!reports.containsKey(message.getWorkflowId())) {
39
				reports.put(message.getWorkflowId(), new ArrayList<>());
40
			}
41
			reports.get(message.getWorkflowId()).add(message);
42
			if (reports.size() > maxNumberOfReports) {
43
				reports.remove(reports.keySet().iterator().next());
44
			}
45
		} else {
46
			throw new MSROException("Invalid message type: " + message.getType());
47
		}
48
	}
32 49

  
33
    private String username;
50
	public List<Message> getReports(final String workflowId, final boolean delete) {
51
		if (!reports.containsKey(workflowId)) {
52
			return new ArrayList<>();
53
		} else if (delete) {
54
			return reports.remove(workflowId);
55
		} else {
56
			return reports.get(workflowId);
57
		}
58
	}
34 59

  
35
    private String password;
60
	public Message getOnGoingMessage(final String workflowId) {
61
		return onGonginMessages.get(workflowId);
62
	}
36 63

  
37
    @Value("${dnet.openaire.messageManager.ongoingMessageQueueName}")
38
    private String onGoingMessageQueue;
64
	public long getMaxNumberOfOnGoingMessages() {
65
		return maxNumberOfOnGoingMessages;
66
	}
39 67

  
40
    @Value("${dnet.openaire.messageManager.reportMessageQueueName}")
41
    private String reportMessageQueue;
68
	@Required
69
	public void setMaxNumberOfOnGoingMessages(final long maxNumberOfOnGoingMessages) {
70
		this.maxNumberOfOnGoingMessages = maxNumberOfOnGoingMessages;
71
	}
42 72

  
73
	public long getMaxNumberOfReports() {
74
		return maxNumberOfReports;
75
	}
43 76

  
44
    public DnetMessageManager() {
77
	@Required
78
	public void setMaxNumberOfReports(final long maxNumberOfReports) {
79
		this.maxNumberOfReports = maxNumberOfReports;
80
	}
45 81

  
46
    }
82
	protected Map<String, Message> getOnGonginMessages() {
83
		return onGonginMessages;
84
	}
47 85

  
48

  
49
    private String createReportId(final String wfId, final String jobName) {
50
        return String.format("%s::%s", wfId, jobName);
51
    }
52

  
53
    public void startListeningMessage() throws Exception {
54
        if (manager == null && StringUtils.isNotBlank(messageQueueServer) && StringUtils.isNotBlank(reportMessageQueue)) {
55
            manager = new MessageManager(messageQueueServer, username, password, messages);
56
            manager.startConsumingMessage(onGoingMessageQueue, true, false);
57
            manager.startConsumingMessage(reportMessageQueue, true, false);
58

  
59
            Runnable r = () -> {
60
                while (true) {
61
                    try {
62
                        Message currentMessage = messages.take();
63

  
64
                        if (currentMessage.getType() == MessageType.ONGOING) {
65
                            synchronized (onGonginMessages) {
66
                                onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
67
                            }
68
                        } else {
69
                            synchronized (reportMessages) {
70
                                if (!reportMessages.containsKey(currentMessage.getWorkflowId()))
71
                                {
72
                                    reportMessages.put(currentMessage.getWorkflowId(), new ArrayList<>());
73
                                }
74
                                reportMessages.get(currentMessage.getWorkflowId()).add(currentMessage);
75
                            }
76
                        }
77
                    } catch (InterruptedException e) {
78
                        log.error("An error occured on retrieving messages from the blocking queue",e);
79
                        throw new RuntimeException("An error occured on retrieving messages from the blocking queue",e);
80
                    }
81

  
82
                }
83
            };
84
            new Thread(r).start();
85
        }
86
    }
87

  
88
    public List<Message> getReport(final String workflowId) {
89

  
90
        return getMessages(reportMessages, workflowId);
91
    }
92

  
93
    private List<Message> getMessages(final Map<String, List<Message>> messageMap, String reportId) {
94
        if (messageMap.containsKey(reportId)) {
95
            List<Message> m = messageMap.get(reportId);
96
            messageMap.remove(reportId);
97
            return m;
98
        }
99
        return null;
100
    }
101

  
102

  
103
    private Message getMessage(final Map<String, Message> messageMap, String reportId) {
104
        if (messageMap.containsKey(reportId)) {
105
            Message m = messageMap.get(reportId);
106
            messageMap.remove(reportId);
107
            return m;
108
        }
109
        return null;
110
    }
111

  
112

  
113
    public Message getOnGoingMessages(final String workflowId) {
114
        return getMessage(onGonginMessages, workflowId);
115
    }
116

  
117

  
118
    public String getMessageQueueServer() {
119
        return messageQueueServer;
120
    }
121

  
122
    @Required
123
    public void setMessageQueueServer(String messageQueueServer) {
124
        this.messageQueueServer = messageQueueServer;
125
    }
126

  
127
    public String getUsername() {
128
        return username;
129
    }
130

  
131
    @Required
132
    public void setUsername(String username) {
133
        this.username = username;
134
    }
135

  
136
    public String getPassword() {
137
        return password;
138
    }
139

  
140
    @Required
141
    public void setPassword(String password) {
142
        this.password = password;
143
    }
86
	protected Map<String, List<Message>> getReports() {
87
		return reports;
88
	}
144 89
}

Also available in: Unified diff