Project

General

Profile

« Previous | Next » 

Revision 60357

Removed MessageType from Message

View differences:

modules/dnet-openaireplus-workflows/trunk/src/test/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/DnetMessageManagerTest.java
6 6
import org.junit.Before;
7 7
import org.junit.Test;
8 8

  
9
import eu.dnetlib.message.Message;
10
import eu.dnetlib.message.MessageType;
9
import eu.dnetlib.dhp.message.Message;
11 10
import eu.dnetlib.msro.rmi.MSROException;
12 11

  
13 12
public class DnetMessageManagerTest {
14 13

  
15 14
	private DnetMessageManager manager;
16 15

  
16
	private static final int MAX_N_MESSAGES = 10;
17

  
17 18
	@Before
18 19
	public void setUp() throws Exception {
19 20
		manager = new DnetMessageManager();
20
		manager.setMaxNumberOfOnGoingMessages(10);
21
		manager.setMaxNumberOfReports(100);
21
		manager.setMaxNumberOfMessages(MAX_N_MESSAGES);
22 22
	}
23 23

  
24 24
	@Test
25
	public void testRegisterMessage_ongoing_multi_wfs() throws MSROException {
26
		for (long i = 0; i < manager.getMaxNumberOfOnGoingMessages(); i++) {
27
			manager.registerMessage(prepareMessage("wf-" + i, MessageType.ONGOING));
28
			assertTrue(manager.getOnGonginMessages().size() <= manager.getMaxNumberOfOnGoingMessages());
25
	public void testRegisterMessage_multi_wfs() throws MSROException {
26
		for (long i = 0; i < MAX_N_MESSAGES; i++) {
27
			manager.registerMessage(prepareMessage("wf-" + i));
28
			assertTrue(manager.getMessages().size() <= MAX_N_MESSAGES);
29 29
		}
30 30

  
31
		for (long i = manager.getMaxNumberOfOnGoingMessages(); i < 200; i++) {
32
			manager.registerMessage(prepareMessage("wf-" + i, MessageType.ONGOING));
33
			assertEquals(manager.getMaxNumberOfOnGoingMessages(), manager.getOnGonginMessages().size());
31
		for (long i = MAX_N_MESSAGES; i < 200; i++) {
32
			manager.registerMessage(prepareMessage("wf-" + i));
33
			assertEquals(MAX_N_MESSAGES, manager.getMessages().size());
34 34
		}
35 35
	}
36 36

  
37 37
	@Test
38
	public void testRegisterMessage_ongoing_single_wf() throws MSROException {
38
	public void testRegisterMessage_single_wf() throws MSROException {
39 39
		for (long i = 0; i < 100; i++) {
40
			manager.registerMessage(prepareMessage("wf-xxxx", MessageType.ONGOING));
41
			assertEquals(1, manager.getOnGonginMessages().size());
40
			manager.registerMessage(prepareMessage("wf-xxxx"));
41
			assertEquals(1, manager.getMessages().size());
42 42
		}
43 43
	}
44 44

  
45
	@Test
46
	public void testRegisterMessage_reports_multi_wf() throws MSROException {
47
		for (long i = 0; i < manager.getMaxNumberOfReports(); i++) {
48
			manager.registerMessage(prepareMessage("wf-" + i, MessageType.REPORT));
49
			assertTrue(manager.getReports().size() <= manager.getMaxNumberOfReports());
50
		}
51

  
52
		for (long i = manager.getMaxNumberOfReports(); i < 200; i++) {
53
			manager.registerMessage(prepareMessage("wf-" + i, MessageType.REPORT));
54
			assertEquals(manager.getMaxNumberOfReports(), manager.getReports().size());
55
		}
56
	}
57

  
58
	@Test
59
	public void testRegisterMessage_reports_single_wf() throws MSROException {
60
		for (long i = 1; i < 100; i++) {
61
			manager.registerMessage(prepareMessage("wf-xxxx", MessageType.REPORT));
62
			assertEquals(1, manager.getReports().size());
63
			assertEquals(i, manager.getReports("wf-xxxx", false).size());
64
		}
65
	}
66

  
67
	@Test
68
	public void testRegisterMessage_reports_single_wf_delete() throws MSROException {
69
		for (long i = 1; i < 100; i++) {
70
			manager.registerMessage(prepareMessage("wf-xxxx", MessageType.REPORT));
71
			assertEquals(1, manager.getReports().size());
72
			assertEquals(1, manager.getReports("wf-xxxx", true).size());
73
			assertEquals(0, manager.getReports().size());
74
		}
75
	}
76

  
77
	private Message prepareMessage(final String wfId, final MessageType type) {
45
	private Message prepareMessage(final String wfId) {
78 46
		final Message msg = new Message();
79 47
		msg.setWorkflowId(wfId);
80
		msg.setType(type);
81 48
		return msg;
82 49
	}
83 50

  
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/DnetMessageManager.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message;
2 2

  
3
import java.util.ArrayList;
4 3
import java.util.Collections;
5 4
import java.util.LinkedHashMap;
6
import java.util.List;
7 5
import java.util.Map;
8 6

  
9 7
import org.apache.commons.logging.Log;
10 8
import org.apache.commons.logging.LogFactory;
11 9
import org.springframework.beans.factory.annotation.Required;
12 10

  
13
import eu.dnetlib.message.Message;
14
import eu.dnetlib.message.MessageType;
11
import eu.dnetlib.dhp.message.Message;
15 12
import eu.dnetlib.msro.rmi.MSROException;
16 13

  
17 14
public class DnetMessageManager {
18 15

  
19 16
	private static final Log log = LogFactory.getLog(DnetMessageManager.class);
20 17

  
21
	private final Map<String, Message> onGonginMessages = Collections.synchronizedMap(new LinkedHashMap<>());
18
	private final Map<String, Message> messages = Collections.synchronizedMap(new LinkedHashMap<>());
22 19

  
23
	private final Map<String, List<Message>> reports = Collections.synchronizedMap(new LinkedHashMap<>());
20
	private long maxNumberOfMessages;
24 21

  
25
	private long maxNumberOfOnGoingMessages;
26

  
27
	private long maxNumberOfReports;
28

  
29 22
	public synchronized void registerMessage(final Message message) throws MSROException {
30
		if (message.getType() == MessageType.ONGOING) {
31
			log.debug("Received a ONGOING message");
32
			onGonginMessages.put(message.getWorkflowId(), message);
33
			if (onGonginMessages.size() > maxNumberOfOnGoingMessages) {
34
				onGonginMessages.remove(onGonginMessages.keySet().iterator().next());
35
			}
36
		} else if (message.getType() == MessageType.REPORT) {
37
			log.debug("Received a REPORT message");
38
			if (!reports.containsKey(message.getWorkflowId())) {
39
				reports.put(message.getWorkflowId(), new ArrayList<>());
40
			}
41
			reports.get(message.getWorkflowId()).add(message);
42
			if (reports.size() > maxNumberOfReports) {
43
				reports.remove(reports.keySet().iterator().next());
44
			}
45
		} else {
46
			throw new MSROException("Invalid message type: " + message.getType());
23
		log.debug("Received a ONGOING message");
24
		messages.put(message.getWorkflowId(), message);
25
		if (messages.size() > maxNumberOfMessages) {
26
			messages.remove(messages.keySet().iterator().next());
47 27
		}
48 28
	}
49 29

  
50
	public List<Message> getReports(final String workflowId, final boolean delete) {
51
		if (!reports.containsKey(workflowId)) {
52
			return new ArrayList<>();
53
		} else if (delete) {
54
			return reports.remove(workflowId);
55
		} else {
56
			return reports.get(workflowId);
57
		}
30
	public Message findMessage(final String workflowId) {
31
		return messages.get(workflowId);
58 32
	}
59 33

  
60
	public Message getOnGoingMessage(final String workflowId) {
61
		return onGonginMessages.get(workflowId);
34
	protected Map<String, Message> getMessages() {
35
		return messages;
62 36
	}
63 37

  
64
	public long getMaxNumberOfOnGoingMessages() {
65
		return maxNumberOfOnGoingMessages;
38
	public long getMaxNumberOfMessages() {
39
		return maxNumberOfMessages;
66 40
	}
67 41

  
68 42
	@Required
69
	public void setMaxNumberOfOnGoingMessages(final long maxNumberOfOnGoingMessages) {
70
		this.maxNumberOfOnGoingMessages = maxNumberOfOnGoingMessages;
43
	public void setMaxNumberOfMessages(final long maxNumberOfMessages) {
44
		this.maxNumberOfMessages = maxNumberOfMessages;
71 45
	}
72

  
73
	public long getMaxNumberOfReports() {
74
		return maxNumberOfReports;
75
	}
76

  
77
	@Required
78
	public void setMaxNumberOfReports(final long maxNumberOfReports) {
79
		this.maxNumberOfReports = maxNumberOfReports;
80
	}
81

  
82
	protected Map<String, Message> getOnGonginMessages() {
83
		return onGonginMessages;
84
	}
85

  
86
	protected Map<String, List<Message>> getReports() {
87
		return reports;
88
	}
89 46
}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/DnetMessageManagerController.java
17 17
import org.springframework.web.bind.annotation.ResponseBody;
18 18
import org.springframework.web.bind.annotation.ResponseStatus;
19 19

  
20
import eu.dnetlib.message.Message;
20
import eu.dnetlib.dhp.message.Message;
21 21
import eu.dnetlib.msro.rmi.MSROException;
22 22

  
23 23
@Controller
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/SubmitDnetHadoopJobNode.java
11 11
import com.googlecode.sarasvati.NodeToken;
12 12
import com.googlecode.sarasvati.env.Env;
13 13

  
14
import eu.dnetlib.dhp.message.Message;
14 15
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
15
import eu.dnetlib.message.Message;
16 16
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
17 17
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
18 18
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
......
42 42
	}
43 43

  
44 44
	private void updateProgressProvider() {
45
		final Message mess = dnetMessageManager.getOnGoingMessage(wfId);
45
		final Message mess = dnetMessageManager.findMessage(wfId);
46 46
		if (mess != null && mess.getBody() != null) {
47 47
			if (mess.getBody().containsKey("current")) {
48 48
				try {
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/applicationContext-msro-openaire-message.properties
1
dnet.openaire.messageManager.maxNumberOfOnGoingMessages = 100
2
dnet.openaire.messageManager.maxNumberOfReports         = 1000
1
dnet.openaire.messageManager.maxNumberOfMessages = 100
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/message/applicationContext-msro-openaire-message.xml
4 4
       xmlns="http://www.springframework.org/schema/beans"
5 5
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
6 6

  
7
    <bean id="dnetMessageManager" class="eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager" 
8
    	p:maxNumberOfOnGoingMessages="${dnet.openaire.messageManager.maxNumberOfOnGoingMessages}"
9
    	p:maxNumberOfReports="${dnet.openaire.messageManager.maxNumberOfReports}" />
7
    <bean id="dnetMessageManager" 
8
    	class="eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager" 
9
    	p:maxNumberOfMessages="${dnet.openaire.messageManager.maxNumberOfMessages}" />
10 10

  
11 11
</beans>
modules/dnet-openaireplus-workflows/trunk/pom.xml
186 186
		<dependency>
187 187
			<groupId>eu.dnetlib.dhp</groupId>
188 188
			<artifactId>dhp-common</artifactId>
189
			<version>[1.0.0, 2.0.0)</version>
189
			<version>1.2.4-branch_hadoop_aggregator-SNAPSHOT</version>
190 190
			<exclusions>
191 191
				<exclusion>
192 192
					<groupId>net.sf.saxon</groupId>

Also available in: Unified diff