Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2

    
3
import java.util.Map;
4
import java.util.Optional;
5
import java.util.stream.Collectors;
6

    
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.springframework.beans.factory.annotation.Autowired;
10

    
11
import com.google.gson.Gson;
12
import com.googlecode.sarasvati.Engine;
13
import com.googlecode.sarasvati.NodeToken;
14
import com.googlecode.sarasvati.env.Env;
15

    
16
import eu.dnetlib.collector.worker.model.ApiDescriptor;
17
import eu.dnetlib.enabling.datasources.common.ApiParam;
18
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
19
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
20
import eu.dnetlib.message.Message;
21
import eu.dnetlib.msro.message.DnetMessageManager;
22
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
23
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
24
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
25
import eu.dnetlib.msro.workflows.util.ProgressProvider;
26
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
27

    
28
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
29

    
30
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
31

    
32
	@Autowired
33
	DnetMessageManager dnetMessageManager;
34

    
35
	@Autowired
36
	private LocalDatasourceManager<?, ?> dsManager;
37

    
38
	private boolean ongoing = true;
39

    
40
	private int currentValue;
41

    
42
	private String wfId;
43

    
44
	@Override
45
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
46

    
47
		final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
48
		final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
49
		final String mdId = token.getEnv().getAttribute("mdId");
50
		final String versionId = token.getEnv().getAttribute("versionId");
51

    
52
		final Optional<ApiDescriptor> opt = dsManager.getApis(dsId).stream().filter(a -> a.getId().equals(apiId)).map(a -> {
53
			final ApiDescriptor res = new ApiDescriptor();
54
			res.setBaseUrl(a.getBaseurl());
55
			res.setId(a.getId());
56
			res.setProtocol(a.getProtocol());
57
			res.getParams().putAll(a.getApiParams().stream().map(o -> {
58
				return (ApiParam) o;
59
			}).collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
60
			return res;
61
		}).findFirst();
62

    
63
		if (!opt.isPresent()) { return; }
64

    
65
		final ApiDescriptor api = opt.get();
66
		final String hdfsPath = "hdfs://mdstores/" + mdId + "/" + versionId;
67
		final String nameNode = "SubmitDnetHadoop";
68

    
69
		log.info("Collection param 1 : hdfsPath = " + hdfsPath);
70
		log.info("Collection param 2 : api descriptor json = " + new Gson().toJson(api));
71
		log.info("Collection param 3 : nameNode = " + nameNode);
72

    
73
		wfId = token.getProcess().getEnv().getAttribute("system:processId");
74
		job.getParameters().put("wfId", wfId);
75
		final Runnable runnable = () -> {
76
			while (ongoing) {
77
				final Message mess = dnetMessageManager.getOnGoingMessages(wfId);
78
				if (mess != null && mess.getBody() != null && mess.getBody().containsKey("progressCount")) {
79
					try {
80
						currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
81
						Thread.sleep(1000);
82
					} catch (final Throwable e) {
83
						e.printStackTrace();
84
					}
85
				}
86
			}
87
		};
88

    
89
		job.getParameters().put("hdfsPath", hdfsPath);
90
		job.getParameters().put("nameNode", nameNode);
91
		job.getParameters().put("api", new Gson().toJson(api));
92

    
93
		new Thread(runnable).start();
94
		super.prepareJob(job, token);
95
	}
96

    
97
	@Override
98
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
99
		return new BlackboardWorkflowJobListener(engine, token) {
100

    
101
			@Override
102
			protected void onFailed(final BlackboardJob job) {
103
				ongoing = false;
104
				log.warn("Blackboard workflow node FAILED: " + job.getError());
105
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
106
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
107
				complete(job, "abort");
108
			}
109

    
110
			@Override
111
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
112
				ongoing = false;
113

    
114
				final Message report = dnetMessageManager.getReport(wfId, "Collection");
115
				if (report != null) {
116
					report.getBody().forEach(env::setAttribute);
117
				}
118
			}
119

    
120
		};
121
	}
122

    
123
	@Override
124
	public int getTotalValue() {
125
		return 0;
126
	}
127

    
128
	@Override
129
	public int getCurrentValue() {
130
		return currentValue;
131
	}
132

    
133
	@Override
134
	public boolean isInaccurate() {
135
		return false;
136
	}
137

    
138
	@Override
139
	public ProgressProvider getProgressProvider() {
140
		return this;
141
	}
142
}
(9-9/10)