Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.download;
2

    
3
import java.util.Map;
4

    
5
import com.googlecode.sarasvati.Engine;
6
import com.googlecode.sarasvati.NodeToken;
7
import com.googlecode.sarasvati.env.Env;
8
import eu.dnetlib.data.download.rmi.DownloadService;
9
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
10
import eu.dnetlib.msro.workflows.nodes.BlackboardJobNode;
11
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
12
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
13
import eu.dnetlib.msro.workflows.resultset.ProcessCountingResultSetFactory;
14
import eu.dnetlib.msro.workflows.util.ProgressProvider;
15
import eu.dnetlib.msro.workflows.util.ResultsetProgressProvider;
16
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
17
import org.apache.commons.codec.binary.Base64;
18
import org.apache.commons.lang.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.springframework.beans.factory.annotation.Autowired;
22

    
23
// TODO: Auto-generated Javadoc
24
/**
25
 * The Class DownloadFromMetadata is a job node that send a blackboard message to the Download service to start to download file from url
26
 * retrieved by Metadata .
27
 */
28
public class DownloadFromMetadataJobNode extends BlackboardJobNode implements ProgressJobNode {
29

    
30
	private static final Log log = LogFactory.getLog(DownloadFromMetadataJobNode.class);
31

    
32
	protected String regularExpression;
33
	/** The inputepr param. */
34
	private String inputeprParam;
35
	/** The obejct store id. */
36
	private String objectStoreID;
37
	/** The plugin. */
38
	private String plugin;
39
	/** The protocol. */
40
	private String protocol;
41
	/** The mime type. */
42
	private String mimeType;
43
	private int numberOfThreads = -1;
44
	private int connectTimeoutMs = -1;
45
	private int readTimeoutMs = -1;
46

    
47
	private int sleepTimeMs = 0;
48
	/** The process counting result set factory. */
49
	@Autowired
50
	private ProcessCountingResultSetFactory processCountingResultSetFactory;
51

    
52
	/** The progress provider. */
53
	private ResultsetProgressProvider progressProvider;
54

    
55
	/*
56
	 * (non-Javadoc)
57
	 *
58
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#obtainServiceId(com.googlecode.sarasvati.NodeToken)
59
	 */
60
	@Override
61
	protected String obtainServiceId(final NodeToken token) {
62
		return getServiceLocator().getServiceId(DownloadService.class);
63
	}
64

    
65
	/*
66
	 * (non-Javadoc)
67
	 *
68
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#prepareJob(eu.dnetlib.enabling.tools.blackboard.BlackboardJob,
69
	 * com.googlecode.sarasvati.NodeToken)
70
	 */
71
	@Override
72
	protected void prepareJob(final BlackboardJob job, final NodeToken token) throws Exception {
73
		job.setAction("DOWNLOAD");
74
		final String eprS = token.getEnv().getAttribute(getInputeprParam());
75
		this.progressProvider = processCountingResultSetFactory.createProgressProvider(token.getProcess(), eprS);
76
		job.getParameters().put("epr", progressProvider.getEpr().toString());
77
		job.getParameters().put("protocol", getProtocol());
78
		job.getParameters().put("plugin", getPlugin());
79
		job.getParameters().put("mimeType", getMimeType());
80
		job.getParameters().put("objectStoreID", getObjectStoreID());
81
		if (getNumberOfThreads() > 0) {
82
			job.getParameters().put("numberOfThreads", "" + getNumberOfThreads());
83
		}
84
		if (getConnectTimeoutMs() > 0) {
85
			job.getParameters().put("connectTimeoutMs", "" + getConnectTimeoutMs());
86
		}
87
		if (getReadTimeoutMs() > 0) {
88
			job.getParameters().put("readTimeoutMs", "" + getReadTimeoutMs());
89
		}
90
		if (getSleepTimeMs() > 0) {
91
			job.getParameters().put("sleepTimeMs", "" + getSleepTimeMs());
92
		}
93
		if (!StringUtils.isEmpty(getRegularExpression())){
94
			job.getParameters().put("regularExpressions", getRegularExpression());
95
		}
96
	}
97

    
98
	/*
99
	 * (non-Javadoc)
100
	 *
101
	 * @see eu.dnetlib.msro.workflows.nodes.BlackboardJobNode#generateBlackboardListener(com.googlecode.sarasvati.Engine,
102
	 * com.googlecode.sarasvati.NodeToken)
103
	 */
104
	@Override
105
	protected BlackboardWorkflowJobListener generateBlackboardListener(final Engine engine, final NodeToken token) {
106
		return new BlackboardWorkflowJobListener(engine, token) {
107

    
108
			@Override
109
			protected void populateEnv(final Env env, final Map<String, String> responseParams) {
110
				env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "total", responseParams.get("total"));
111

    
112
				final String base64 = responseParams.get("report");
113
				if (StringUtils.isNotBlank(base64) && Base64.isBase64(base64.getBytes())) {
114
					final String report = new String(Base64.decodeBase64(base64.getBytes()));
115
					log.info("found download report");
116
					log.debug(report);
117
					env.setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + "report", report);
118
				} else {
119
					log.warn("unable to find or decode download report");
120
				}
121
			}
122
		};
123
	}
124

    
125
	/**
126
	 * Gets the inputepr param.
127
	 *
128
	 * @return the inputeprParam
129
	 */
130
	public String getInputeprParam() {
131
		return inputeprParam;
132
	}
133

    
134
	/**
135
	 * Sets the inputepr param.
136
	 *
137
	 * @param inputeprParam
138
	 *            the inputeprParam to set
139
	 */
140
	public void setInputeprParam(final String inputeprParam) {
141
		this.inputeprParam = inputeprParam;
142
	}
143

    
144
	/**
145
	 * Gets the object store id.
146
	 *
147
	 * @return the objectStoreID
148
	 */
149
	public String getObjectStoreID() {
150
		return objectStoreID;
151
	}
152

    
153
	/**
154
	 * Sets the object store id.
155
	 *
156
	 * @param objectStoreID
157
	 *            the objectStoreID to set
158
	 */
159
	public void setObjectStoreID(final String objectStoreID) {
160
		this.objectStoreID = objectStoreID;
161
	}
162

    
163
	/**
164
	 * Gets the plugin.
165
	 *
166
	 * @return the plugin
167
	 */
168
	public String getPlugin() {
169
		return plugin;
170
	}
171

    
172
	/**
173
	 * Sets the plugin.
174
	 *
175
	 * @param plugin
176
	 *            the plugin to set
177
	 */
178
	public void setPlugin(final String plugin) {
179
		this.plugin = plugin;
180
	}
181

    
182
	/**
183
	 * Gets the protocol.
184
	 *
185
	 * @return the protol
186
	 */
187
	public String getProtocol() {
188
		return protocol;
189
	}
190

    
191
	/**
192
	 * Sets the protocol.
193
	 *
194
	 * @param protol
195
	 *            the protol to set
196
	 */
197
	public void setProtocol(final String protol) {
198
		this.protocol = protol;
199
	}
200

    
201
	/**
202
	 * Gets the mime type.
203
	 *
204
	 * @return the mimeType
205
	 */
206
	public String getMimeType() {
207
		return mimeType;
208
	}
209

    
210
	/**
211
	 * Sets the mime type.
212
	 *
213
	 * @param mimeType
214
	 *            the mimeType to set
215
	 */
216
	public void setMimeType(final String mimeType) {
217
		this.mimeType = mimeType;
218
	}
219

    
220
	/*
221
	 * (non-Javadoc)
222
	 *
223
	 * @see eu.dnetlib.msro.workflows.nodes.ProgressJobNode#getProgressProvider()
224
	 */
225
	@Override
226
	public ProgressProvider getProgressProvider() {
227
		return progressProvider;
228
	}
229

    
230
	public String getRegularExpression() {
231
		return regularExpression;
232
	}
233

    
234
	public void setRegularExpression(final String regularExpression) {
235
		this.regularExpression = regularExpression;
236
	}
237

    
238
	public int getNumberOfThreads() {
239
		return numberOfThreads;
240
	}
241

    
242
	public void setNumberOfThreads(final int numberOfThreads) {
243
		this.numberOfThreads = numberOfThreads;
244
	}
245

    
246
	public int getConnectTimeoutMs() {
247
		return connectTimeoutMs;
248
	}
249

    
250
	public void setConnectTimeoutMs(final int connectTimeoutMs) {
251
		this.connectTimeoutMs = connectTimeoutMs;
252
	}
253

    
254
	public int getReadTimeoutMs() {
255
		return readTimeoutMs;
256
	}
257

    
258
	public void setReadTimeoutMs(final int readTimeoutMs) {
259
		this.readTimeoutMs = readTimeoutMs;
260
	}
261

    
262
	public int getSleepTimeMs() {
263
		return sleepTimeMs;
264
	}
265

    
266
	public void setSleepTimeMs(final int sleepTimeMs) {
267
		this.sleepTimeMs = sleepTimeMs;
268
	}
269
}
(1-1/2)