Revision 60424
Added by Claudio Atzori about 2 years ago
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/dhp/SubmitDnetHadoopJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp; |
2 | 2 |
|
3 |
import eu.dnetlib.msro.rmi.MSROException; |
|
4 |
import eu.dnetlib.msro.rmi.MSRORuntimeException; |
|
3 | 5 |
import org.apache.commons.logging.Log; |
4 | 6 |
import org.apache.commons.logging.LogFactory; |
5 | 7 |
import org.springframework.beans.factory.annotation.Autowired; |
... | ... | |
15 | 17 |
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener; |
16 | 18 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
17 | 19 |
|
20 |
import java.util.Objects; |
|
18 | 21 |
import java.util.Optional; |
19 | 22 |
|
20 | 23 |
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode { |
... | ... | |
74 | 77 |
} |
75 | 78 |
|
76 | 79 |
private BlackboardJob getOozieWorkflowReport(BlackboardJob job) { |
77 |
Optional.ofNullable(dnetMessageManager.findReportMessage(wfId)) |
|
78 |
.map(m -> m.getBody()) |
|
79 |
.ifPresent(map -> job.getParameters().putAll(map)); |
|
80 |
final Message report = dnetMessageManager.findReportMessage(wfId); |
|
81 |
|
|
82 |
if (Objects.isNull(report)) { |
|
83 |
throw new MSRORuntimeException("cannot find report for workflow id: " + wfId); |
|
84 |
} |
|
85 |
report.getBody().forEach((k, v) -> log.info(String.format("%s - %s", k, v))); |
|
86 |
job.getParameters().putAll(report.getBody()); |
|
80 | 87 |
return job; |
81 | 88 |
} |
82 | 89 |
|
Also available in: Unified diff
updated hadoop aggregation workflow reporting