Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message;
2

    
3

    
4
import eu.dnetlib.message.Message;
5
import eu.dnetlib.message.MessageManager;
6
import eu.dnetlib.message.MessageType;
7
import org.apache.commons.lang3.StringUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.springframework.beans.factory.annotation.Required;
11
import org.springframework.beans.factory.annotation.Value;
12

    
13
import java.util.ArrayList;
14
import java.util.HashMap;
15
import java.util.List;
16
import java.util.Map;
17
import java.util.concurrent.LinkedBlockingQueue;
18

    
19
public class DnetMessageManager {
20
    private static final Log log = LogFactory.getLog(DnetMessageManager.class);
21

    
22
    private MessageManager manager;
23

    
24
    private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();
25

    
26

    
27
    private final  Map<String, Message> onGonginMessages = new HashMap<>();
28

    
29
    private final Map<String, List<Message>> reportMessages = new HashMap<>();
30

    
31
    private String messageQueueServer;
32

    
33
    private String username;
34

    
35
    private String password;
36

    
37
    @Value("${dnet.openaire.messageManager.ongoingMessageQueueName}")
38
    private String onGoingMessageQueue;
39

    
40
    @Value("${dnet.openaire.messageManager.reportMessageQueueName}")
41
    private String reportMessageQueue;
42

    
43

    
44
    public DnetMessageManager() {
45

    
46
    }
47

    
48

    
49
    private String createReportId(final String wfId, final String jobName) {
50
        return String.format("%s::%s", wfId, jobName);
51
    }
52

    
53
    public void startListeningMessage() throws Exception {
54
        if (manager == null && StringUtils.isNotBlank(messageQueueServer) && StringUtils.isNotBlank(reportMessageQueue)) {
55
            manager = new MessageManager(messageQueueServer, username, password, messages);
56
            manager.startConsumingMessage(onGoingMessageQueue, true, false);
57
            manager.startConsumingMessage(reportMessageQueue, true, false);
58

    
59
            Runnable r = () -> {
60
                while (true) {
61
                    try {
62
                        Message currentMessage = messages.take();
63

    
64
                        if (currentMessage.getType() == MessageType.ONGOING) {
65
                            synchronized (onGonginMessages) {
66
                                onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage);
67
                            }
68
                        } else {
69
                            synchronized (reportMessages) {
70
                                if (!reportMessages.containsKey(currentMessage.getWorkflowId()))
71
                                {
72
                                    reportMessages.put(currentMessage.getWorkflowId(), new ArrayList<>());
73
                                }
74
                                reportMessages.get(currentMessage.getWorkflowId()).add(currentMessage);
75
                            }
76
                        }
77
                    } catch (InterruptedException e) {
78
                        log.error("An error occured on retrieving messages from the blocking queue",e);
79
                        throw new RuntimeException("An error occured on retrieving messages from the blocking queue",e);
80
                    }
81

    
82
                }
83
            };
84
            new Thread(r).start();
85
        }
86
    }
87

    
88
    public List<Message> getReport(final String workflowId) {
89

    
90
        return getMessages(reportMessages, workflowId);
91
    }
92

    
93
    private List<Message> getMessages(final Map<String, List<Message>> messageMap, String reportId) {
94
        if (messageMap.containsKey(reportId)) {
95
            List<Message> m = messageMap.get(reportId);
96
            messageMap.remove(reportId);
97
            return m;
98
        }
99
        return null;
100
    }
101

    
102

    
103
    private Message getMessage(final Map<String, Message> messageMap, String reportId) {
104
        if (messageMap.containsKey(reportId)) {
105
            Message m = messageMap.get(reportId);
106
            messageMap.remove(reportId);
107
            return m;
108
        }
109
        return null;
110
    }
111

    
112

    
113
    public Message getOnGoingMessages(final String workflowId) {
114
        return getMessage(onGonginMessages, workflowId);
115
    }
116

    
117

    
118
    public String getMessageQueueServer() {
119
        return messageQueueServer;
120
    }
121

    
122
    @Required
123
    public void setMessageQueueServer(String messageQueueServer) {
124
        this.messageQueueServer = messageQueueServer;
125
    }
126

    
127
    public String getUsername() {
128
        return username;
129
    }
130

    
131
    @Required
132
    public void setUsername(String username) {
133
        this.username = username;
134
    }
135

    
136
    public String getPassword() {
137
        return password;
138
    }
139

    
140
    @Required
141
    public void setPassword(String password) {
142
        this.password = password;
143
    }
144
}
    (1-1/1)