Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2

    
3
import com.googlecode.sarasvati.Engine;
4
import com.googlecode.sarasvati.NodeToken;
5
import com.googlecode.sarasvati.env.Env;
6
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
7
import eu.dnetlib.message.Message;
8
import eu.dnetlib.msro.message.DnetMessageManager;
9
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
10
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12
import eu.dnetlib.msro.workflows.util.ProgressProvider;
13
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17

    
18
import java.util.Map;
19

    
20
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
21
    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
22

    
23
    @Autowired
24
    DnetMessageManager  dnetMessageManager;
25

    
26
    private boolean ongoing = true;
27

    
28
    private int currentValue;
29

    
30
    private String wfId;
31

    
32

    
33

    
34
    @Override
35
    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
36
        this.wfId =token.getProcess().getEnv().getAttribute("system:processId");
37
        job.getParameters().put("wfId",wfId);
38
        Runnable r = () -> {
39
          while (ongoing) {
40
              Message mess = dnetMessageManager.getOnGoingMessages(wfId);
41
              if (mess!= null && mess.getBody()!= null && mess.getBody().containsKey("progressCount")) {
42
                  try {
43
                      this.currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
44
                      Thread.sleep(1000);
45
                  } catch (Throwable e) {
46

    
47
                  }
48
              }
49
          }
50
        };
51

    
52
        new Thread(r).start();
53
        super.prepareJob(job, token);
54
    }
55

    
56

    
57
    @Override
58
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
59
        return new BlackboardWorkflowJobListener(engine, token) {
60

    
61

    
62
            @Override
63
            protected void onFailed(final BlackboardJob job) {
64
                ongoing = false;
65
                log.warn("Blackboard workflow node FAILED: " + job.getError());
66
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
67
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
68
                complete(job, "abort");
69
            }
70

    
71
            @Override
72
            protected void populateEnv(Env env, Map<String, String> responseParams) {
73
                ongoing=false;
74

    
75
                Message report = dnetMessageManager.getReport(wfId, "Collection");
76
                if (report!= null)
77
                    report.getBody().forEach(env::setAttribute);
78
            }
79

    
80

    
81
        };
82
    }
83

    
84
    @Override
85
    public int getTotalValue() {
86
        return 0;
87
    }
88

    
89
    @Override
90
    public int getCurrentValue() {
91
        return currentValue;
92
    }
93

    
94
    @Override
95
    public boolean isInaccurate() {
96
        return false;
97
    }
98

    
99
    @Override
100
    public ProgressProvider getProgressProvider() {
101
        return this;
102
    }
103
}
(10-10/11)