Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2

    
3
import java.util.Map;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8

    
9
import com.googlecode.sarasvati.Engine;
10
import com.googlecode.sarasvati.NodeToken;
11
import com.googlecode.sarasvati.env.Env;
12

    
13
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
14
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
15
import eu.dnetlib.message.Message;
16
import eu.dnetlib.msro.message.DnetMessageManager;
17
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
18
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
19
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
20
import eu.dnetlib.msro.workflows.util.ProgressProvider;
21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
22

    
23
public class TransformHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
24

    
25
	private static final Log log = LogFactory.getLog(TransformHadoopJobNode.class);
26

    
27
	@Autowired
28
	DnetMessageManager dnetMessageManager;
29

    
30
	@Autowired
31
	private LocalDatasourceManager<?, ?> dsManager;
32

    
33
	private boolean ongoing = true;
34

    
35
	private int currentValue;
36

    
37
	private String wfId;
38

    
39
	@Override
40
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
41

    
42
		// param 1 : hdfs path from
43
		// param 2 : hdfs path to
44
		// param 3 : transformation rule
45
		// param 4 : nameNode
46

    
47
		final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
48
		final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
49

    
50
		final String writing_mdId = token.getEnv().getAttribute("mdId");
51
		final String writing_versionId = token.getEnv().getAttribute("versionId");
52

    
53
		final String readingHdfsPath = "hdfs://mdstores/" + reading_mdId + "/" + reading_versionId;
54
		final String writingHdfsPath = "hdfs://mdstores/" + writing_mdId + "/" + writing_versionId;
55

    
56
		final String rule = "XXXXXXXX";
57
		final String nameNode = "TransformHadoop";
58

    
59
		wfId = token.getProcess().getEnv().getAttribute("system:processId");
60
		job.getParameters().put("wfId", wfId);
61
		final Runnable runnable = () -> {
62
			while (ongoing) {
63
				final Message mess = dnetMessageManager.getOnGoingMessages(wfId);
64
				if (mess != null && mess.getBody() != null && mess.getBody().containsKey("progressCount")) {
65
					try {
66
						currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
67
						Thread.sleep(1000);
68
					} catch (final Throwable e) {
69
						e.printStackTrace();
70
					}
71
				}
72
			}
73
		};
74

    
75
		job.getParameters().put("readingHdfsPath", readingHdfsPath);
76
		job.getParameters().put("writingHdfsPath", writingHdfsPath);
77
		job.getParameters().put("nameNode", nameNode);
78

    
79
		new Thread(runnable).start();
80
		super.prepareJob(job, token);
81
	}
82

    
83
	@Override
84
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
85
		return new BlackboardWorkflowJobListener(engine, token) {
86

    
87
			@Override
88
			protected void onFailed(final BlackboardJob job) {
89
				ongoing = false;
90
				log.warn("Blackboard workflow node FAILED: " + job.getError());
91
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
92
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
93
				complete(job, "abort");
94
			}
95

    
96
			@Override
97
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
98
				ongoing = false;
99

    
100
				final Message report = dnetMessageManager.getReport(wfId, "Collection");
101
				if (report != null) {
102
					report.getBody().forEach(env::setAttribute);
103
				}
104
			}
105

    
106
		};
107
	}
108

    
109
	@Override
110
	public int getTotalValue() {
111
		return 0;
112
	}
113

    
114
	@Override
115
	public int getCurrentValue() {
116
		return currentValue;
117
	}
118

    
119
	@Override
120
	public boolean isInaccurate() {
121
		return false;
122
	}
123

    
124
	@Override
125
	public ProgressProvider getProgressProvider() {
126
		return this;
127
	}
128
}
(10-10/10)