Revision 60423
Added by Claudio Atzori about 3 years ago
SubmitDnetHadoopJobNode.java | ||
---|---|---|
14 | 14 |
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode; |
15 | 15 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
16 | 16 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
17 |
import eu.dnetlib.msro.workflows.util.WorkflowsConstants; |
|
18 | 17 |
|
18 |
import java.util.Optional; |
|
19 |
|
|
19 | 20 |
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode { |
20 | 21 |
|
21 | 22 |
private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class); |
... | ... | |
38 | 39 |
} |
39 | 40 |
|
40 | 41 |
private void updateProgressProvider() { |
41 |
final Message mess = dnetMessageManager.findMessage(wfId); |
|
42 |
final Message mess = dnetMessageManager.findOngoingMessage(wfId);
|
|
42 | 43 |
if (mess != null && mess.getBody() != null) { |
43 | 44 |
if (mess.getBody().containsKey(Message.CURRENT_PARAM)) { |
44 | 45 |
try { |
... | ... | |
63 | 64 |
return new BlackboardWorkflowJobListener(engine, token) { |
64 | 65 |
|
65 | 66 |
@Override |
67 |
protected void onDone(BlackboardJob job) { |
|
68 |
super.onDone(getOozieWorkflowReport(job)); |
|
69 |
} |
|
70 |
|
|
71 |
@Override |
|
66 | 72 |
protected void onFailed(final BlackboardJob job) { |
67 |
log.warn("Blackboard workflow node FAILED: " + job.getError()); |
|
68 |
token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true); |
|
69 |
token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError()); |
|
70 |
complete(job, "abort"); |
|
73 |
super.onFailed(getOozieWorkflowReport(job)); |
|
71 | 74 |
} |
72 | 75 |
|
76 |
private BlackboardJob getOozieWorkflowReport(BlackboardJob job) { |
|
77 |
Optional.ofNullable(dnetMessageManager.findReportMessage(wfId)) |
|
78 |
.map(m -> m.getBody()) |
|
79 |
.ifPresent(map -> job.getParameters().putAll(map)); |
|
80 |
return job; |
|
81 |
} |
|
82 |
|
|
73 | 83 |
}; |
74 | 84 |
} |
75 | 85 |
|
Also available in: Unified diff
updated hadoop aggregation workflow reporting