Project

General

Profile

1
package eu.dnetlib.data.download.worker;
2

    
3
import java.io.IOException;
4
import java.net.HttpURLConnection;
5
import java.net.URL;
6
import java.net.URLConnection;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.Callable;
9
import java.util.concurrent.TimeUnit;
10

    
11
import com.google.common.base.Function;
12
import com.google.common.base.Joiner;
13
import eu.dnetlib.data.download.DownloadReportMap;
14
import eu.dnetlib.data.download.DownloadServiceImpl;
15
import eu.dnetlib.data.download.rmi.DownloadItem;
16
import eu.dnetlib.data.download.rmi.DownloadServiceFeeder;
17
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
18
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
19
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
20
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
21
import eu.dnetlib.data.objectstore.rmi.Protocols;
22
import org.apache.commons.lang3.StringUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25

    
26
/**
27
 * The Class DownloadWorker is a worker responsible to download the data into the object store.
28
 */
29
public class DownloadWorker implements Callable<DownloadReportMap> {
30

    
31
	/**
32
	 * The Constant log.
33
	 */
34
	private static final Log log = LogFactory.getLog(DownloadWorker.class);
35
	private static final int MAX_NULLS = 5 ;
36

    
37
	/**
38
	 * The queue.
39
	 */
40
	private BlockingQueue<String> queue = null;
41

    
42
	/**
43
	 * The object store.
44
	 */
45
	private ObjectStore objectStore = null;
46

    
47
	/**
48
	 * The protocol.
49
	 */
50
	private Protocols protocol;
51

    
52
	/**
53
	 * The mime type.
54
	 */
55
	private String mimeType;
56

    
57
	private Function<String, DownloadItem> converter;
58

    
59
	private int connectTimeoutMs;
60
	private int readTimeoutMs;
61

    
62
	/**
63
	 * ms to wait between two subsequent download request
64
	 **/
65
	private int sleepTimeMs;
66

    
67
	/**
68
	 * Instantiates a new download worker.
69
	 *
70
	 * @param queue
71
	 *            the queue
72
	 * @param objectStore
73
	 *            the object store
74
	 * @param protocol
75
	 *            the protocol
76
	 * @param mimeType
77
	 *            the mime type
78
	 */
79
	public DownloadWorker(final BlockingQueue<String> queue,
80
			final ObjectStore objectStore,
81
			final Protocols protocol,
82
			final String mimeType,
83
			final int connectTimeoutMs,
84
			final int readTimeoutMs, final int sleepTimeMs,
85
			final Function<String, DownloadItem> converter) {
86
		this.setConverter(converter);
87
		this.setQueue(queue);
88
		this.setObjectStore(objectStore);
89
		this.setMimeType(mimeType);
90
		this.setProtocol(protocol);
91
		this.setConnectTimeoutMs(connectTimeoutMs);
92
		this.setReadTimeoutMs(readTimeoutMs);
93
		this.setSleepTimeMs(sleepTimeMs);
94
	}
95

    
96
	/*
97
	 * (non-Javadoc)
98
	 *
99
	 * @see java.lang.Runnable#run()
100
	 */
101
	@Override
102
	public DownloadReportMap call() throws Exception {
103
		final DownloadReportMap report = new DownloadReportMap();
104
		final long threadId = Thread.currentThread().getId();
105
		int nullCounter = 0;
106
		try {
107

    
108
			String takedObject = queue.poll(5, TimeUnit.SECONDS);
109

    
110
			while (!DownloadServiceImpl.END_QUEUE_STRING.equals(takedObject) && nullCounter < MAX_NULLS) {
111

    
112
				if (takedObject == null) {
113
					nullCounter++;
114
				}
115
				if (log.isDebugEnabled()) {
116
					log.debug(takedObject);
117
				}
118
				final DownloadItem di = getConverter().apply(takedObject);
119

    
120
				log.debug(threadId + ": Reading " + takedObject);
121

    
122
				if (di == null) {
123
					log.debug("the current download item is Null, skipping");
124
					DownloadServiceFeeder.reportException(report, null, new IllegalArgumentException("found null DownloadItem"));
125
				} else if (StringUtils.isNotBlank(di.getUrl()) && !checkIfExists(di)) {
126
					doDownload(threadId, report, di);
127
				}
128
				takedObject = queue.poll(5, TimeUnit.SECONDS);
129
				log.debug(String.format("%s: next object from queue %s, remaining items: %s", threadId, takedObject, queue.size()));
130
			}
131
		} catch (final Exception e) {
132
			log.debug("An error occured : " + Joiner.on("\tat ").join(e.getStackTrace()));
133
			DownloadServiceFeeder.reportException(report, null, e);
134
			report.setStatus(false);
135
			return report;
136
		} finally {
137
			log.info(String.format("%s: finalising queue, remaining items: %s, nulls: %s", threadId, queue.size(), nullCounter));
138
			final boolean res = queue.offer(DownloadServiceImpl.END_QUEUE_STRING, 5, TimeUnit.SECONDS);
139
			log.info("put terminator in queue: " + res);
140
		}
141

    
142
		log.info("CLOSED THREAD " + threadId);
143
		report.setStatus(true);
144
		return report;
145
	}
146

    
147
	public void doDownload(final long threadId, final DownloadReportMap report, final DownloadItem di) {
148
		try {
149
			if (getSleepTimeMs() > 0) {
150
				log.debug(threadId + ": I will sleep for " + getSleepTimeMs() + " ms, as requested...");
151
				Thread.sleep(getSleepTimeMs());
152
			}
153

    
154
			final URL toDownload = followURL(threadId, new URL(di.getUrl()), report, di);
155
			final ObjectStoreRecord record = new ObjectStoreRecord();
156
			final ObjectStoreFile metadata = new ObjectStoreFile();
157
			metadata.setObjectID(di.getFileName());
158
			metadata.setMetadataRelatedID(di.getIdItemMetadata());
159
			metadata.setAccessProtocol(this.protocol);
160
			metadata.setMimeType(this.mimeType);
161
			metadata.setDownloadedURL(di.getOriginalUrl());
162
			record.setFileMetadata(metadata);
163

    
164
			log.debug(threadId + ": opening connection " + toDownload);
165
			final URLConnection connection = toDownload.openConnection();
166
			connection.setConnectTimeout(getConnectTimeoutMs());
167
			connection.setReadTimeout(getReadTimeoutMs());
168
			connection.setRequestProperty("User-Agent", "OpenAIRE aggregation system/2.2.5 ; (OpenAccess aggregator; mailto:helpdesk@openaire.eu) ");
169

    
170
			log.debug(threadId + ": getting input stream from " + toDownload);
171
			record.setInputStream(connection.getInputStream());
172
			log.debug(threadId + ": feeding object from  " + toDownload + " into objectstore ...");
173
			objectStore.feedObjectRecord(record);
174
			report.addDowload();
175
			log.debug(threadId + ": saved object " + metadata.toJSON());
176
		} catch (final Throwable e) {
177
			log.debug(threadId + ": error downloading Item: " + di.toJSON(), e);
178
			DownloadServiceFeeder.reportException(report, di, e);
179
		}
180
	}
181

    
182
	private URL followURL(final long threadId, final URL inputURL, final DownloadReportMap report, final DownloadItem di) throws IOException {
183

    
184
		final String ptrcl = inputURL.getProtocol();
185
		if (ptrcl.startsWith("file")) {
186
			log.debug("the protocol is File, returning " + inputURL);
187
			return inputURL;
188
		}
189
		HttpURLConnection conn = (HttpURLConnection) inputURL.openConnection();
190
		conn.setInstanceFollowRedirects(true);  // you still need to handle redirect manually.
191
		conn.setReadTimeout(getReadTimeoutMs());
192
		conn.setConnectTimeout(getConnectTimeoutMs());
193
		String location = inputURL.toString();
194
		log.debug(threadId + " : followURL connecting  " + inputURL);
195
		conn.connect();
196
		log.debug(threadId + " : followURL connected  " + inputURL);
197
		int responseCode = conn.getResponseCode();
198
		log.debug(threadId + " : followURL " + inputURL + ", response code: " + responseCode);
199
		if ((responseCode >= 300) && (responseCode < 400)) {
200
			location = conn.getHeaderFields().get("Location").get(0);
201
			conn.disconnect();
202
			log.debug(threadId + " : followURL disconnected  " + inputURL);
203
		}
204

    
205
		if (!location.equals(inputURL.toString())) return new URL(location);
206
		return inputURL;
207
	}
208

    
209
	private boolean checkIfExists(final DownloadItem di) {
210
		try {
211
			return objectStore.deliverObject(di.getFileName()) != null;
212
		} catch (final ObjectStoreServiceException e) {
213
			log.debug(e.getMessage());
214
			return false;
215
		}
216
	}
217

    
218
	/**
219
	 * Sets the object store.
220
	 *
221
	 * @param objectStore
222
	 *            the objectStore to set
223
	 */
224
	public void setObjectStore(final ObjectStore objectStore) {
225
		this.objectStore = objectStore;
226
	}
227

    
228
	/**
229
	 * Sets the queue.
230
	 *
231
	 * @param queue
232
	 *            the queue to set
233
	 */
234
	public void setQueue(final BlockingQueue<String> queue) {
235
		this.queue = queue;
236
	}
237

    
238
	/**
239
	 * Sets the protocol.
240
	 *
241
	 * @param protocol
242
	 *            the protocol to set
243
	 */
244
	public void setProtocol(final Protocols protocol) {
245
		this.protocol = protocol;
246
	}
247

    
248
	/**
249
	 * Sets the mime type.
250
	 *
251
	 * @param mimeType
252
	 *            the mimeType to set
253
	 */
254
	public void setMimeType(final String mimeType) {
255
		this.mimeType = mimeType;
256
	}
257

    
258
	public Function<String, DownloadItem> getConverter() {
259
		return converter;
260
	}
261

    
262
	public void setConverter(final Function<String, DownloadItem> converter) {
263
		this.converter = converter;
264
	}
265

    
266
	public int getReadTimeoutMs() {
267
		return readTimeoutMs;
268
	}
269

    
270
	public void setReadTimeoutMs(final int readTimeoutMs) {
271
		this.readTimeoutMs = readTimeoutMs;
272
	}
273

    
274
	public int getConnectTimeoutMs() {
275
		return connectTimeoutMs;
276
	}
277

    
278
	public void setConnectTimeoutMs(final int connectTimeoutMs) {
279
		this.connectTimeoutMs = connectTimeoutMs;
280
	}
281

    
282
	public int getSleepTimeMs() {
283
		return sleepTimeMs;
284
	}
285

    
286
	public void setSleepTimeMs(final int sleepTimeMs) {
287
		this.sleepTimeMs = sleepTimeMs;
288
	}
289
}
    (1-1/1)