Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import java.util.List;
4
import java.util.Map;
5

    
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Autowired;
9

    
10
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13

    
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
15
import eu.dnetlib.message.Message;
16
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
17
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
18
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
19
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
20
import eu.dnetlib.msro.workflows.util.ProgressProvider;
21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22

    
23
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
24

    
25
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
26

    
27
	@Autowired
28
	private DnetMessageManager dnetMessageManager;
29

    
30
	private int currentValue = 0;
31

    
32
	private int totalValue = 0;
33

    
34
	private boolean accurate = false;
35

    
36
	private String wfId;
37

    
38
	@Override
39
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
40
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
41
		super.prepareJob(job, token);
42
	}
43

    
44
	private void updateProgressProvider() {
45
		final Message mess = dnetMessageManager.getOnGoingMessage(wfId);
46
		if (mess != null && mess.getBody() != null) {
47
			if (mess.getBody().containsKey("current")) {
48
				try {
49
					setCurrentValue(Integer.parseInt(mess.getBody().get("current")));
50
				} catch (final Throwable e) {
51
					log.error("Error parsing value", e);
52
				}
53
			}
54
		}
55
		if (mess.getBody().containsKey("total")) {
56
			try {
57
				setTotalValue(Integer.parseInt(mess.getBody().get("total")));
58
				setAccurate(true);
59
			} catch (final Throwable e) {
60
				log.error("Error parsing value", e);
61
			}
62
		}
63

    
64
	}
65

    
66
	@Override
67
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
68
		return new BlackboardWorkflowJobListener(engine, token) {
69

    
70
			@Override
71
			protected void onFailed(final BlackboardJob job) {
72
				log.warn("Blackboard workflow node FAILED: " + job.getError());
73
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
74
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
75
				complete(job, "abort");
76
			}
77

    
78
			@Override
79
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
80
				List<Message> reports = dnetMessageManager.getReports(wfId, true);
81
				if (reports.isEmpty()) {
82
					int numberOftries = 0;
83
					try {
84
						while (reports.isEmpty() && numberOftries < 3) {
85
							reports = dnetMessageManager.getReports(wfId, true);
86
							Thread.sleep(3000 * numberOftries++);
87
						}
88
					} catch (final InterruptedException e) {
89
						log.error("Error on waiting report", e);
90
					}
91
				}
92

    
93
				if (reports.isEmpty()) { throw new RuntimeException("Unable to get report for WorklowId " + wfId); }
94

    
95
				reports.forEach(it -> it.getBody().forEach(env::setAttribute));
96
			}
97
		};
98
	}
99

    
100
	@Override
101
	public ProgressProvider getProgressProvider() {
102
		return this;
103
	}
104

    
105
	public String getWfId() {
106
		return wfId;
107
	}
108

    
109
	public void setWfId(final String wfId) {
110
		this.wfId = wfId;
111
	}
112

    
113
	public boolean isAccurate() {
114
		return accurate;
115
	}
116

    
117
	public void setAccurate(final boolean accurate) {
118
		this.accurate = accurate;
119
	}
120

    
121
	@Override
122
	public int getCurrentValue() {
123
		updateProgressProvider();
124
		return currentValue;
125
	}
126

    
127
	public void setCurrentValue(final int currentValue) {
128
		this.currentValue = currentValue;
129
	}
130

    
131
	@Override
132
	public int getTotalValue() {
133
		return totalValue;
134
	}
135

    
136
	public void setTotalValue(final int totalValue) {
137
		this.totalValue = totalValue;
138
	}
139

    
140
	@Override
141
	public boolean isInaccurate() {
142
		return !accurate;
143
	}
144

    
145
}
(11-11/11)