1 |
55168
|
enrico.ott
|
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
|
2 |
|
|
|
3 |
55240
|
enrico.ott
|
import java.util.Map;
|
4 |
|
|
|
5 |
55168
|
enrico.ott
|
import org.apache.commons.logging.Log;
|
6 |
|
|
import org.apache.commons.logging.LogFactory;
|
7 |
55240
|
enrico.ott
|
import org.springframework.beans.factory.annotation.Autowired;
|
8 |
55168
|
enrico.ott
|
|
9 |
55240
|
enrico.ott
|
import com.googlecode.sarasvati.Engine;
|
10 |
55168
|
enrico.ott
|
import com.googlecode.sarasvati.NodeToken;
|
11 |
55240
|
enrico.ott
|
import com.googlecode.sarasvati.env.Env;
|
12 |
55168
|
enrico.ott
|
|
13 |
55240
|
enrico.ott
|
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
|
14 |
|
|
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
|
15 |
|
|
import eu.dnetlib.message.Message;
|
16 |
|
|
import eu.dnetlib.msro.message.DnetMessageManager;
|
17 |
|
|
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
|
18 |
|
|
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
|
19 |
|
|
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
|
20 |
|
|
import eu.dnetlib.msro.workflows.util.ProgressProvider;
|
21 |
|
|
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
|
22 |
55168
|
enrico.ott
|
|
23 |
55240
|
enrico.ott
|
public class TransformHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
|
24 |
55168
|
enrico.ott
|
|
25 |
|
|
private static final Log log = LogFactory.getLog(TransformHadoopJobNode.class);
|
26 |
|
|
|
27 |
55240
|
enrico.ott
|
@Autowired
|
28 |
|
|
DnetMessageManager dnetMessageManager;
|
29 |
|
|
|
30 |
|
|
@Autowired
|
31 |
|
|
private LocalDatasourceManager<?, ?> dsManager;
|
32 |
|
|
|
33 |
|
|
private boolean ongoing = true;
|
34 |
|
|
|
35 |
|
|
private int currentValue;
|
36 |
|
|
|
37 |
|
|
private String wfId;
|
38 |
|
|
|
39 |
55168
|
enrico.ott
|
@Override
|
40 |
55240
|
enrico.ott
|
protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
|
41 |
55168
|
enrico.ott
|
|
42 |
|
|
// param 1 : hdfs path from
|
43 |
|
|
// param 2 : hdfs path to
|
44 |
|
|
// param 3 : transformation rule
|
45 |
|
|
// param 4 : nameNode
|
46 |
|
|
|
47 |
|
|
final String reading_mdId = token.getEnv().getAttribute("reading_mdId");
|
48 |
|
|
final String reading_versionId = token.getEnv().getAttribute("reading_versionId");
|
49 |
|
|
|
50 |
|
|
final String writing_mdId = token.getEnv().getAttribute("mdId");
|
51 |
|
|
final String writing_versionId = token.getEnv().getAttribute("versionId");
|
52 |
|
|
|
53 |
|
|
final String readingHdfsPath = "hdfs://mdstores/" + reading_mdId + "/" + reading_versionId;
|
54 |
|
|
final String writingHdfsPath = "hdfs://mdstores/" + writing_mdId + "/" + writing_versionId;
|
55 |
|
|
|
56 |
|
|
final String rule = "XXXXXXXX";
|
57 |
55240
|
enrico.ott
|
final String nameNode = "TransformHadoop";
|
58 |
55168
|
enrico.ott
|
|
59 |
55240
|
enrico.ott
|
wfId = token.getProcess().getEnv().getAttribute("system:processId");
|
60 |
|
|
job.getParameters().put("wfId", wfId);
|
61 |
|
|
final Runnable runnable = () -> {
|
62 |
|
|
while (ongoing) {
|
63 |
|
|
final Message mess = dnetMessageManager.getOnGoingMessages(wfId);
|
64 |
|
|
if (mess != null && mess.getBody() != null && mess.getBody().containsKey("progressCount")) {
|
65 |
|
|
try {
|
66 |
|
|
currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
|
67 |
|
|
Thread.sleep(1000);
|
68 |
|
|
} catch (final Throwable e) {
|
69 |
|
|
e.printStackTrace();
|
70 |
|
|
}
|
71 |
|
|
}
|
72 |
|
|
}
|
73 |
|
|
};
|
74 |
55168
|
enrico.ott
|
|
75 |
55240
|
enrico.ott
|
job.getParameters().put("readingHdfsPath", readingHdfsPath);
|
76 |
|
|
job.getParameters().put("writingHdfsPath", writingHdfsPath);
|
77 |
|
|
job.getParameters().put("nameNode", nameNode);
|
78 |
55168
|
enrico.ott
|
|
79 |
55240
|
enrico.ott
|
new Thread(runnable).start();
|
80 |
|
|
super.prepareJob(job, token);
|
81 |
|
|
}
|
82 |
55168
|
enrico.ott
|
|
83 |
55240
|
enrico.ott
|
@Override
|
84 |
|
|
protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
|
85 |
|
|
return new BlackboardWorkflowJobListener(engine, token) {
|
86 |
55168
|
enrico.ott
|
|
87 |
55240
|
enrico.ott
|
@Override
|
88 |
|
|
protected void onFailed(final BlackboardJob job) {
|
89 |
|
|
ongoing = false;
|
90 |
|
|
log.warn("Blackboard workflow node FAILED: " + job.getError());
|
91 |
|
|
token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
|
92 |
|
|
token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
|
93 |
|
|
complete(job, "abort");
|
94 |
|
|
}
|
95 |
55168
|
enrico.ott
|
|
96 |
55240
|
enrico.ott
|
@Override
|
97 |
|
|
protected void populateEnv(final Env env, final Map<String, String> responseParams) {
|
98 |
|
|
ongoing = false;
|
99 |
|
|
|
100 |
|
|
final Message report = dnetMessageManager.getReport(wfId, "Collection");
|
101 |
|
|
if (report != null) {
|
102 |
|
|
report.getBody().forEach(env::setAttribute);
|
103 |
|
|
}
|
104 |
|
|
}
|
105 |
|
|
|
106 |
|
|
};
|
107 |
55168
|
enrico.ott
|
}
|
108 |
|
|
|
109 |
55240
|
enrico.ott
|
@Override
|
110 |
|
|
public int getTotalValue() {
|
111 |
|
|
return 0;
|
112 |
|
|
}
|
113 |
|
|
|
114 |
|
|
@Override
|
115 |
|
|
public int getCurrentValue() {
|
116 |
|
|
return currentValue;
|
117 |
|
|
}
|
118 |
|
|
|
119 |
|
|
@Override
|
120 |
|
|
public boolean isInaccurate() {
|
121 |
|
|
return false;
|
122 |
|
|
}
|
123 |
|
|
|
124 |
|
|
@Override
|
125 |
|
|
public ProgressProvider getProgressProvider() {
|
126 |
|
|
return this;
|
127 |
|
|
}
|
128 |
55168
|
enrico.ott
|
}
|