Revision 55196
Added by Sandro La Bruzzo about 5 years ago
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/message/DnetMessageManager.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.message; |
|
2 |
|
|
3 |
import eu.dnetlib.message.Message; |
|
4 |
import eu.dnetlib.message.MessageManager; |
|
5 |
import eu.dnetlib.message.MessageType; |
|
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
8 |
import org.springframework.beans.factory.annotation.Required; |
|
9 |
|
|
10 |
import java.util.HashMap; |
|
11 |
import java.util.Map; |
|
12 |
import java.util.concurrent.LinkedBlockingQueue; |
|
13 |
|
|
14 |
public class DnetMessageManager { |
|
15 |
private static final Log log = LogFactory.getLog(DnetMessageManager.class); |
|
16 |
|
|
17 |
private MessageManager manager; |
|
18 |
|
|
19 |
private LinkedBlockingQueue<Message> messages = new LinkedBlockingQueue<>(); |
|
20 |
|
|
21 |
|
|
22 |
private final Map<String, Message> onGonginMessages = new HashMap<>(); |
|
23 |
|
|
24 |
private final Map<String, Message> reportMessages = new HashMap<>(); |
|
25 |
|
|
26 |
private String messageQueueServer; |
|
27 |
|
|
28 |
private String username; |
|
29 |
|
|
30 |
private String password; |
|
31 |
|
|
32 |
|
|
33 |
public DnetMessageManager() { |
|
34 |
|
|
35 |
} |
|
36 |
|
|
37 |
|
|
38 |
private String createReportId(final String wfId, final String jobName) { |
|
39 |
return String.format("%s::%s", wfId, jobName); |
|
40 |
} |
|
41 |
|
|
42 |
public void startListeningMessage() throws Exception { |
|
43 |
if (manager == null) { |
|
44 |
manager = new MessageManager(messageQueueServer, username, password, messages); |
|
45 |
manager.startConsumingMessage("dev_ongoing", true, false); |
|
46 |
manager.startConsumingMessage("dev_report", true, false); |
|
47 |
|
|
48 |
Runnable r = () -> { |
|
49 |
while (true) { |
|
50 |
try { |
|
51 |
Message currentMessage = messages.take(); |
|
52 |
|
|
53 |
if (currentMessage.getType() == MessageType.ONGOING) { |
|
54 |
synchronized (onGonginMessages) { |
|
55 |
onGonginMessages.put(currentMessage.getWorkflowId(), currentMessage); |
|
56 |
} |
|
57 |
} else { |
|
58 |
synchronized (reportMessages) { |
|
59 |
reportMessages.put(createReportId(currentMessage.getWorkflowId(), currentMessage.getJobName()), currentMessage); |
|
60 |
} |
|
61 |
} |
|
62 |
} catch (InterruptedException e) { |
|
63 |
log.error("An error occured on retrieving messages from the blocking queue",e); |
|
64 |
throw new RuntimeException("An error occured on retrieving messages from the blocking queue",e); |
|
65 |
} |
|
66 |
|
|
67 |
} |
|
68 |
}; |
|
69 |
new Thread(r).start(); |
|
70 |
} |
|
71 |
} |
|
72 |
|
|
73 |
public Message getReport(final String workflowId, final String jobName) { |
|
74 |
final String reportId = createReportId(workflowId, jobName); |
|
75 |
return getMessage(reportMessages, reportId); |
|
76 |
} |
|
77 |
|
|
78 |
private Message getMessage(final Map<String, Message> messageMap, String reportId) { |
|
79 |
if (messageMap.containsKey(reportId)) { |
|
80 |
Message m = messageMap.get(reportId); |
|
81 |
messageMap.remove(reportId); |
|
82 |
return m; |
|
83 |
} |
|
84 |
return null; |
|
85 |
} |
|
86 |
|
|
87 |
|
|
88 |
public Message getOnGoingMessages(final String workflowId) { |
|
89 |
return getMessage(onGonginMessages, workflowId); |
|
90 |
} |
|
91 |
|
|
92 |
|
|
93 |
public String getMessageQueueServer() { |
|
94 |
return messageQueueServer; |
|
95 |
} |
|
96 |
|
|
97 |
@Required |
|
98 |
public void setMessageQueueServer(String messageQueueServer) { |
|
99 |
this.messageQueueServer = messageQueueServer; |
|
100 |
} |
|
101 |
|
|
102 |
public String getUsername() { |
|
103 |
return username; |
|
104 |
} |
|
105 |
|
|
106 |
@Required |
|
107 |
public void setUsername(String username) { |
|
108 |
this.username = username; |
|
109 |
} |
|
110 |
|
|
111 |
public String getPassword() { |
|
112 |
return password; |
|
113 |
} |
|
114 |
|
|
115 |
@Required |
|
116 |
public void setPassword(String password) { |
|
117 |
this.password = password; |
|
118 |
} |
|
119 |
} |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/SubmitDnetHadoopJobNode.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop; |
|
2 |
|
|
3 |
import com.googlecode.sarasvati.Engine; |
|
4 |
import com.googlecode.sarasvati.NodeToken; |
|
5 |
import com.googlecode.sarasvati.env.Env; |
|
6 |
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob; |
|
7 |
import eu.dnetlib.message.Message; |
|
8 |
import eu.dnetlib.msro.message.DnetMessageManager; |
|
9 |
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode; |
|
10 |
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode; |
|
11 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
|
12 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
|
13 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
|
14 |
import org.apache.commons.logging.Log; |
|
15 |
import org.apache.commons.logging.LogFactory; |
|
16 |
import org.springframework.beans.factory.annotation.Autowired; |
|
17 |
|
|
18 |
import java.util.Map; |
|
19 |
|
|
20 |
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode { |
|
21 |
private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class); |
|
22 |
|
|
23 |
@Autowired |
|
24 |
DnetMessageManager dnetMessageManager; |
|
25 |
|
|
26 |
private boolean ongoing = true; |
|
27 |
|
|
28 |
private int currentValue; |
|
29 |
|
|
30 |
private String wfId; |
|
31 |
|
|
32 |
|
|
33 |
|
|
34 |
@Override |
|
35 |
protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception { |
|
36 |
this.wfId =token.getProcess().getEnv().getAttribute("system:processId"); |
|
37 |
job.getParameters().put("wfId",wfId); |
|
38 |
Runnable r = () -> { |
|
39 |
while (ongoing) { |
|
40 |
Message mess = dnetMessageManager.getOnGoingMessages(wfId); |
|
41 |
if (mess!= null && mess.getBody()!= null && mess.getBody().containsKey("progressCount")) { |
|
42 |
try { |
|
43 |
this.currentValue = Integer.parseInt(mess.getBody().get("progressCount")); |
|
44 |
Thread.sleep(1000); |
|
45 |
} catch (Throwable e) { |
|
46 |
|
|
47 |
} |
|
48 |
} |
|
49 |
} |
|
50 |
}; |
|
51 |
|
|
52 |
new Thread(r).start(); |
|
53 |
super.prepareJob(job, token); |
|
54 |
} |
|
55 |
|
|
56 |
|
|
57 |
@Override |
|
58 |
protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) { |
|
59 |
return new BlackboardWorkflowJobListener(engine, token) { |
|
60 |
|
|
61 |
|
|
62 |
@Override |
|
63 |
protected void onFailed(final BlackboardJob job) { |
|
64 |
ongoing = false; |
|
65 |
log.warn("Blackboard workflow node FAILED: " + job.getError()); |
|
66 |
token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true); |
|
67 |
token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError()); |
|
68 |
complete(job, "abort"); |
|
69 |
} |
|
70 |
|
|
71 |
@Override |
|
72 |
protected void populateEnv(Env env, Map<String, String> responseParams) { |
|
73 |
ongoing=false; |
|
74 |
|
|
75 |
Message report = dnetMessageManager.getReport(wfId, "Collection"); |
|
76 |
if (report!= null) |
|
77 |
report.getBody().forEach(env::setAttribute); |
|
78 |
} |
|
79 |
|
|
80 |
|
|
81 |
}; |
|
82 |
} |
|
83 |
|
|
84 |
@Override |
|
85 |
public int getTotalValue() { |
|
86 |
return 0; |
|
87 |
} |
|
88 |
|
|
89 |
@Override |
|
90 |
public int getCurrentValue() { |
|
91 |
return currentValue; |
|
92 |
} |
|
93 |
|
|
94 |
@Override |
|
95 |
public boolean isInaccurate() { |
|
96 |
return false; |
|
97 |
} |
|
98 |
|
|
99 |
@Override |
|
100 |
public ProgressProvider getProgressProvider() { |
|
101 |
return this; |
|
102 |
} |
|
103 |
} |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/repo-hi/Aggregate_Metadata_from_PubsRepository_Hadoop.xml | ||
---|---|---|
43 | 43 |
<DESCRIPTION>Create MetaWorkflow</DESCRIPTION> |
44 | 44 |
<PARAMETERS> |
45 | 45 |
<PARAM name="wfName" managedBy="system" required="true" |
46 |
type="string">Aggregate Metadata (publications) from PubsRepository [Support]
|
|
46 |
type="string">Aggregate Metadata (publications) from PubsRepository [Hadoop]
|
|
47 | 47 |
</PARAM> |
48 | 48 |
</PARAMETERS> |
49 | 49 |
<ARCS> |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/message/applicationContext-msro-openaire-message.xml | ||
---|---|---|
1 |
<?xml version="1.0" encoding="UTF-8"?> |
|
2 |
<beans xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" |
|
3 |
xmlns:p="http://www.springframework.org/schema/p" |
|
4 |
xmlns="http://www.springframework.org/schema/beans" |
|
5 |
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd"> |
|
6 |
|
|
7 |
|
|
8 |
<bean id="dnetMessageManager" class="eu.dnetlib.msro.message.DnetMessageManager" |
|
9 |
p:messageQueueServer="${dnet.openaire.messageManager.host}" |
|
10 |
p:username="${dnet.openaire.messageManager.username}" |
|
11 |
p:password="${dnet.openaire.messageManager..password}" |
|
12 |
init-method="startListeningMessage" |
|
13 |
></bean> |
|
14 |
|
|
15 |
|
|
16 |
</beans> |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/applicationContext-repohi.xml | ||
---|---|---|
56 | 56 |
</property> |
57 | 57 |
</bean> |
58 | 58 |
|
59 |
|
|
60 | 59 |
<bean id="metaWfDataRepository" |
61 | 60 |
class="eu.dnetlib.msro.workflows.metawf.DatasourceMetaWorkflow" |
62 | 61 |
p:destroyWorkflowTemplate="classpath:/eu/dnetlib/msro/openaireplus/workflows/repo-hi/dataRepository/repoBye.wf.st" |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml | ||
---|---|---|
381 | 381 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.CommitMDStoreVersionJobNode" |
382 | 382 |
p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" /> |
383 | 383 |
|
384 |
<bean id="wfNodeSubmitDnetHadoopJobNode" |
|
385 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.SubmitDnetHadoopJobNode" |
|
386 |
scope="prototype" /> |
|
387 |
|
|
384 | 388 |
<bean id="wfNodeDeleteMDStoreHadoop" |
385 | 389 |
class="eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop.DeleteMDStoreHadoopJobNode" |
386 | 390 |
p:mdStoreManagerUrl="${dhp.mdstore.manager.url}" scope="prototype" /> |
modules/dnet-openaireplus-workflows/branches/dnet-hadoop/pom.xml | ||
---|---|---|
12 | 12 |
<groupId>eu.dnetlib</groupId> |
13 | 13 |
<artifactId>dnet-openaireplus-workflows</artifactId> |
14 | 14 |
<packaging>jar</packaging> |
15 |
<version>6.3.30-solr75-SNAPSHOT</version>
|
|
15 |
<version>7.0.0-hadoop-SNAPSHOT</version>
|
|
16 | 16 |
|
17 | 17 |
<scm> |
18 | 18 |
<developerConnection>scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet45/modules/dnet-openaireplus-workflows/branches/solr75</developerConnection> |
Also available in: Unified diff
- added new Node for submitting hadoop Job and get info using rabbitmq client