Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2

    
3
import com.googlecode.sarasvati.Engine;
4
import com.googlecode.sarasvati.NodeToken;
5
import com.googlecode.sarasvati.env.Env;
6
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
7
import eu.dnetlib.message.Message;
8
import eu.dnetlib.msro.message.DnetMessageManager;
9
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
10
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12
import eu.dnetlib.msro.workflows.util.ProgressProvider;
13
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17

    
18
import java.util.List;
19
import java.util.Map;
20

    
21
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
22
    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
23

    
24
    @Autowired
25
    DnetMessageManager  dnetMessageManager;
26

    
27
    private boolean ongoing = true;
28

    
29
    private int currentValue;
30

    
31
    private String wfId;
32

    
33

    
34

    
35
    @Override
36
    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
37
        this.wfId =token.getProcess().getEnv().getAttribute("system:processId");
38

    
39

    
40
        Runnable r = () -> {
41
          while (ongoing) {
42
              Message mess = dnetMessageManager.getOnGoingMessages(wfId);
43
              if (mess!= null && mess.getBody()!= null && mess.getBody().containsKey("ongoing")) {
44
                  try {
45
                      this.currentValue = Integer.parseInt(mess.getBody().get("ongoing"));
46
                      Thread.sleep(1000);
47
                  } catch (Throwable e) {
48
                    log.error("Error ono receiving messages ", e);
49
                  }
50
              }
51
          }
52
        };
53
        new Thread(r).start();
54
        super.prepareJob(job, token);
55
    }
56

    
57

    
58
    @Override
59
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
60
        return new BlackboardWorkflowJobListener(engine, token) {
61

    
62

    
63
            @Override
64
            protected void onFailed(final BlackboardJob job) {
65
                ongoing = false;
66
                log.warn("Blackboard workflow node FAILED: " + job.getError());
67
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
68
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
69
                complete(job, "abort");
70
            }
71

    
72
            @Override
73
            protected void populateEnv(Env env, Map<String, String> responseParams) {
74
                ongoing = false;
75
                List<Message> reports = dnetMessageManager.getReport(wfId);
76
                if (reports != null)
77
                    reports.forEach(it -> it.getBody().forEach(env::setAttribute));
78
            }
79
        };
80
    }
81

    
82
    @Override
83
    public int getTotalValue() {
84
        return 0;
85
    }
86

    
87
    @Override
88
    public int getCurrentValue() {
89
        return currentValue;
90
    }
91

    
92
    @Override
93
    public boolean isInaccurate() {
94
        return false;
95
    }
96

    
97
    @Override
98
    public ProgressProvider getProgressProvider() {
99
        return this;
100
    }
101
}
(10-10/11)