Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2

    
3
import com.googlecode.sarasvati.Engine;
4
import com.googlecode.sarasvati.NodeToken;
5
import com.googlecode.sarasvati.env.Env;
6
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
7
import eu.dnetlib.message.Message;
8
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
9
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
10
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12
import eu.dnetlib.msro.workflows.util.ProgressProvider;
13
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17

    
18
import java.util.List;
19
import java.util.Map;
20

    
21
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
22

    
23
    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
24

    
25
    @Autowired
26
    DnetMessageManager dnetMessageManager;
27

    
28
    private boolean ongoing = true;
29

    
30
    private int currentValue;
31

    
32
    private String wfId;
33

    
34

    
35
    @Override
36
    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
37
        this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
38

    
39

    
40
        Runnable r = () -> {
41
            while (ongoing) {
42
                Message mess = dnetMessageManager.getOnGoingMessages(wfId);
43
                if (mess != null && mess.getBody() != null && mess.getBody().containsKey("ongoing")) {
44
                    try {
45
                        this.currentValue = Integer.parseInt(mess.getBody().get("ongoing"));
46
                        Thread.sleep(1000);
47
                    } catch (Throwable e) {
48
                        log.error("Error ono receiving messages ", e);
49
                    }
50
                }
51
            }
52
        };
53
        new Thread(r).start();
54
        super.prepareJob(job, token);
55
    }
56

    
57

    
58
    @Override
59
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
60
        return new BlackboardWorkflowJobListener(engine, token) {
61
            @Override
62
            protected void onFailed(final BlackboardJob job) {
63
                ongoing = false;
64
                log.warn("Blackboard workflow node FAILED: " + job.getError());
65
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
66
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
67
                complete(job, "abort");
68
            }
69
            @Override
70
            protected void populateEnv(Env env, Map<String, String> responseParams) {
71
                ongoing = false;
72

    
73
                List<Message> reports = dnetMessageManager.getReport(wfId);
74
                if (reports == null) {
75
                    int numberOftries = 0;
76
                    try {
77
                        while (reports == null && numberOftries < 3) {
78
                            reports = dnetMessageManager.getReport(wfId);
79
                            Thread.sleep(3000 * numberOftries++);
80
                        }
81
                    } catch (InterruptedException e) {
82
                        log.error("Error on waiting report", e);
83
                    }
84
                }
85

    
86
                if (reports == null) {
87
                    throw new RuntimeException("Unable to get report for WorklowId " + wfId);
88
                }
89

    
90

    
91
                reports.forEach(it -> it.getBody().forEach(env::setAttribute));
92
            }
93
        };
94
    }
95

    
96
    @Override
97
    public int getTotalValue() {
98
        return 0;
99
    }
100

    
101
    @Override
102
    public int getCurrentValue() {
103
        return currentValue;
104
    }
105

    
106
    @Override
107
    public boolean isInaccurate() {
108
        return false;
109
    }
110

    
111
    @Override
112
    public ProgressProvider getProgressProvider() {
113
        return this;
114
    }
115
}
(10-10/10)