Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5
import org.springframework.beans.factory.annotation.Autowired;
6

    
7
import com.googlecode.sarasvati.Engine;
8
import com.googlecode.sarasvati.NodeToken;
9

    
10
import eu.dnetlib.dhp.message.Message;
11
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
12
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
13
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
14
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
15
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
16
import eu.dnetlib.msro.workflows.util.ProgressProvider;
17

    
18
import java.util.Objects;
19

    
20
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
21

    
22
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
23

    
24
	@Autowired
25
	private DnetMessageManager dnetMessageManager;
26

    
27
	private int currentValue = 0;
28

    
29
	private int totalValue = 0;
30

    
31
	private boolean accurate = false;
32

    
33
	private String wfId;
34

    
35
	@Override
36
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
37
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
38
		super.prepareJob(job, token);
39
	}
40

    
41
	private void updateProgressProvider() {
42
		final Message mess = dnetMessageManager.findOngoingMessage(wfId);
43
		if (mess != null && mess.getBody() != null) {
44
			if (mess.getBody().containsKey(Message.CURRENT_PARAM)) {
45
				try {
46
					setCurrentValue(Integer.parseInt(mess.getBody().get(Message.CURRENT_PARAM)));
47
				} catch (final Throwable e) {
48
					log.error("Error parsing value", e);
49
				}
50
			}
51
			if (mess.getBody().containsKey(Message.TOTAL_PARAM)) {
52
				try {
53
					setTotalValue(Integer.parseInt(mess.getBody().get(Message.TOTAL_PARAM)));
54
					setAccurate(true);
55
				} catch (final Throwable e) {
56
					log.error("Error parsing value", e);
57
				}
58
			}
59
		}
60
	}
61

    
62
	@Override
63
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
64
		return new BlackboardWorkflowJobListener(engine, token) {
65

    
66
			@Override
67
			protected void onDone(BlackboardJob job) {
68
				super.onDone(getOozieWorkflowReport(job));
69
			}
70

    
71
			@Override
72
			protected void onFailed(final BlackboardJob job) {
73
				super.onFailed(getOozieWorkflowReport(job));
74
			}
75

    
76
			private BlackboardJob getOozieWorkflowReport(BlackboardJob job) {
77
				final Message report = dnetMessageManager.findReportMessage(wfId);
78

    
79
				if (Objects.isNull(report)) {
80
					log.error("cannot find report for workflow id: " + wfId);
81
				} else {
82
					report.getBody().forEach((k, v) -> log.info(String.format("%s - %s", k, v)));
83
					job.getParameters().putAll(report.getBody());
84
				}
85
				return job;
86
			}
87

    
88
		};
89
	}
90

    
91
	@Override
92
	public ProgressProvider getProgressProvider() {
93
		return this;
94
	}
95

    
96
	public String getWfId() {
97
		return wfId;
98
	}
99

    
100
	public void setWfId(final String wfId) {
101
		this.wfId = wfId;
102
	}
103

    
104
	public boolean isAccurate() {
105
		return accurate;
106
	}
107

    
108
	public void setAccurate(final boolean accurate) {
109
		this.accurate = accurate;
110
	}
111

    
112
	@Override
113
	public int getCurrentValue() {
114
		updateProgressProvider();
115
		return currentValue;
116
	}
117

    
118
	public void setCurrentValue(final int currentValue) {
119
		this.currentValue = currentValue;
120
	}
121

    
122
	@Override
123
	public int getTotalValue() {
124
		return totalValue;
125
	}
126

    
127
	public void setTotalValue(final int totalValue) {
128
		this.totalValue = totalValue;
129
	}
130

    
131
	@Override
132
	public boolean isInaccurate() {
133
		return !accurate;
134
	}
135

    
136
}
(11-11/11)