Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.mdstore;
2

    
3
import java.util.Map;
4

    
5
import com.googlecode.sarasvati.Engine;
6
import com.googlecode.sarasvati.NodeToken;
7
import com.googlecode.sarasvati.env.Env;
8
import eu.dnetlib.data.mdstore.MDStoreService;
9
import eu.dnetlib.enabling.resultset.rmi.ResultSetException;
10
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
11
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
12
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
13
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
14
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
15
import eu.dnetlib.msro.workflows.util.ProgressProvider;
16
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
17
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.springframework.beans.factory.annotation.Required;
22

    
23
import static eu.dnetlib.msro.workflows.util.WorkflowsConstants.SYSTEM_WF_PROCESS_ID;
24

    
25
public class StoreMDStoreRecordsJobNode extends BlackboardJobNode implements ProgressJobNode {
26

    
27
	private static final Log log = LogFactory.getLog(StoreMDStoreRecordsJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
28

    
29
	private String eprParam;
30
	private String mdId;
31
	private String storingType;
32
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
33
	private ResultsetProgressProvider progressProvider;
34

    
35
	@Override
36
	protected String obtainServiceId(final NodeToken token) {
37
		return getServiceLocator().getServiceId(MDStoreService.class, mdId);
38
	}
39

    
40
	@Override
41
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws ResultSetException {
42
		job.setAction("FEED");
43

    
44
		final String eprS = token.getEnv().getAttribute(getEprParam());
45

    
46
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), eprS);
47

    
48
		//for storing records after incremental transformation
49
		final String overrideStoringType = token.getFullEnv().getAttribute("operationType");
50
		if (StringUtils.isNotBlank(overrideStoringType)) {
51
			token.getFullEnv().setAttribute("storingType", overrideStoringType);
52
			setStoringType(overrideStoringType);
53
		}
54
		job.getParameters().put("epr", progressProvider.getEpr().toString());
55
		job.getParameters().put("storingType", getStoringType());
56
		job.getParameters().put("mdId", getMdId());
57

    
58
		String processId = token.getProcess().getEnv().getAttribute(SYSTEM_WF_PROCESS_ID);
59
		job.getParameters().put(SYSTEM_WF_PROCESS_ID, processId);
60
	}
61

    
62
	@Override
63
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
64
		return new BlackboardWorkflowJobListener(engine, token) {
65

    
66
			@Override
67
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
68
				log.info("New mdstore size : " + responseParams.get("mdstoreSize"));
69
				log.info("Number of write operations: " + responseParams.get("writeOps"));
70
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "sinkSize", responseParams.get("mdstoreSize"));
71
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "processedSize", responseParams.get("writeOps"));
72
			}
73
		};
74
	}
75

    
76
	public String getEprParam() {
77
		return eprParam;
78
	}
79

    
80
	public void setEprParam(final String eprParam) {
81
		this.eprParam = eprParam;
82
	}
83

    
84
	public String getMdId() {
85
		return mdId;
86
	}
87

    
88
	public void setMdId(final String mdId) {
89
		this.mdId = mdId;
90
	}
91

    
92
	public String getStoringType() {
93
		return storingType;
94
	}
95

    
96
	public void setStoringType(final String storingType) {
97
		this.storingType = storingType;
98
	}
99

    
100
	public ProcessCountingResultSetFactory getProcessCountingResultSetFactory() {
101
		return processCountingResultSetFactory;
102
	}
103

    
104
	@Required
105
	public void setProcessCountingResultSetFactory(final ProcessCountingResultSetFactory processCountingResultSetFactory) {
106
		this.processCountingResultSetFactory = processCountingResultSetFactory;
107
	}
108

    
109
	@Override
110
	public ProgressProvider getProgressProvider() {
111
		return progressProvider;
112
	}
113

    
114
}
(5-5/5)