Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import eu.dnetlib.msro.rmi.MSROException;
4
import eu.dnetlib.msro.rmi.MSRORuntimeException;
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8

    
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11

    
12
import eu.dnetlib.dhp.message.Message;
13
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
14
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
15
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
16
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
17
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
18
import eu.dnetlib.msro.workflows.util.ProgressProvider;
19

    
20
import java.util.Objects;
21
import java.util.Optional;
22

    
23
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
24

    
25
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
26

    
27
	@Autowired
28
	private DnetMessageManager dnetMessageManager;
29

    
30
	private int currentValue = 0;
31

    
32
	private int totalValue = 0;
33

    
34
	private boolean accurate = false;
35

    
36
	private String wfId;
37

    
38
	@Override
39
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
40
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
41
		super.prepareJob(job, token);
42
	}
43

    
44
	private void updateProgressProvider() {
45
		final Message mess = dnetMessageManager.findOngoingMessage(wfId);
46
		if (mess != null && mess.getBody() != null) {
47
			if (mess.getBody().containsKey(Message.CURRENT_PARAM)) {
48
				try {
49
					setCurrentValue(Integer.parseInt(mess.getBody().get(Message.CURRENT_PARAM)));
50
				} catch (final Throwable e) {
51
					log.error("Error parsing value", e);
52
				}
53
			}
54
			if (mess.getBody().containsKey(Message.TOTAL_PARAM)) {
55
				try {
56
					setTotalValue(Integer.parseInt(mess.getBody().get(Message.TOTAL_PARAM)));
57
					setAccurate(true);
58
				} catch (final Throwable e) {
59
					log.error("Error parsing value", e);
60
				}
61
			}
62
		}
63
	}
64

    
65
	@Override
66
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
67
		return new BlackboardWorkflowJobListener(engine, token) {
68

    
69
			@Override
70
			protected void onDone(BlackboardJob job) {
71
				super.onDone(getOozieWorkflowReport(job));
72
			}
73

    
74
			@Override
75
			protected void onFailed(final BlackboardJob job) {
76
				super.onFailed(getOozieWorkflowReport(job));
77
			}
78

    
79
			private BlackboardJob getOozieWorkflowReport(BlackboardJob job) {
80
				final Message report = dnetMessageManager.findReportMessage(wfId);
81

    
82
				if (Objects.isNull(report)) {
83
					final String msg = "cannot find report for workflow id: " + wfId;
84
					log.error(msg);
85
					throw new MSRORuntimeException(msg);
86
				}
87
				report.getBody().forEach((k, v) -> log.info(String.format("%s - %s", k, v)));
88
				job.getParameters().putAll(report.getBody());
89
				return job;
90
			}
91

    
92
		};
93
	}
94

    
95
	@Override
96
	public ProgressProvider getProgressProvider() {
97
		return this;
98
	}
99

    
100
	public String getWfId() {
101
		return wfId;
102
	}
103

    
104
	public void setWfId(final String wfId) {
105
		this.wfId = wfId;
106
	}
107

    
108
	public boolean isAccurate() {
109
		return accurate;
110
	}
111

    
112
	public void setAccurate(final boolean accurate) {
113
		this.accurate = accurate;
114
	}
115

    
116
	@Override
117
	public int getCurrentValue() {
118
		updateProgressProvider();
119
		return currentValue;
120
	}
121

    
122
	public void setCurrentValue(final int currentValue) {
123
		this.currentValue = currentValue;
124
	}
125

    
126
	@Override
127
	public int getTotalValue() {
128
		return totalValue;
129
	}
130

    
131
	public void setTotalValue(final int totalValue) {
132
		this.totalValue = totalValue;
133
	}
134

    
135
	@Override
136
	public boolean isInaccurate() {
137
		return !accurate;
138
	}
139

    
140
}
(11-11/11)