Project

General

Profile

1 57299 sandro.lab
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2
3 60351 michele.ar
import java.util.List;
4
import java.util.Map;
5
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Autowired;
9
10 57299 sandro.lab
import com.googlecode.sarasvati.Engine;
11
import com.googlecode.sarasvati.NodeToken;
12
import com.googlecode.sarasvati.env.Env;
13 60351 michele.ar
14 60357 michele.ar
import eu.dnetlib.dhp.message.Message;
15 57299 sandro.lab
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
16
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
17
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
18
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
19
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
20
import eu.dnetlib.msro.workflows.util.ProgressProvider;
21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22
23
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
24
25 60351 michele.ar
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
26 57299 sandro.lab
27 60351 michele.ar
	@Autowired
28
	private DnetMessageManager dnetMessageManager;
29 57299 sandro.lab
30 60351 michele.ar
	private int currentValue = 0;
31 57299 sandro.lab
32 60351 michele.ar
	private int totalValue = 0;
33 57299 sandro.lab
34 60351 michele.ar
	private boolean accurate = false;
35 57299 sandro.lab
36 60351 michele.ar
	private String wfId;
37 57299 sandro.lab
38 60351 michele.ar
	@Override
39
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
40
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
41
		super.prepareJob(job, token);
42
	}
43 57299 sandro.lab
44 60351 michele.ar
	private void updateProgressProvider() {
45 60357 michele.ar
		final Message mess = dnetMessageManager.findMessage(wfId);
46 60351 michele.ar
		if (mess != null && mess.getBody() != null) {
47
			if (mess.getBody().containsKey("current")) {
48
				try {
49
					setCurrentValue(Integer.parseInt(mess.getBody().get("current")));
50
				} catch (final Throwable e) {
51
					log.error("Error parsing value", e);
52
				}
53
			}
54
		}
55
		if (mess.getBody().containsKey("total")) {
56
			try {
57
				setTotalValue(Integer.parseInt(mess.getBody().get("total")));
58
				setAccurate(true);
59
			} catch (final Throwable e) {
60
				log.error("Error parsing value", e);
61
			}
62
		}
63 57299 sandro.lab
64 60351 michele.ar
	}
65 57299 sandro.lab
66 60351 michele.ar
	@Override
67
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
68
		return new BlackboardWorkflowJobListener(engine, token) {
69 57299 sandro.lab
70 60351 michele.ar
			@Override
71
			protected void onFailed(final BlackboardJob job) {
72
				log.warn("Blackboard workflow node FAILED: " + job.getError());
73
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
74
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
75
				complete(job, "abort");
76
			}
77 57299 sandro.lab
78 60351 michele.ar
			@Override
79
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
80
				List<Message> reports = dnetMessageManager.getReports(wfId, true);
81
				if (reports.isEmpty()) {
82
					int numberOftries = 0;
83
					try {
84
						while (reports.isEmpty() && numberOftries < 3) {
85
							reports = dnetMessageManager.getReports(wfId, true);
86
							Thread.sleep(3000 * numberOftries++);
87
						}
88
					} catch (final InterruptedException e) {
89
						log.error("Error on waiting report", e);
90
					}
91
				}
92 57299 sandro.lab
93 60351 michele.ar
				if (reports.isEmpty()) { throw new RuntimeException("Unable to get report for WorklowId " + wfId); }
94 57299 sandro.lab
95 60351 michele.ar
				reports.forEach(it -> it.getBody().forEach(env::setAttribute));
96
			}
97
		};
98
	}
99 57299 sandro.lab
100 60351 michele.ar
	@Override
101
	public ProgressProvider getProgressProvider() {
102
		return this;
103
	}
104 57299 sandro.lab
105 60351 michele.ar
	public String getWfId() {
106
		return wfId;
107
	}
108 57299 sandro.lab
109 60351 michele.ar
	public void setWfId(final String wfId) {
110
		this.wfId = wfId;
111
	}
112 57299 sandro.lab
113 60351 michele.ar
	public boolean isAccurate() {
114
		return accurate;
115
	}
116 57299 sandro.lab
117 60351 michele.ar
	public void setAccurate(final boolean accurate) {
118
		this.accurate = accurate;
119
	}
120
121
	@Override
122
	public int getCurrentValue() {
123
		updateProgressProvider();
124
		return currentValue;
125
	}
126
127
	public void setCurrentValue(final int currentValue) {
128
		this.currentValue = currentValue;
129
	}
130
131
	@Override
132
	public int getTotalValue() {
133
		return totalValue;
134
	}
135
136
	public void setTotalValue(final int totalValue) {
137
		this.totalValue = totalValue;
138
	}
139
140
	@Override
141
	public boolean isInaccurate() {
142
		return !accurate;
143
	}
144
145
}