Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import eu.dnetlib.msro.rmi.MSRORuntimeException;
4
import org.apache.commons.logging.Log;
5
import org.apache.commons.logging.LogFactory;
6
import org.springframework.beans.factory.annotation.Autowired;
7

    
8
import com.googlecode.sarasvati.Engine;
9
import com.googlecode.sarasvati.NodeToken;
10

    
11
import eu.dnetlib.dhp.message.Message;
12
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
13
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
14
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
15
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
16
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
17
import eu.dnetlib.msro.workflows.util.ProgressProvider;
18

    
19
import java.util.Objects;
20

    
21
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
22

    
23
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
24

    
25
	@Autowired
26
	private DnetMessageManager dnetMessageManager;
27

    
28
	private int currentValue = 0;
29

    
30
	private int totalValue = 0;
31

    
32
	private boolean accurate = false;
33

    
34
	private String wfId;
35

    
36
	@Override
37
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
38
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
39
		super.prepareJob(job, token);
40
	}
41

    
42
	private void updateProgressProvider() {
43
		final Message mess = dnetMessageManager.findOngoingMessage(wfId);
44
		if (mess != null && mess.getBody() != null) {
45
			if (mess.getBody().containsKey(Message.CURRENT_PARAM)) {
46
				try {
47
					setCurrentValue(Integer.parseInt(mess.getBody().get(Message.CURRENT_PARAM)));
48
				} catch (final Throwable e) {
49
					log.error("Error parsing value", e);
50
				}
51
			}
52
			if (mess.getBody().containsKey(Message.TOTAL_PARAM)) {
53
				try {
54
					setTotalValue(Integer.parseInt(mess.getBody().get(Message.TOTAL_PARAM)));
55
					setAccurate(true);
56
				} catch (final Throwable e) {
57
					log.error("Error parsing value", e);
58
				}
59
			}
60
		}
61
	}
62

    
63
	@Override
64
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
65
		return new BlackboardWorkflowJobListener(engine, token) {
66

    
67
			@Override
68
			protected void onDone(BlackboardJob job) {
69
				super.onDone(getOozieWorkflowReport(job));
70
			}
71

    
72
			@Override
73
			protected void onFailed(final BlackboardJob job) {
74
				super.onFailed(getOozieWorkflowReport(job));
75
			}
76

    
77
			private BlackboardJob getOozieWorkflowReport(BlackboardJob job) {
78
				final Message report = dnetMessageManager.findReportMessage(wfId);
79

    
80
				if (Objects.isNull(report)) {
81
					log.error("cannot find report for workflow id: " + wfId);
82
				} else {
83
					report.getBody().forEach((k, v) -> log.info(String.format("%s - %s", k, v)));
84
					job.getParameters().putAll(report.getBody());
85
				}
86
				return job;
87
			}
88

    
89
		};
90
	}
91

    
92
	@Override
93
	public ProgressProvider getProgressProvider() {
94
		return this;
95
	}
96

    
97
	public String getWfId() {
98
		return wfId;
99
	}
100

    
101
	public void setWfId(final String wfId) {
102
		this.wfId = wfId;
103
	}
104

    
105
	public boolean isAccurate() {
106
		return accurate;
107
	}
108

    
109
	public void setAccurate(final boolean accurate) {
110
		this.accurate = accurate;
111
	}
112

    
113
	@Override
114
	public int getCurrentValue() {
115
		updateProgressProvider();
116
		return currentValue;
117
	}
118

    
119
	public void setCurrentValue(final int currentValue) {
120
		this.currentValue = currentValue;
121
	}
122

    
123
	@Override
124
	public int getTotalValue() {
125
		return totalValue;
126
	}
127

    
128
	public void setTotalValue(final int totalValue) {
129
		this.totalValue = totalValue;
130
	}
131

    
132
	@Override
133
	public boolean isInaccurate() {
134
		return !accurate;
135
	}
136

    
137
}
(11-11/11)