Project

General

Profile

1
package eu.dnetlib.data.download.worker;
2

    
3
import java.io.IOException;
4
import java.net.HttpURLConnection;
5
import java.net.URL;
6
import java.net.URLConnection;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.Callable;
9
import java.util.concurrent.TimeUnit;
10

    
11
import com.google.common.base.Function;
12
import com.google.common.base.Joiner;
13
import eu.dnetlib.data.download.DownloadReportMap;
14
import eu.dnetlib.data.download.DownloadServiceImpl;
15
import eu.dnetlib.data.download.rmi.DownloadItem;
16
import eu.dnetlib.data.download.rmi.DownloadServiceFeeder;
17
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
18
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
19
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
20
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
21
import eu.dnetlib.data.objectstore.rmi.Protocols;
22
import org.apache.commons.lang3.StringUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25

    
26
/**
27
 * The Class DownloadWorker is a worker responsible to download the data into the object store.
28
 */
29
public class DownloadWorker implements Callable<DownloadReportMap> {
30

    
31
	/**
32
	 * The Constant log.
33
	 */
34
	private static final Log log = LogFactory.getLog(DownloadWorker.class);
35
	private static final int MAX_NULLS = 5 ;
36

    
37
	/**
38
	 * The queue.
39
	 */
40
	private BlockingQueue<String> queue = null;
41

    
42
	/**
43
	 * The object store.
44
	 */
45
	private ObjectStore objectStore = null;
46

    
47
	/**
48
	 * The protocol.
49
	 */
50
	private Protocols protocol;
51

    
52
	/**
53
	 * The mime type.
54
	 */
55
	private String mimeType;
56

    
57
	private Function<String, DownloadItem> converter;
58

    
59
	private int connectTimeoutMs;
60
	private int readTimeoutMs;
61

    
62
	/**
63
	 * ms to wait between two subsequent download request
64
	 **/
65
	private int sleepTimeMs;
66

    
67
	/**
68
	 * Instantiates a new download worker.
69
	 *
70
	 * @param queue
71
	 *            the queue
72
	 * @param objectStore
73
	 *            the object store
74
	 * @param protocol
75
	 *            the protocol
76
	 * @param mimeType
77
	 *            the mime type
78
	 */
79
	public DownloadWorker(final BlockingQueue<String> queue,
80
			final ObjectStore objectStore,
81
			final Protocols protocol,
82
			final String mimeType,
83
			final int connectTimeoutMs,
84
			final int readTimeoutMs, final int sleepTimeMs,
85
			final Function<String, DownloadItem> converter) {
86
		this.setConverter(converter);
87
		this.setQueue(queue);
88
		this.setObjectStore(objectStore);
89
		this.setMimeType(mimeType);
90
		this.setProtocol(protocol);
91
		this.setConnectTimeoutMs(connectTimeoutMs);
92
		this.setReadTimeoutMs(readTimeoutMs);
93
		this.setSleepTimeMs(sleepTimeMs);
94
	}
95

    
96
	/*
97
	 * (non-Javadoc)
98
	 *
99
	 * @see java.lang.Runnable#run()
100
	 */
101
	@Override
102
	public DownloadReportMap call() throws Exception {
103
		final DownloadReportMap report = new DownloadReportMap();
104
		final long threadId = Thread.currentThread().getId();
105
		int nullCounter = 0;
106
		try {
107

    
108
			String takedObject = queue.poll(5, TimeUnit.SECONDS);
109

    
110
			while (!DownloadServiceImpl.END_QUEUE_STRING.equals(takedObject) && nullCounter < MAX_NULLS) {
111

    
112
				if (takedObject == null) {
113
					nullCounter++;
114
				}
115
				if (log.isDebugEnabled()) {
116
					log.debug(takedObject);
117
				}
118
				final DownloadItem di = getConverter().apply(takedObject);
119

    
120
				log.debug(threadId + ": Reading " + takedObject);
121

    
122
				if (di == null) {
123
					log.debug("the current download item is Null, skipping");
124
					DownloadServiceFeeder.reportException(report, null, new IllegalArgumentException("found null DownloadItem"));
125
				} else if (StringUtils.isNotBlank(di.getUrl()) && !checkIfExists(di)) {
126
					doDownload(threadId, report, di);
127
				}
128
				takedObject = queue.poll(5, TimeUnit.SECONDS);
129
				log.debug(String.format("%s: next object from queue %s, remaining items: %s", threadId, takedObject, queue.size()));
130
			}
131
		} catch (final Exception e) {
132
			log.debug("An error occured : " + Joiner.on("\tat ").join(e.getStackTrace()));
133
			DownloadServiceFeeder.reportException(report, null, e);
134
			report.setStatus(false);
135
			return report;
136
		} finally {
137
			log.info(String.format("%s: finalising queue, remaining items: %s, nulls: %s", threadId, queue.size(), nullCounter));
138
			final boolean res = queue.offer(DownloadServiceImpl.END_QUEUE_STRING, 5, TimeUnit.SECONDS);
139
			log.info("put terminator in queue: " + res);
140
		}
141

    
142
		log.info("CLOSED THREAD " + threadId);
143
		report.setStatus(true);
144
		return report;
145
	}
146

    
147
	public void doDownload(final long threadId, final DownloadReportMap report, final DownloadItem di) {
148
		try {
149
			if (getSleepTimeMs() > 0) {
150
				log.debug(threadId + ": I will sleep for " + getSleepTimeMs() + " ms, as requested...");
151
				Thread.sleep(getSleepTimeMs());
152
			}
153

    
154
			final URL toDownload = followURL(threadId, new URL(di.getUrl()), report, di);
155
			final ObjectStoreRecord record = new ObjectStoreRecord();
156
			final ObjectStoreFile metadata = new ObjectStoreFile();
157
			metadata.setObjectID(di.getFileName());
158
			metadata.setMetadataRelatedID(di.getIdItemMetadata());
159
			metadata.setAccessProtocol(this.protocol);
160
			metadata.setMimeType(this.mimeType);
161
			metadata.setDownloadedURL(di.getOriginalUrl());
162
			record.setFileMetadata(metadata);
163

    
164
			log.debug(threadId + ": opening connection " + toDownload);
165
			final URLConnection connection = toDownload.openConnection();
166
			connection.setConnectTimeout(getConnectTimeoutMs());
167
			connection.setReadTimeout(getReadTimeoutMs());
168

    
169
			log.debug(threadId + ": getting input stream from " + toDownload);
170
			record.setInputStream(connection.getInputStream());
171
			log.debug(threadId + ": feeding object from  " + toDownload + " into objectstore ...");
172
			objectStore.feedObjectRecord(record);
173
			report.addDowload();
174
			log.debug(threadId + ": saved object " + metadata.toJSON());
175
		} catch (final Throwable e) {
176
			log.debug(threadId + ": error downloading Item: " + di.toJSON(), e);
177
			DownloadServiceFeeder.reportException(report, di, e);
178
		}
179
	}
180

    
181
	private URL followURL(final long threadId, final URL inputURL, final DownloadReportMap report, final DownloadItem di) throws IOException {
182

    
183
		final String ptrcl = inputURL.getProtocol();
184
		if (ptrcl.startsWith("file")) {
185
			log.debug("the protocol is File, returning " + inputURL);
186
			return inputURL;
187
		}
188
		HttpURLConnection conn = (HttpURLConnection) inputURL.openConnection();
189
		conn.setInstanceFollowRedirects(true);  // you still need to handle redirect manually.
190
		conn.setReadTimeout(getReadTimeoutMs());
191
		conn.setConnectTimeout(getConnectTimeoutMs());
192
		String location = inputURL.toString();
193
		log.debug(threadId + " : followURL connecting  " + inputURL);
194
		conn.connect();
195
		log.debug(threadId + " : followURL connected  " + inputURL);
196
		int responseCode = conn.getResponseCode();
197
		log.debug(threadId + " : followURL " + inputURL + ", response code: " + responseCode);
198
		if ((responseCode >= 300) && (responseCode < 400)) {
199
			location = conn.getHeaderFields().get("Location").get(0);
200
			conn.disconnect();
201
			log.debug(threadId + " : followURL disconnected  " + inputURL);
202
		}
203

    
204
		if (!location.equals(inputURL.toString())) return new URL(location);
205
		return inputURL;
206
	}
207

    
208
	private boolean checkIfExists(final DownloadItem di) {
209
		try {
210
			return objectStore.deliverObject(di.getFileName()) != null;
211
		} catch (final ObjectStoreServiceException e) {
212
			log.debug(e.getMessage());
213
			return false;
214
		}
215
	}
216

    
217
	/**
218
	 * Sets the object store.
219
	 *
220
	 * @param objectStore
221
	 *            the objectStore to set
222
	 */
223
	public void setObjectStore(final ObjectStore objectStore) {
224
		this.objectStore = objectStore;
225
	}
226

    
227
	/**
228
	 * Sets the queue.
229
	 *
230
	 * @param queue
231
	 *            the queue to set
232
	 */
233
	public void setQueue(final BlockingQueue<String> queue) {
234
		this.queue = queue;
235
	}
236

    
237
	/**
238
	 * Sets the protocol.
239
	 *
240
	 * @param protocol
241
	 *            the protocol to set
242
	 */
243
	public void setProtocol(final Protocols protocol) {
244
		this.protocol = protocol;
245
	}
246

    
247
	/**
248
	 * Sets the mime type.
249
	 *
250
	 * @param mimeType
251
	 *            the mimeType to set
252
	 */
253
	public void setMimeType(final String mimeType) {
254
		this.mimeType = mimeType;
255
	}
256

    
257
	public Function<String, DownloadItem> getConverter() {
258
		return converter;
259
	}
260

    
261
	public void setConverter(final Function<String, DownloadItem> converter) {
262
		this.converter = converter;
263
	}
264

    
265
	public int getReadTimeoutMs() {
266
		return readTimeoutMs;
267
	}
268

    
269
	public void setReadTimeoutMs(final int readTimeoutMs) {
270
		this.readTimeoutMs = readTimeoutMs;
271
	}
272

    
273
	public int getConnectTimeoutMs() {
274
		return connectTimeoutMs;
275
	}
276

    
277
	public void setConnectTimeoutMs(final int connectTimeoutMs) {
278
		this.connectTimeoutMs = connectTimeoutMs;
279
	}
280

    
281
	public int getSleepTimeMs() {
282
		return sleepTimeMs;
283
	}
284

    
285
	public void setSleepTimeMs(final int sleepTimeMs) {
286
		this.sleepTimeMs = sleepTimeMs;
287
	}
288
}
    (1-1/1)