Project

General

Profile

« Previous | Next » 

Revision 55265

complete wf of collection

View differences:

DnetMessageManager.java
1 1
package eu.dnetlib.msro.message;
2 2

  
3
import java.util.HashMap;
4
import java.util.Map;
5
import java.util.concurrent.LinkedBlockingQueue;
6

  
3
import eu.dnetlib.message.Message;
4
import eu.dnetlib.message.MessageManager;
5
import eu.dnetlib.message.MessageType;
7 6
import org.apache.commons.logging.Log;
8 7
import org.apache.commons.logging.LogFactory;
9 8
import org.springframework.beans.factory.annotation.Required;
10 9

  
11
import eu.dnetlib.message.Message;
12
import eu.dnetlib.message.MessageManager;
13
import eu.dnetlib.message.MessageType;
10
import java.util.ArrayList;
11
import java.util.HashMap;
12
import java.util.List;
13
import java.util.Map;
14
import java.util.concurrent.LinkedBlockingQueue;
14 15

  
15 16
public class DnetMessageManager {
17
    private static final Log log = LogFactory.getLog(DnetMessageManager.class);
16 18

  
17
	private static final Log log = LogFactory.getLog(DnetMessageManager.class);
19
    private MessageManager manager;
18 20

  
19
	private MessageManager manager;
21
    private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
20 22

  
21
	private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
22 23

  
23
	private final Map<String, Message> onGonginMessages = new HashMap<>();
24
    private final  Map<String, Message> onGonginMessages = new HashMap<>();
24 25

  
25
	private final Map<String, Message> reportMessages = new HashMap<>();
26
    private final Map<String, List<Message>> reportMessages = new HashMap<>();
26 27

  
27
	private String messageQueueServer;
28
    private String messageQueueServer;
28 29

  
29
	private String username;
30
    private String username;
30 31

  
31
	private String password;
32
    private String password;
32 33

  
33
	public DnetMessageManager() {
34 34

  
35
	}
35
    public DnetMessageManager() {
36 36

  
37
	private String createReportId(final String wfId, final String jobName) {
38
		return String.format("%s::%s", wfId, jobName);
39
	}
37
    }
40 38

  
41
	public void startListeningMessage() throws Exception {
42
		if (manager == null) {
43
			manager = new MessageManager(messageQueueServer, username, password, messages);
44
			manager.startConsumingMessage("dev_ongoing", false, false);
45
			manager.startConsumingMessage("dev_report", true, false);
46 39

  
47
			final Runnable r = () -> {
48
				while (true) {
49
					try {
50
						final Message currentMessage = messages.take();
40
    private String createReportId(final String wfId, final String jobName) {
41
        return String.format("%s::%s", wfId, jobName);
42
    }
51 43

  
52
						if (currentMessage.getType() == MessageType.ONGOING) {
53
							synchronized (onGonginMessages) {
54
								onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
55
							}
56
						} else {
57
							synchronized (reportMessages) {
58
								reportMessages.put(createReportId(currentMessage.getWorkflowId(), currentMessage.getJobName()), currentMessage);
59
							}
60
						}
61
					} catch (final InterruptedException e) {
62
						log.error("An error occured on retrieving messages from the blocking queue", e);
63
						throw new RuntimeException("An error occured on retrieving messages from the blocking queue", e);
64
					}
44
    public void startListeningMessage() throws Exception {
45
        if (manager == null) {
46
            manager = new MessageManager(messageQueueServer, username, password, messages);
47
            manager.startConsumingMessage("dev_ongoing", true, false);
48
            manager.startConsumingMessage("dev_report", true, false);
65 49

  
66
				}
67
			};
68
			new Thread(r).start();
69
		}
70
	}
50
            Runnable r = () -> {
51
                while (true) {
52
                    try {
53
                        Message currentMessage = messages.take();
71 54

  
72
	public Message getReport(final String workflowId, final String jobName) {
73
		final String reportId = createReportId(workflowId, jobName);
74
		return getMessage(reportMessages, reportId);
75
	}
55
                        if (currentMessage.getType() == MessageType.ONGOING) {
56
                            synchronized (onGonginMessages) {
57
                                onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
58
                            }
59
                        } else {
60
                            synchronized (reportMessages) {
61
                                if (!reportMessages.containsKey(currentMessage.getWorkflowId()))
62
                                {
63
                                    reportMessages.put(currentMessage.getWorkflowId(), new ArrayList<>());
64
                                }
65
                                reportMessages.get(currentMessage.getWorkflowId()).add(currentMessage);
66
                            }
67
                        }
68
                    } catch (InterruptedException e) {
69
                        log.error("An error occured on retrieving messages from the blocking queue",e);
70
                        throw new RuntimeException("An error occured on retrieving messages from the blocking queue",e);
71
                    }
76 72

  
77
	private Message getMessage(final Map<String, Message> messageMap, final String reportId) {
78
		if (messageMap.containsKey(reportId)) {
79
			final Message m = messageMap.get(reportId);
80
			messageMap.remove(reportId);
81
			return m;
82
		}
83
		return null;
84
	}
73
                }
74
            };
75
            new Thread(r).start();
76
        }
77
    }
85 78

  
86
	public Message getOnGoingMessages(final String workflowId) {
87
		return getMessage(onGonginMessages, workflowId);
88
	}
79
    public List<Message> getReport(final String workflowId) {
89 80

  
90
	public String getMessageQueueServer() {
91
		return messageQueueServer;
92
	}
81
        return getMessages(reportMessages, workflowId);
82
    }
93 83

  
94
	@Required
95
	public void setMessageQueueServer(final String messageQueueServer) {
96
		this.messageQueueServer = messageQueueServer;
97
	}
84
    private List<Message> getMessages(final Map<String, List<Message>> messageMap, String reportId) {
85
        if (messageMap.containsKey(reportId)) {
86
            List<Message> m = messageMap.get(reportId);
87
            messageMap.remove(reportId);
88
            return m;
89
        }
90
        return null;
91
    }
98 92

  
99
	public String getUsername() {
100
		return username;
101
	}
102 93

  
103
	@Required
104
	public void setUsername(final String username) {
105
		this.username = username;
106
	}
94
    private Message getMessage(final Map<String, Message> messageMap, String reportId) {
95
        if (messageMap.containsKey(reportId)) {
96
            Message m = messageMap.get(reportId);
97
            messageMap.remove(reportId);
98
            return m;
99
        }
100
        return null;
101
    }
107 102

  
108
	public String getPassword() {
109
		return password;
110
	}
111 103

  
112
	@Required
113
	public void setPassword(final String password) {
114
		this.password = password;
115
	}
104
    public Message getOnGoingMessages(final String workflowId) {
105
        return getMessage(onGonginMessages, workflowId);
106
    }
107

  
108

  
109
    public String getMessageQueueServer() {
110
        return messageQueueServer;
111
    }
112

  
113
    @Required
114
    public void setMessageQueueServer(String messageQueueServer) {
115
        this.messageQueueServer = messageQueueServer;
116
    }
117

  
118
    public String getUsername() {
119
        return username;
120
    }
121

  
122
    @Required
123
    public void setUsername(String username) {
124
        this.username = username;
125
    }
126

  
127
    public String getPassword() {
128
        return password;
129
    }
130

  
131
    @Required
132
    public void setPassword(String password) {
133
        this.password = password;
134
    }
116 135
}

Also available in: Unified diff