Project

General

Profile

« Previous | Next » 

Revision 55245

Added by Enrico Ottonello over 5 years ago

alignment with rabbitMQ server configuration on dev_ongoing queue, durable property = false

View differences:

modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/message/DnetMessageManager.java
1 1
package eu.dnetlib.msro.message;
2 2

  
3
import eu.dnetlib.message.Message;
4
import eu.dnetlib.message.MessageManager;
5
import eu.dnetlib.message.MessageType;
3
import java.util.HashMap;
4
import java.util.Map;
5
import java.util.concurrent.LinkedBlockingQueue;
6

  
6 7
import org.apache.commons.logging.Log;
7 8
import org.apache.commons.logging.LogFactory;
8 9
import org.springframework.beans.factory.annotation.Required;
9 10

  
10
import java.util.HashMap;
11
import java.util.Map;
12
import java.util.concurrent.LinkedBlockingQueue;
11
import eu.dnetlib.message.Message;
12
import eu.dnetlib.message.MessageManager;
13
import eu.dnetlib.message.MessageType;
13 14

  
14 15
public class DnetMessageManager {
15
    private static final Log log = LogFactory.getLog(DnetMessageManager.class);
16 16

  
17
    private MessageManager manager;
17
	private static final Log log = LogFactory.getLog(DnetMessageManager.class);
18 18

  
19
    private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
19
	private MessageManager manager;
20 20

  
21
	private final LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
21 22

  
22
    private final  Map<String, Message> onGonginMessages = new HashMap<>();
23
	private final Map<String, Message> onGonginMessages = new HashMap<>();
23 24

  
24
    private final Map<String, Message> reportMessages = new HashMap<>();
25
	private final Map<String, Message> reportMessages = new HashMap<>();
25 26

  
26
    private String messageQueueServer;
27
	private String messageQueueServer;
27 28

  
28
    private String username;
29
	private String username;
29 30

  
30
    private String password;
31
	private String password;
31 32

  
33
	public DnetMessageManager() {
32 34

  
33
    public DnetMessageManager() {
35
	}
34 36

  
35
    }
37
	private String createReportId(final String wfId, final String jobName) {
38
		return String.format("%s::%s", wfId, jobName);
39
	}
36 40

  
41
	public void startListeningMessage() throws Exception {
42
		if (manager == null) {
43
			manager = new MessageManager(messageQueueServer, username, password, messages);
44
			manager.startConsumingMessage("dev_ongoing", false, false);
45
			manager.startConsumingMessage("dev_report", true, false);
37 46

  
38
    private String createReportId(final String wfId, final String jobName) {
39
        return String.format("%s::%s", wfId, jobName);
40
    }
47
			final Runnable r = () -> {
48
				while (true) {
49
					try {
50
						final Message currentMessage = messages.take();
41 51

  
42
    public void startListeningMessage() throws Exception {
43
        if (manager == null) {
44
            manager = new MessageManager(messageQueueServer, username, password, messages);
45
            manager.startConsumingMessage("dev_ongoing", true, false);
46
            manager.startConsumingMessage("dev_report", true, false);
52
						if (currentMessage.getType() == MessageType.ONGOING) {
53
							synchronized (onGonginMessages) {
54
								onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
55
							}
56
						} else {
57
							synchronized (reportMessages) {
58
								reportMessages.put(createReportId(currentMessage.getWorkflowId(), currentMessage.getJobName()), currentMessage);
59
							}
60
						}
61
					} catch (final InterruptedException e) {
62
						log.error("An error occured on retrieving messages from the blocking queue", e);
63
						throw new RuntimeException("An error occured on retrieving messages from the blocking queue", e);
64
					}
47 65

  
48
            Runnable r = () -> {
49
                while (true) {
50
                    try {
51
                        Message currentMessage = messages.take();
66
				}
67
			};
68
			new Thread(r).start();
69
		}
70
	}
52 71

  
53
                        if (currentMessage.getType() == MessageType.ONGOING) {
54
                            synchronized (onGonginMessages) {
55
                                onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
56
                            }
57
                        } else {
58
                            synchronized (reportMessages) {
59
                                reportMessages.put(createReportId(currentMessage.getWorkflowId(), currentMessage.getJobName()), currentMessage);
60
                            }
61
                        }
62
                    } catch (InterruptedException e) {
63
                        log.error("An error occured on retrieving messages from the blocking queue",e);
64
                        throw new RuntimeException("An error occured on retrieving messages from the blocking queue",e);
65
                    }
72
	public Message getReport(final String workflowId, final String jobName) {
73
		final String reportId = createReportId(workflowId, jobName);
74
		return getMessage(reportMessages, reportId);
75
	}
66 76

  
67
                }
68
            };
69
            new Thread(r).start();
70
        }
71
    }
77
	private Message getMessage(final Map<String, Message> messageMap, final String reportId) {
78
		if (messageMap.containsKey(reportId)) {
79
			final Message m = messageMap.get(reportId);
80
			messageMap.remove(reportId);
81
			return m;
82
		}
83
		return null;
84
	}
72 85

  
73
    public Message getReport(final String workflowId, final String jobName) {
74
        final String reportId = createReportId(workflowId, jobName);
75
        return getMessage(reportMessages, reportId);
76
    }
86
	public Message getOnGoingMessages(final String workflowId) {
87
		return getMessage(onGonginMessages, workflowId);
88
	}
77 89

  
78
    private Message getMessage(final Map<String, Message> messageMap, String reportId) {
79
        if (messageMap.containsKey(reportId)) {
80
            Message m = messageMap.get(reportId);
81
            messageMap.remove(reportId);
82
            return m;
83
        }
84
        return null;
85
    }
90
	public String getMessageQueueServer() {
91
		return messageQueueServer;
92
	}
86 93

  
94
	@Required
95
	public void setMessageQueueServer(final String messageQueueServer) {
96
		this.messageQueueServer = messageQueueServer;
97
	}
87 98

  
88
    public Message getOnGoingMessages(final String workflowId) {
89
        return getMessage(onGonginMessages, workflowId);
90
    }
99
	public String getUsername() {
100
		return username;
101
	}
91 102

  
103
	@Required
104
	public void setUsername(final String username) {
105
		this.username = username;
106
	}
92 107

  
93
    public String getMessageQueueServer() {
94
        return messageQueueServer;
95
    }
108
	public String getPassword() {
109
		return password;
110
	}
96 111

  
97
    @Required
98
    public void setMessageQueueServer(String messageQueueServer) {
99
        this.messageQueueServer = messageQueueServer;
100
    }
101

  
102
    public String getUsername() {
103
        return username;
104
    }
105

  
106
    @Required
107
    public void setUsername(String username) {
108
        this.username = username;
109
    }
110

  
111
    public String getPassword() {
112
        return password;
113
    }
114

  
115
    @Required
116
    public void setPassword(String password) {
117
        this.password = password;
118
    }
112
	@Required
113
	public void setPassword(final String password) {
114
		this.password = password;
115
	}
119 116
}

Also available in: Unified diff