Project

General

Profile

1 57299 sandro.lab
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
3 60424 claudio.at
import eu.dnetlib.msro.rmi.MSROException;
4
import eu.dnetlib.msro.rmi.MSRORuntimeException;
5 60351 michele.ar
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8
9 57299 sandro.lab
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11 60351 michele.ar
12 60357 michele.ar
import eu.dnetlib.dhp.message.Message;
13 57299 sandro.lab
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
15
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
16
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
17
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
18
import eu.dnetlib.msro.workflows.util.ProgressProvider;
19
20 60424 claudio.at
import java.util.Objects;
21 60423 claudio.at
import java.util.Optional;
22
23 57299 sandro.lab
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
24
25 60351 michele.ar
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
26 57299 sandro.lab
27 60351 michele.ar
	@Autowired
28
	private DnetMessageManager dnetMessageManager;
29 57299 sandro.lab
30 60351 michele.ar
	private int currentValue = 0;
31 57299 sandro.lab
32 60351 michele.ar
	private int totalValue = 0;
33 57299 sandro.lab
34 60351 michele.ar
	private boolean accurate = false;
35 57299 sandro.lab
36 60351 michele.ar
	private String wfId;
37 57299 sandro.lab
38 60351 michele.ar
	@Override
39
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
40
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
41
		super.prepareJob(job, token);
42
	}
43 57299 sandro.lab
44 60351 michele.ar
	private void updateProgressProvider() {
45 60423 claudio.at
		final Message mess = dnetMessageManager.findOngoingMessage(wfId);
46 60351 michele.ar
		if (mess != null && mess.getBody() != null) {
47 60361 michele.ar
			if (mess.getBody().containsKey(Message.CURRENT_PARAM)) {
48 60351 michele.ar
				try {
49 60361 michele.ar
					setCurrentValue(Integer.parseInt(mess.getBody().get(Message.CURRENT_PARAM)));
50 60351 michele.ar
				} catch (final Throwable e) {
51
					log.error("Error parsing value", e);
52
				}
53
			}
54 60386 michele.ar
			if (mess.getBody().containsKey(Message.TOTAL_PARAM)) {
55
				try {
56
					setTotalValue(Integer.parseInt(mess.getBody().get(Message.TOTAL_PARAM)));
57
					setAccurate(true);
58
				} catch (final Throwable e) {
59
					log.error("Error parsing value", e);
60
				}
61 60351 michele.ar
			}
62
		}
63
	}
64 57299 sandro.lab
65 60351 michele.ar
	@Override
66
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
67
		return new BlackboardWorkflowJobListener(engine, token) {
68 57299 sandro.lab
69 60351 michele.ar
			@Override
70 60423 claudio.at
			protected void onDone(BlackboardJob job) {
71
				super.onDone(getOozieWorkflowReport(job));
72
			}
73
74
			@Override
75 60351 michele.ar
			protected void onFailed(final BlackboardJob job) {
76 60423 claudio.at
				super.onFailed(getOozieWorkflowReport(job));
77 60351 michele.ar
			}
78 57299 sandro.lab
79 60423 claudio.at
			private BlackboardJob getOozieWorkflowReport(BlackboardJob job) {
80 60424 claudio.at
				final Message report = dnetMessageManager.findReportMessage(wfId);
81
82
				if (Objects.isNull(report)) {
83
					throw new MSRORuntimeException("cannot find report for workflow id: " + wfId);
84
				}
85
				report.getBody().forEach((k, v) -> log.info(String.format("%s - %s", k, v)));
86
				job.getParameters().putAll(report.getBody());
87 60423 claudio.at
				return job;
88
			}
89
90 60351 michele.ar
		};
91
	}
92 57299 sandro.lab
93 60351 michele.ar
	@Override
94
	public ProgressProvider getProgressProvider() {
95
		return this;
96
	}
97 57299 sandro.lab
98 60351 michele.ar
	public String getWfId() {
99
		return wfId;
100
	}
101 57299 sandro.lab
102 60351 michele.ar
	public void setWfId(final String wfId) {
103
		this.wfId = wfId;
104
	}
105 57299 sandro.lab
106 60351 michele.ar
	public boolean isAccurate() {
107
		return accurate;
108
	}
109 57299 sandro.lab
110 60351 michele.ar
	public void setAccurate(final boolean accurate) {
111
		this.accurate = accurate;
112
	}
113
114
	@Override
115
	public int getCurrentValue() {
116
		updateProgressProvider();
117
		return currentValue;
118
	}
119
120
	public void setCurrentValue(final int currentValue) {
121
		this.currentValue = currentValue;
122
	}
123
124
	@Override
125
	public int getTotalValue() {
126
		return totalValue;
127
	}
128
129
	public void setTotalValue(final int totalValue) {
130
		this.totalValue = totalValue;
131
	}
132
133
	@Override
134
	public boolean isInaccurate() {
135
		return !accurate;
136
	}
137
138
}