Project

General

Profile

1
package eu.dnetlib.msro.message;
2

    
3
import eu.dnetlib.message.Message;
4
import eu.dnetlib.message.MessageManager;
5
import eu.dnetlib.message.MessageType;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Required;
9

    
10
import java.util.HashMap;
11
import java.util.Map;
12
import java.util.concurrent.LinkedBlockingQueue;
13

    
14
public class DnetMessageManager {
15
    private static final Log log = LogFactory.getLog(DnetMessageManager.class);
16

    
17
    private MessageManager manager;
18

    
19
    private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
20

    
21

    
22
    private final  Map<String, Message> onGonginMessages = new HashMap<>();
23

    
24
    private final Map<String, Message> reportMessages = new HashMap<>();
25

    
26
    private String messageQueueServer;
27

    
28
    private String username;
29

    
30
    private String password;
31

    
32

    
33
    public DnetMessageManager() {
34

    
35
    }
36

    
37

    
38
    private String createReportId(final String wfId, final String jobName) {
39
        return String.format("%s::%s", wfId, jobName);
40
    }
41

    
42
    public void startListeningMessage() throws Exception {
43
        if (manager == null) {
44
            manager = new MessageManager(messageQueueServer, username, password, messages);
45
            manager.startConsumingMessage("dev_ongoing", true, false);
46
            manager.startConsumingMessage("dev_report", true, false);
47

    
48
            Runnable r = () -> {
49
                while (true) {
50
                    try {
51
                        Message currentMessage = messages.take();
52

    
53
                        if (currentMessage.getType() == MessageType.ONGOING) {
54
                            synchronized (onGonginMessages) {
55
                                onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
56
                            }
57
                        } else {
58
                            synchronized (reportMessages) {
59
                                reportMessages.put(createReportId(currentMessage.getWorkflowId(), currentMessage.getJobName()), currentMessage);
60
                            }
61
                        }
62
                    } catch (InterruptedException e) {
63
                        log.error("An error occured on retrieving messages from the blocking queue",e);
64
                        throw new RuntimeException("An error occured on retrieving messages from the blocking queue",e);
65
                    }
66

    
67
                }
68
            };
69
            new Thread(r).start();
70
        }
71
    }
72

    
73
    public Message getReport(final String workflowId, final String jobName) {
74
        final String reportId = createReportId(workflowId, jobName);
75
        return getMessage(reportMessages, reportId);
76
    }
77

    
78
    private Message getMessage(final Map<String, Message> messageMap, String reportId) {
79
        if (messageMap.containsKey(reportId)) {
80
            Message m = messageMap.get(reportId);
81
            messageMap.remove(reportId);
82
            return m;
83
        }
84
        return null;
85
    }
86

    
87

    
88
    public Message getOnGoingMessages(final String workflowId) {
89
        return getMessage(onGonginMessages, workflowId);
90
    }
91

    
92

    
93
    public String getMessageQueueServer() {
94
        return messageQueueServer;
95
    }
96

    
97
    @Required
98
    public void setMessageQueueServer(String messageQueueServer) {
99
        this.messageQueueServer = messageQueueServer;
100
    }
101

    
102
    public String getUsername() {
103
        return username;
104
    }
105

    
106
    @Required
107
    public void setUsername(String username) {
108
        this.username = username;
109
    }
110

    
111
    public String getPassword() {
112
        return password;
113
    }
114

    
115
    @Required
116
    public void setPassword(String password) {
117
        this.password = password;
118
    }
119
}
    (1-1/1)