Project

General

Profile

« Previous | Next » 

Revision 60424

updated hadoop aggregation workflow reporting

View differences:

SubmitDnetHadoopJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2 2

  
3
import eu.dnetlib.msro.rmi.MSROException;
4
import eu.dnetlib.msro.rmi.MSRORuntimeException;
3 5
import org.apache.commons.logging.Log;
4 6
import org.apache.commons.logging.LogFactory;
5 7
import org.springframework.beans.factory.annotation.Autowired;
......
15 17
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
16 18
import eu.dnetlib.msro.workflows.util.ProgressProvider;
17 19

  
20
import java.util.Objects;
18 21
import java.util.Optional;
19 22

  
20 23
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
......
74 77
			}
75 78

  
76 79
			private BlackboardJob getOozieWorkflowReport(BlackboardJob job) {
77
				Optional.ofNullable(dnetMessageManager.findReportMessage(wfId))
78
						.map(m -> m.getBody())
79
						.ifPresent(map -> job.getParameters().putAll(map));
80
				final Message report = dnetMessageManager.findReportMessage(wfId);
81

  
82
				if (Objects.isNull(report)) {
83
					throw new MSRORuntimeException("cannot find report for workflow id: " + wfId);
84
				}
85
				report.getBody().forEach((k, v) -> log.info(String.format("%s - %s", k, v)));
86
				job.getParameters().putAll(report.getBody());
80 87
				return job;
81 88
			}
82 89

  

Also available in: Unified diff