Revision 55196

- added a new node for submitting a Hadoop job and getting job info using the RabbitMQ client
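For context, a minimal sketch of the producer side, not part of this revision, assuming a plain RabbitMQ Java client: it publishes the kind of ONGOING progress message that the new DnetMessageManager below consumes from the dev_ongoing queue. The queue name and the progressCount body key are taken from the code in this revision; the broker host, the credentials, and the JSON field layout are assumptions, since the real (de)serialization is handled by eu.dnetlib.message.Message.

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.nio.charset.StandardCharsets;

public class OngoingMessageProducerSketch {

    public static void main(String[] args) throws Exception {
        final ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost"); // hypothetical broker host
        factory.setUsername("guest"); // hypothetical credentials
        factory.setPassword("guest");

        try (Connection connection = factory.newConnection();
             Channel channel = connection.createChannel()) {

            // Queue name taken from startConsumingMessage("dev_ongoing", ...) below;
            // the durable/exclusive/auto-delete flags used here are assumptions.
            channel.queueDeclare("dev_ongoing", true, false, false, null);

            // Assumed wire format: workflow id, job name, message type and a body map
            // carrying the "progressCount" key that the consumer reads.
            final String payload = "{\"workflowId\":\"wf-123\",\"jobName\":\"Collection\","
                    + "\"type\":\"ONGOING\",\"body\":{\"progressCount\":\"42\"}}";

            channel.basicPublish("", "dev_ongoing", null, payload.getBytes(StandardCharsets.UTF_8));
        }
    }
}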

View differences:

modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/message/DnetMessageManager.java
package eu.dnetlib.msro.message;

import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.LinkedBlockingQueue;

public class DnetMessageManager {

    private static final Log log = LogFactory.getLog(DnetMessageManager.class);

    private MessageManager manager;

    private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>();

    private final Map<String, Message> onGoingMessages = new HashMap<>();

    private final Map<String, Message> reportMessages = new HashMap<>();

    private String messageQueueServer;

    private String username;

    private String password;

    public DnetMessageManager() {
    }

    private String createReportId(final String wfId, final String jobName) {
        return String.format("%s::%s", wfId, jobName);
    }

    public void startListeningMessage() throws Exception {
        if (manager == null) {
            manager = new MessageManager(messageQueueServer, username, password, messages);
            manager.startConsumingMessage("dev_ongoing", true, false);
            manager.startConsumingMessage("dev_report", true, false);

            // Consumer thread: drains the blocking queue and indexes messages by workflow/report id.
            Runnable r = () -> {
                while (true) {
                    try {
                        Message currentMessage = messages.take();

                        if (currentMessage.getType() == MessageType.ONGOING) {
                            synchronized (onGoingMessages) {
                                onGoingMessages.put(currentMessage.getWorkflowId(), currentMessage);
                            }
                        } else {
                            synchronized (reportMessages) {
                                reportMessages.put(createReportId(currentMessage.getWorkflowId(), currentMessage.getJobName()), currentMessage);
                            }
                        }
                    } catch (InterruptedException e) {
                        log.error("An error occurred while retrieving messages from the blocking queue", e);
                        throw new RuntimeException("An error occurred while retrieving messages from the blocking queue", e);
                    }
                }
            };
            new Thread(r).start();
        }
    }

    public Message getReport(final String workflowId, final String jobName) {
        final String reportId = createReportId(workflowId, jobName);
        return getMessage(reportMessages, reportId);
    }

    private Message getMessage(final Map<String, Message> messageMap, String reportId) {
        // Synchronized with the consumer thread that populates the map.
        synchronized (messageMap) {
            if (messageMap.containsKey(reportId)) {
                Message m = messageMap.get(reportId);
                messageMap.remove(reportId);
                return m;
            }
            return null;
        }
    }

    public Message getOnGoingMessages(final String workflowId) {
        return getMessage(onGoingMessages, workflowId);
    }

    public String getMessageQueueServer() {
        return messageQueueServer;
    }

    @Required
    public void setMessageQueueServer(String messageQueueServer) {
        this.messageQueueServer = messageQueueServer;
    }

    public String getUsername() {
        return username;
    }

    @Required
    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    @Required
    public void setPassword(String password) {
        this.password = password;
    }
}
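A minimal usage sketch of the manager above, not part of this revision: it wires the bean properties by hand and then polls for ONGOING progress until the report for a given job arrives. Broker host, credentials, workflow id and job name are placeholder values; in this revision the same polling is done by SubmitDnetHadoopJobNode (next file), with the bean configured in applicationContext-msro-openaire-message.xml further below.

import eu.dnetlib.message.Message;
import eu.dnetlib.msro.message.DnetMessageManager;

public class DnetMessageManagerUsageSketch {

    public static void main(String[] args) throws Exception {
        final DnetMessageManager manager = new DnetMessageManager();
        manager.setMessageQueueServer("localhost"); // hypothetical broker host
        manager.setUsername("guest");               // hypothetical credentials
        manager.setPassword("guest");
        manager.startListeningMessage();            // starts consuming dev_ongoing and dev_report

        final String wfId = "wf-123";        // hypothetical workflow id
        final String jobName = "Collection"; // job name used by SubmitDnetHadoopJobNode below

        Message report = null;
        while (report == null) {
            final Message ongoing = manager.getOnGoingMessages(wfId);
            if (ongoing != null && ongoing.getBody() != null) {
                System.out.println("progress: " + ongoing.getBody().get("progressCount"));
            }
            report = manager.getReport(wfId, jobName);
            Thread.sleep(1000);
        }
        report.getBody().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}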
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/SubmitDnetHadoopJobNode.java
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;

import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import com.googlecode.sarasvati.env.Env;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.message.Message;
import eu.dnetlib.msro.message.DnetMessageManager;
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.Map;

public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {

    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);

    @Autowired
    private DnetMessageManager dnetMessageManager;

    // Written by the polling thread and read by the workflow engine, hence volatile.
    private volatile boolean ongoing = true;

    private volatile int currentValue;

    private String wfId;

    @Override
    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
        this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
        job.getParameters().put("wfId", wfId);

        // Poll the ONGOING messages published for this workflow to feed the progress bar.
        Runnable r = () -> {
            while (ongoing) {
                Message mess = dnetMessageManager.getOnGoingMessages(wfId);
                if (mess != null && mess.getBody() != null && mess.getBody().containsKey("progressCount")) {
                    try {
                        this.currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
                    } catch (NumberFormatException e) {
                        // ignore malformed progress values and keep polling
                    }
                }
                try {
                    Thread.sleep(1000); // sleep on every iteration to avoid busy-waiting
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
        new Thread(r).start();

        super.prepareJob(job, token);
    }

    @Override
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
        return new BlackboardWorkflowJobListener(engine, token) {

            @Override
            protected void onFailed(final BlackboardJob job) {
                ongoing = false;
                log.warn("Blackboard workflow node FAILED: " + job.getError());
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
                complete(job, "abort");
            }

            @Override
            protected void populateEnv(Env env, Map<String, String> responseParams) {
                ongoing = false;

                final Message report = dnetMessageManager.getReport(wfId, "Collection");
                if (report != null) {
                    report.getBody().forEach(env::setAttribute);
                }
            }
        };
    }

    @Override
    public int getTotalValue() {
        return 0;
    }

    @Override
    public int getCurrentValue() {
        return currentValue;
    }

    @Override
    public boolean isInaccurate() {
        return false;
    }

    @Override
    public ProgressProvider getProgressProvider() {
        return this;
    }
}
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/repo-hi/Aggregate_Metadata_from_PubsRepository_Hadoop.xml
 				<DESCRIPTION>Create MetaWorkflow</DESCRIPTION>
 				<PARAMETERS>
 					<PARAM name="wfName" managedBy="system" required="true"
-					       type="string">Aggregate Metadata (publications) from PubsRepository [Support]
+					       type="string">Aggregate Metadata (publications) from PubsRepository [Hadoop]
 					</PARAM>
 				</PARAMETERS>
 				<ARCS>
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/message/applicationContext-msro-openaire-message.xml
1
<?xml version="1.0" encoding="UTF-8"?>
2
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3
       xmlns:p="http://www.springframework.org/schema/p"
4
       xmlns="http://www.springframework.org/schema/beans"
5
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
6

  
7

  
8
    <bean id="dnetMessageManager" class="eu.dnetlib.msro.message.DnetMessageManager"
9
          p:messageQueueServer="${dnet.openaire.messageManager.host}"
10
          p:username="${dnet.openaire.messageManager.username}"
11
          p:password="${dnet.openaire.messageManager..password}"
12
          init-method="startListeningMessage"
13
    ></bean>
14

  
15

  
16
</beans>
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/applicationContext-repohi.xml
 		</property>
 	</bean>
 
-
 	<bean id="metaWfDataRepository"
 		class="eu.dnetlib.msro.workflows.metawf.DatasourceMetaWorkflow"
 		p:destroyWorkflowTemplate="classpath:/eu/dnetlib/msro/openaireplus/workflows/repo-hi/dataRepository/repoBye.wf.st"
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml
 		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.CommitMDStoreVersionJobNode"
 		p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" />
 
+	<bean id="wfNodeSubmitDnetHadoopJobNode"
+		  class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.SubmitDnetHadoopJobNode"
+		  scope="prototype" />
+
 	<bean id="wfNodeDeleteMDStoreHadoop"
 		class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.DeleteMDStoreHadoopJobNode"
 		p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" />
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/pom.xml
 	<groupId>eu.dnetlib</groupId>
 	<artifactId>dnet-openaireplus-workflows</artifactId>
 	<packaging>jar</packaging>
-	<version>6.3.30-solr75-SNAPSHOT</version>
+	<version>7.0.0-hadoop-SNAPSHOT</version>
 
 	<scm>
 		<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-openaireplus-workflows/branches/solr75</developerConnection>
