Project

General

Profile

« Previous | Next » 

Revision 55226

Added by Enrico Ottonello over 5 years ago

added parameters to job (apidescriptor, name, path)

View differences:

modules/dnet-openaireplus-workflows/branches/dnet-hadoop/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/SubmitDnetHadoopJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;
2 2

  
3
import java.util.Map;
4
import java.util.Optional;
5
import java.util.stream.Collectors;
6

  
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.springframework.beans.factory.annotation.Autowired;
10

  
11
import com.google.gson.Gson;
3 12
import com.googlecode.sarasvati.Engine;
4 13
import com.googlecode.sarasvati.NodeToken;
5 14
import com.googlecode.sarasvati.env.Env;
15

  
16
import eu.dnetlib.collector.worker.model.ApiDescriptor;
17
import eu.dnetlib.enabling.datasources.common.ApiParam;
18
import eu.dnetlib.enabling.datasources.common.LocalDatasourceManager;
6 19
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
7 20
import eu.dnetlib.message.Message;
8 21
import eu.dnetlib.msro.message.DnetMessageManager;
......
11 24
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
12 25
import eu.dnetlib.msro.workflows.util.ProgressProvider;
13 26
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.springframework.beans.factory.annotation.Autowired;
17 27

  
18
import java.util.Map;
19

  
20 28
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
21
    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
22 29

  
23
    @Autowired
24
    DnetMessageManager  dnetMessageManager;
30
	private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);
25 31

  
26
    private boolean ongoing = true;
32
	@Autowired
33
	DnetMessageManager dnetMessageManager;
27 34

  
28
    private int currentValue;
35
	@Autowired
36
	private LocalDatasourceManager<?, ?> dsManager;
29 37

  
30
    private String wfId;
38
	private boolean ongoing = true;
31 39

  
40
	private int currentValue;
32 41

  
42
	private String wfId;
33 43

  
34
    @Override
35
    protected void prepareJob(BlackboardJob job, NodeToken token) throws Exception {
36
        this.wfId =token.getProcess().getEnv().getAttribute("system:processId");
37
        job.getParameters().put("wfId",wfId);
38
        Runnable r = () -> {
39
          while (ongoing) {
40
              Message mess = dnetMessageManager.getOnGoingMessages(wfId);
41
              if (mess!= null && mess.getBody()!= null && mess.getBody().containsKey("progressCount")) {
42
                  try {
43
                      this.currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
44
                      Thread.sleep(1000);
45
                  } catch (Throwable e) {
44
	@Override
45
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
46 46

  
47
                  }
48
              }
49
          }
50
        };
47
		final String dsId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_ID);
48
		final String apiId = token.getEnv().getAttribute(WorkflowsConstants.DATAPROVIDER_INTERFACE);
49
		final String mdId = token.getEnv().getAttribute("mdId");
50
		final String versionId = token.getEnv().getAttribute("versionId");
51 51

  
52
        new Thread(r).start();
53
        super.prepareJob(job, token);
54
    }
52
		final Optional<ApiDescriptor> opt = dsManager.getApis(dsId).stream().filter(a -> a.getId().equals(apiId)).map(a -> {
53
			final ApiDescriptor res = new ApiDescriptor();
54
			res.setBaseUrl(a.getBaseurl());
55
			res.setId(a.getId());
56
			res.setProtocol(a.getProtocol());
57
			res.getParams().putAll(a.getApiParams().stream().map(o -> {
58
				return (ApiParam) o;
59
			}).collect(Collectors.toMap(ApiParam::getParam, ApiParam::getValue)));
60
			return res;
61
		}).findFirst();
55 62

  
63
		if (!opt.isPresent()) { return; }
56 64

  
57
    @Override
58
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, NodeToken token) {
59
        return new BlackboardWorkflowJobListener(engine, token) {
65
		final ApiDescriptor api = opt.get();
66
		final String hdfsPath = "hdfs://mdstores/" + mdId + "/" + versionId;
67
		final String nameNode = "SubmitDnetHadoop";
60 68

  
69
		log.info("Collection param 1 : hdfsPath = " + hdfsPath);
70
		log.info("Collection param 2 : api descriptor json = " + new Gson().toJson(api));
71
		log.info("Collection param 3 : nameNode = " + nameNode);
61 72

  
62
            @Override
63
            protected void onFailed(final BlackboardJob job) {
64
                ongoing = false;
65
                log.warn("Blackboard workflow node FAILED: " + job.getError());
66
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
67
                token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
68
                complete(job, "abort");
69
            }
73
		wfId = token.getProcess().getEnv().getAttribute("system:processId");
74
		job.getParameters().put("wfId", wfId);
75
		final Runnable runnable = () -> {
76
			while (ongoing) {
77
				final Message mess = dnetMessageManager.getOnGoingMessages(wfId);
78
				if (mess != null && mess.getBody() != null && mess.getBody().containsKey("progressCount")) {
79
					try {
80
						currentValue = Integer.parseInt(mess.getBody().get("progressCount"));
81
						Thread.sleep(1000);
82
					} catch (final Throwable e) {
83
						e.printStackTrace();
84
					}
85
				}
86
			}
87
		};
70 88

  
71
            @Override
72
            protected void populateEnv(Env env, Map<String, String> responseParams) {
73
                ongoing=false;
89
		job.getParameters().put("hdfsPath", hdfsPath);
90
		job.getParameters().put("nameNode", nameNode);
91
		job.getParameters().put("api", new Gson().toJson(api));
74 92

  
75
                Message report = dnetMessageManager.getReport(wfId, "Collection");
76
                if (report!= null)
77
                    report.getBody().forEach(env::setAttribute);
78
            }
93
		new Thread(runnable).start();
94
		super.prepareJob(job, token);
95
	}
79 96

  
97
	@Override
98
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
99
		return new BlackboardWorkflowJobListener(engine, token) {
80 100

  
81
        };
82
    }
101
			@Override
102
			protected void onFailed(final BlackboardJob job) {
103
				ongoing = false;
104
				log.warn("Blackboard workflow node FAILED: " + job.getError());
105
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_HAS_FAILED, true);
106
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR, job.getError());
107
				complete(job, "abort");
108
			}
83 109

  
84
    @Override
85
    public int getTotalValue() {
86
        return 0;
87
    }
110
			@Override
111
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
112
				ongoing = false;
88 113

  
89
    @Override
90
    public int getCurrentValue() {
91
        return currentValue;
92
    }
114
				final Message report = dnetMessageManager.getReport(wfId, "Collection");
115
				if (report != null) {
116
					report.getBody().forEach(env::setAttribute);
117
				}
118
			}
93 119

  
94
    @Override
95
    public boolean isInaccurate() {
96
        return false;
97
    }
120
		};
121
	}
98 122

  
99
    @Override
100
    public ProgressProvider getProgressProvider() {
101
        return this;
102
    }
123
	@Override
124
	public int getTotalValue() {
125
		return 0;
126
	}
127

  
128
	@Override
129
	public int getCurrentValue() {
130
		return currentValue;
131
	}
132

  
133
	@Override
134
	public boolean isInaccurate() {
135
		return false;
136
	}
137

  
138
	@Override
139
	public ProgressProvider getProgressProvider() {
140
		return this;
141
	}
103 142
}

Also available in: Unified diff