Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5
import org.springframework.beans.factory.annotation.Autowired;
6

    
7
import com.googlecode.sarasvati.Engine;
8
import com.googlecode.sarasvati.NodeToken;
9

    
10
import eu.dnetlib.dhp.message.Message;
11
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
12
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
13
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
14
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
15
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
16
import eu.dnetlib.msro.workflows.util.ProgressProvider;
17
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
18

    
19
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
20

    
21
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
22

    
23
	@Autowired
24
	private DnetMessageManager dnetMessageManager;
25

    
26
	private int currentValue = 0;
27

    
28
	private int totalValue = 0;
29

    
30
	private boolean accurate = false;
31

    
32
	private String wfId;
33

    
34
	@Override
35
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
36
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
37
		super.prepareJob(job, token);
38
	}
39

    
40
	private void updateProgressProvider() {
41
		final Message mess = dnetMessageManager.findMessage(wfId);
42
		if (mess != null && mess.getBody() != null) {
43
			if (mess.getBody().containsKey("current")) {
44
				try {
45
					setCurrentValue(Integer.parseInt(mess.getBody().get("current")));
46
				} catch (final Throwable e) {
47
					log.error("Error parsing value", e);
48
				}
49
			}
50
		}
51
		if (mess.getBody().containsKey("total")) {
52
			try {
53
				setTotalValue(Integer.parseInt(mess.getBody().get("total")));
54
				setAccurate(true);
55
			} catch (final Throwable e) {
56
				log.error("Error parsing value", e);
57
			}
58
		}
59

    
60
	}
61

    
62
	@Override
63
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
64
		return new BlackboardWorkflowJobListener(engine, token) {
65

    
66
			@Override
67
			protected void onFailed(final BlackboardJob job) {
68
				log.warn("Blackboard workflow node FAILED: " + job.getError());
69
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
70
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
71
				complete(job, "abort");
72
			}
73

    
74
		};
75
	}
76

    
77
	@Override
78
	public ProgressProvider getProgressProvider() {
79
		return this;
80
	}
81

    
82
	public String getWfId() {
83
		return wfId;
84
	}
85

    
86
	public void setWfId(final String wfId) {
87
		this.wfId = wfId;
88
	}
89

    
90
	public boolean isAccurate() {
91
		return accurate;
92
	}
93

    
94
	public void setAccurate(final boolean accurate) {
95
		this.accurate = accurate;
96
	}
97

    
98
	@Override
99
	public int getCurrentValue() {
100
		updateProgressProvider();
101
		return currentValue;
102
	}
103

    
104
	public void setCurrentValue(final int currentValue) {
105
		this.currentValue = currentValue;
106
	}
107

    
108
	@Override
109
	public int getTotalValue() {
110
		return totalValue;
111
	}
112

    
113
	public void setTotalValue(final int totalValue) {
114
		this.totalValue = totalValue;
115
	}
116

    
117
	@Override
118
	public boolean isInaccurate() {
119
		return !accurate;
120
	}
121

    
122
}
(11-11/11)