Project

General

Profile

« Previous | Next » 

Revision 55265

complete wf of collection

View differences:

SubmitDnetHadoopJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2 2

  
3
import java.util.Map;
4
import java.util.Optional;
5
import java.util.stream.Collectors;
6

  
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.springframework.beans.factory.annotation.Autowired;
10

  
11
import com.google.gson.Gson;
12 3
import com.googlecode.sarasvati.Engine;
13 4
import com.googlecode.sarasvati.NodeToken;
14 5
import com.googlecode.sarasvati.env.Env;
15

  
16
import eu.dnetlib.collector.worker.model.ApiDescriptor;
17
import eu.dnetlib.enabling.datasources.common.ApiParam;
18
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
19 6
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
20 7
import eu.dnetlib.message.Message;
21 8
import eu.dnetlib.msro.message.DnetMessageManager;
......
24 11
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
25 12
import eu.dnetlib.msro.workflows.util.ProgressProvider;
26 13
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
27 17

  
18
import java.util.List;
19
import java.util.Map;
20

  
28 21
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
22
    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
29 23

  
30
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
24
    @Autowired
25
    DnetMessageManager  dnetMessageManager;
31 26

  
32
	@Autowired
33
	DnetMessageManager dnetMessageManager;
27
    private boolean ongoing = true;
34 28

  
35
	@Autowired
36
	private LocalDatasourceManager<?, ?> dsManager;
29
    private int currentValue;
37 30

  
38
	private boolean ongoing = true;
31
    private String wfId;
39 32

  
40
	private int currentValue;
41 33

  
42
	private String wfId;
43 34

  
44
	@Override
45
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
35
    @Override
36
    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
37
        this.wfId =token.getProcess().getEnv().getAttribute("system:processId");
46 38

  
47
		final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
48
		final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
49
		final String mdId = token.getEnv().getAttribute("mdId");
50
		final String versionId = token.getEnv().getAttribute("versionId");
51 39

  
52
		final Optional<ApiDescriptor> opt = dsManager.getApis(dsId).stream().filter(a -> a.getId().equals(apiId)).map(a -> {
53
			final ApiDescriptor res = new ApiDescriptor();
54
			res.setBaseUrl(a.getBaseurl());
55
			res.setId(a.getId());
56
			res.setProtocol(a.getProtocol());
57
			res.getParams().putAll(a.getApiParams().stream().map(o -> {
58
				return (ApiParam) o;
59
			}).collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
60
			return res;
61
		}).findFirst();
40
        Runnable r = () -> {
41
          while (ongoing) {
42
              Message mess = dnetMessageManager.getOnGoingMessages(wfId);
43
              if (mess!= null && mess.getBody()!= null && mess.getBody().containsKey("ongoing")) {
44
                  try {
45
                      this.currentValue = Integer.parseInt(mess.getBody().get("ongoing"));
46
                      Thread.sleep(1000);
47
                  } catch (Throwable e) {
48
                    log.error("Error ono receiving messages ", e);
49
                  }
50
              }
51
          }
52
        };
53
        new Thread(r).start();
54
        super.prepareJob(job, token);
55
    }
62 56

  
63
		if (!opt.isPresent()) { return; }
64 57

  
65
		final ApiDescriptor api = opt.get();
66
		final String hdfsPath = "hdfs://mdstores/" + mdId + "/" + versionId;
67
		final String nameNode = "SubmitDnetHadoop";
58
    @Override
59
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
60
        return new BlackboardWorkflowJobListener(engine, token) {
68 61

  
69
		log.info("Collection param 1 : hdfsPath = " + hdfsPath);
70
		log.info("Collection param 2 : api descriptor json = " + new Gson().toJson(api));
71
		log.info("Collection param 3 : nameNode = " + nameNode);
72 62

  
73
		wfId = token.getProcess().getEnv().getAttribute("system:processId");
74
		job.getParameters().put("wfId", wfId);
75
		final Runnable runnable = () -> {
76
			while (ongoing) {
77
				final Message mess = dnetMessageManager.getOnGoingMessages(wfId);
78
				if (mess != null && mess.getBody() != null && mess.getBody().containsKey("progressCount")) {
79
					try {
80
						currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
81
						Thread.sleep(1000);
82
					} catch (final Throwable e) {
83
						e.printStackTrace();
84
					}
85
				}
86
			}
87
		};
63
            @Override
64
            protected void onFailed(final BlackboardJob job) {
65
                ongoing = false;
66
                log.warn("Blackboard workflow node FAILED: " + job.getError());
67
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
68
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
69
                complete(job, "abort");
70
            }
88 71

  
89
		job.getParameters().put("hdfsPath", hdfsPath);
90
		job.getParameters().put("nameNode", nameNode);
91
		job.getParameters().put("api", new Gson().toJson(api));
72
            @Override
73
            protected void populateEnv(Env env, Map<String, String> responseParams) {
74
                ongoing = false;
75
                List<Message> reports = dnetMessageManager.getReport(wfId);
76
                if (reports != null)
77
                    reports.forEach(it -> it.getBody().forEach(env::setAttribute));
78
            }
79
        };
80
    }
92 81

  
93
		new Thread(runnable).start();
94
		super.prepareJob(job, token);
95
	}
82
    @Override
83
    public int getTotalValue() {
84
        return 0;
85
    }
96 86

  
97
	@Override
98
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
99
		return new BlackboardWorkflowJobListener(engine, token) {
87
    @Override
88
    public int getCurrentValue() {
89
        return currentValue;
90
    }
100 91

  
101
			@Override
102
			protected void onFailed(final BlackboardJob job) {
103
				ongoing = false;
104
				log.warn("Blackboard workflow node FAILED: " + job.getError());
105
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
106
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
107
				complete(job, "abort");
108
			}
92
    @Override
93
    public boolean isInaccurate() {
94
        return false;
95
    }
109 96

  
110
			@Override
111
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
112
				ongoing = false;
113

  
114
				final Message report = dnetMessageManager.getReport(wfId, "Collection");
115
				if (report != null) {
116
					report.getBody().forEach(env::setAttribute);
117
				}
118
			}
119

  
120
		};
121
	}
122

  
123
	@Override
124
	public int getTotalValue() {
125
		return 0;
126
	}
127

  
128
	@Override
129
	public int getCurrentValue() {
130
		return currentValue;
131
	}
132

  
133
	@Override
134
	public boolean isInaccurate() {
135
		return false;
136
	}
137

  
138
	@Override
139
	public ProgressProvider getProgressProvider() {
140
		return this;
141
	}
97
    @Override
98
    public ProgressProvider getProgressProvider() {
99
        return this;
100
    }
142 101
}

Also available in: Unified diff