Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.mdstore;
2

    
3
import java.util.Map;
4

    
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.springframework.beans.factory.annotation.Autowired;
8

    
9
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
10
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
11
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
12
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
13
import eu.dnetlib.msro.workflows.procs.Env;
14
import eu.dnetlib.msro.workflows.procs.Token;
15
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
16
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
17
import eu.dnetlib.rmi.common.ResultSet;
18
import eu.dnetlib.rmi.common.ResultSetException;
19
import eu.dnetlib.rmi.data.MDStoreService;
20

    
21
public class StoreMDStoreRecordsJobNode extends BlackboardJobNode {
22

    
23
	private static final Log log = LogFactory.getLog(StoreMDStoreRecordsJobNode.class);
24

    
25
	private String eprParam;
26
	private String mdId;
27
	private String storingType;
28

    
29
	@Autowired
30
	private ResultSetClient resultSetClient;
31

    
32
	@Override
33
	protected String obtainServiceId(final Env env) {
34
		return getServiceLocator().getServiceId(MDStoreService.class, this.mdId);
35
	}
36

    
37
	@Override
38
	protected void prepareJob(final BlackboardJob job, final Token token) throws ResultSetException {
39
		final ResultSet<?> rs = token.getEnv().getAttribute(getEprParam(), ResultSet.class);
40

    
41
		token.setProgressProvider(new ResultsetProgressProvider(rs, this.resultSetClient));
42

    
43
		job.setAction("FEED");
44
		job.getParameters().put("epr", rs.toJson());
45
		job.getParameters().put("storingType", getStoringType());
46
		job.getParameters().put("mdId", getMdId());
47
	}
48

    
49
	@Override
50
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Token token) {
51
		return new BlackboardWorkflowJobListener(token) {
52

    
53
			@Override
54
			protected void responseToEnv(final Env env, final Map<String, String> responseParams) {
55
				log.info("New mdstore size : " + responseParams.get("mdstoreSize"));
56
				log.info("Number of write operations: " + responseParams.get("writeOps"));
57
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "sinkSize", responseParams.get("mdstoreSize"));
58
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "processedSize", responseParams.get("writeOps"));
59
			}
60
		};
61
	}
62

    
63
	public String getEprParam() {
64
		return this.eprParam;
65
	}
66

    
67
	public void setEprParam(final String eprParam) {
68
		this.eprParam = eprParam;
69
	}
70

    
71
	public String getMdId() {
72
		return this.mdId;
73
	}
74

    
75
	public void setMdId(final String mdId) {
76
		this.mdId = mdId;
77
	}
78

    
79
	public String getStoringType() {
80
		return this.storingType;
81
	}
82

    
83
	public void setStoringType(final String storingType) {
84
		this.storingType = storingType;
85
	}
86

    
87
}
(9-9/10)