Project

General

Profile

« Previous | Next » 

Revision 60351

implematation of dhp messages with a MVC controller

View differences:

SubmitDnetHadoopJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.dhp;
2 2

  
3
import java.util.List;
4
import java.util.Map;
5

  
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.springframework.beans.factory.annotation.Autowired;
9

  
3 10
import com.googlecode.sarasvati.Engine;
4 11
import com.googlecode.sarasvati.NodeToken;
5 12
import com.googlecode.sarasvati.env.Env;
13

  
6 14
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
7 15
import eu.dnetlib.message.Message;
8 16
import eu.dnetlib.msro.openaireplus.workflows.nodes.dhp.message.DnetMessageManager;
......
11 19
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12 20
import eu.dnetlib.msro.workflows.util.ProgressProvider;
13 21
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17 22

  
18
import java.util.List;
19
import java.util.Map;
20

  
21 23
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
22 24

  
23
    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
25
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
24 26

  
25
    @Autowired
26
    DnetMessageManager dnetMessageManager;
27
	@Autowired
28
	private DnetMessageManager dnetMessageManager;
27 29

  
28
    private boolean ongoing = true;
30
	private int currentValue = 0;
29 31

  
30
    private int currentValue;
32
	private int totalValue = 0;
31 33

  
32
    private String wfId;
34
	private boolean accurate = false;
33 35

  
36
	private String wfId;
34 37

  
35
    @Override
36
    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
37
        this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
38
	@Override
39
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
40
		this.wfId = token.getProcess().getEnv().getAttribute("system:processId");
41
		super.prepareJob(job, token);
42
	}
38 43

  
44
	private void updateProgressProvider() {
45
		final Message mess = dnetMessageManager.getOnGoingMessage(wfId);
46
		if (mess != null && mess.getBody() != null) {
47
			if (mess.getBody().containsKey("current")) {
48
				try {
49
					setCurrentValue(Integer.parseInt(mess.getBody().get("current")));
50
				} catch (final Throwable e) {
51
					log.error("Error parsing value", e);
52
				}
53
			}
54
		}
55
		if (mess.getBody().containsKey("total")) {
56
			try {
57
				setTotalValue(Integer.parseInt(mess.getBody().get("total")));
58
				setAccurate(true);
59
			} catch (final Throwable e) {
60
				log.error("Error parsing value", e);
61
			}
62
		}
39 63

  
40
        Runnable r = () -> {
41
            while (ongoing) {
42
                Message mess = dnetMessageManager.getOnGoingMessages(wfId);
43
                if (mess != null && mess.getBody() != null && mess.getBody().containsKey("ongoing")) {
44
                    try {
45
                        this.currentValue = Integer.parseInt(mess.getBody().get("ongoing"));
46
                        Thread.sleep(1000);
47
                    } catch (Throwable e) {
48
                        log.error("Error ono receiving messages ", e);
49
                    }
50
                }
51
            }
52
        };
53
        new Thread(r).start();
54
        super.prepareJob(job, token);
55
    }
64
	}
56 65

  
66
	@Override
67
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
68
		return new BlackboardWorkflowJobListener(engine, token) {
57 69

  
58
    @Override
59
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
60
        return new BlackboardWorkflowJobListener(engine, token) {
61
            @Override
62
            protected void onFailed(final BlackboardJob job) {
63
                ongoing = false;
64
                log.warn("Blackboard workflow node FAILED: " + job.getError());
65
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
66
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
67
                complete(job, "abort");
68
            }
69
            @Override
70
            protected void populateEnv(Env env, Map<String, String> responseParams) {
71
                ongoing = false;
70
			@Override
71
			protected void onFailed(final BlackboardJob job) {
72
				log.warn("Blackboard workflow node FAILED: " + job.getError());
73
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
74
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
75
				complete(job, "abort");
76
			}
72 77

  
73
                List<Message> reports = dnetMessageManager.getReport(wfId);
74
                if (reports == null) {
75
                    int numberOftries = 0;
76
                    try {
77
                        while (reports == null && numberOftries < 3) {
78
                            reports = dnetMessageManager.getReport(wfId);
79
                            Thread.sleep(3000 * numberOftries++);
80
                        }
81
                    } catch (InterruptedException e) {
82
                        log.error("Error on waiting report", e);
83
                    }
84
                }
78
			@Override
79
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
80
				List<Message> reports = dnetMessageManager.getReports(wfId, true);
81
				if (reports.isEmpty()) {
82
					int numberOftries = 0;
83
					try {
84
						while (reports.isEmpty() && numberOftries < 3) {
85
							reports = dnetMessageManager.getReports(wfId, true);
86
							Thread.sleep(3000 * numberOftries++);
87
						}
88
					} catch (final InterruptedException e) {
89
						log.error("Error on waiting report", e);
90
					}
91
				}
85 92

  
86
                if (reports == null) {
87
                    throw new RuntimeException("Unable to get report for WorklowId " + wfId);
88
                }
93
				if (reports.isEmpty()) { throw new RuntimeException("Unable to get report for WorklowId " + wfId); }
89 94

  
95
				reports.forEach(it -> it.getBody().forEach(env::setAttribute));
96
			}
97
		};
98
	}
90 99

  
91
                reports.forEach(it -> it.getBody().forEach(env::setAttribute));
92
            }
93
        };
94
    }
100
	@Override
101
	public ProgressProvider getProgressProvider() {
102
		return this;
103
	}
95 104

  
96
    @Override
97
    public int getTotalValue() {
98
        return 0;
99
    }
105
	public String getWfId() {
106
		return wfId;
107
	}
100 108

  
101
    @Override
102
    public int getCurrentValue() {
103
        return currentValue;
104
    }
109
	public void setWfId(final String wfId) {
110
		this.wfId = wfId;
111
	}
105 112

  
106
    @Override
107
    public boolean isInaccurate() {
108
        return false;
109
    }
113
	public boolean isAccurate() {
114
		return accurate;
115
	}
110 116

  
111
    @Override
112
    public ProgressProvider getProgressProvider() {
113
        return this;
114
    }
115
}
117
	public void setAccurate(final boolean accurate) {
118
		this.accurate = accurate;
119
	}
120

  
121
	@Override
122
	public int getCurrentValue() {
123
		updateProgressProvider();
124
		return currentValue;
125
	}
126

  
127
	public void setCurrentValue(final int currentValue) {
128
		this.currentValue = currentValue;
129
	}
130

  
131
	@Override
132
	public int getTotalValue() {
133
		return totalValue;
134
	}
135

  
136
	public void setTotalValue(final int totalValue) {
137
		this.totalValue = totalValue;
138
	}
139

  
140
	@Override
141
	public boolean isInaccurate() {
142
		return !accurate;
143
	}
144

  
145
}

Also available in: Unified diff