Project

General

Profile

1
package eu.dnetlib.data.download.worker;
2

    
3
import java.io.IOException;
4
import java.net.HttpURLConnection;
5
import java.net.URL;
6
import java.net.URLConnection;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.Callable;
9
import java.util.concurrent.TimeUnit;
10

    
11
import com.google.common.base.Function;
12
import com.google.common.base.Joiner;
13
import eu.dnetlib.data.download.DownloadReportMap;
14
import eu.dnetlib.data.download.DownloadServiceImpl;
15
import eu.dnetlib.data.download.rmi.DownloadItem;
16
import eu.dnetlib.data.download.rmi.DownloadServiceFeeder;
17
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
18
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
19
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
20
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
21
import eu.dnetlib.data.objectstore.rmi.Protocols;
22
import org.apache.commons.lang3.StringUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25

    
26
/**
27
 * The Class DownloadWorker is a worker responsible to download the data into the object store.
28
 */
29
public class DownloadWorker implements Callable<DownloadReportMap> {
30

    
31
	/**
32
	 * The Constant log.
33
	 */
34
	private static final Log log = LogFactory.getLog(DownloadWorker.class);
35
	private static final int MAX_NULLS = 5 ;
36

    
37
	/**
38
	 * The queue.
39
	 */
40
	private BlockingQueue<String> queue = null;
41

    
42
	/**
43
	 * The object store.
44
	 */
45
	private ObjectStore objectStore = null;
46

    
47
	/**
48
	 * The protocol.
49
	 */
50
	private Protocols protocol;
51

    
52
	/**
53
	 * The mime type.
54
	 */
55
	private String mimeType;
56

    
57
	private Function<String, DownloadItem> converter;
58

    
59
	private int connectTimeoutMs;
60
	private int readTimeoutMs;
61

    
62
	/**
63
	 * ms to wait between two subsequent download request
64
	 **/
65
	private int sleepTimeMs;
66
	
67
	private String userAgent = "Mozilla/5.0 (compatible; OAI; +http://www.openaire.eu)";
68

    
69
	/**
70
	 * Instantiates a new download worker.
71
	 *
72
	 * @param queue
73
	 *            the queue
74
	 * @param objectStore
75
	 *            the object store
76
	 * @param protocol
77
	 *            the protocol
78
	 * @param mimeType
79
	 *            the mime type
80
	 */
81
	public DownloadWorker(final BlockingQueue<String> queue,
82
			final ObjectStore objectStore,
83
			final Protocols protocol,
84
			final String mimeType,
85
			final int connectTimeoutMs,
86
			final int readTimeoutMs, final int sleepTimeMs,
87
			final Function<String, DownloadItem> converter) {
88
		this.setConverter(converter);
89
		this.setQueue(queue);
90
		this.setObjectStore(objectStore);
91
		this.setMimeType(mimeType);
92
		this.setProtocol(protocol);
93
		this.setConnectTimeoutMs(connectTimeoutMs);
94
		this.setReadTimeoutMs(readTimeoutMs);
95
		this.setSleepTimeMs(sleepTimeMs);
96
	}
97

    
98
	/*
99
	 * (non-Javadoc)
100
	 *
101
	 * @see java.lang.Runnable#run()
102
	 */
103
	@Override
104
	public DownloadReportMap call() throws Exception {
105
		final DownloadReportMap report = new DownloadReportMap();
106
		final long threadId = Thread.currentThread().getId();
107
		int nullCounter = 0;
108
		try {
109

    
110
			String takedObject = queue.poll(5, TimeUnit.SECONDS);
111

    
112
			while (!DownloadServiceImpl.END_QUEUE_STRING.equals(takedObject) && nullCounter < MAX_NULLS) {
113

    
114
				if (takedObject == null) {
115
					nullCounter++;
116
				}
117
				if (log.isDebugEnabled()) {
118
					log.debug(takedObject);
119
				}
120
				final DownloadItem di = getConverter().apply(takedObject);
121

    
122
				log.debug(threadId + ": Reading " + takedObject);
123

    
124
				if (di == null) {
125
					log.debug("the current download item is Null, skipping");
126
					DownloadServiceFeeder.reportException(report, null, new IllegalArgumentException("found null DownloadItem"));
127
				} else if (StringUtils.isNotBlank(di.getUrl()) && !checkIfExists(di)) {
128
					doDownload(threadId, report, di);
129
				}
130
				takedObject = queue.poll(5, TimeUnit.SECONDS);
131
				log.debug(String.format("%s: next object from queue %s, remaining items: %s", threadId, takedObject, queue.size()));
132
			}
133
		} catch (final Exception e) {
134
			log.debug("An error occured : " + Joiner.on("\tat ").join(e.getStackTrace()));
135
			DownloadServiceFeeder.reportException(report, null, e);
136
			report.setStatus(false);
137
			return report;
138
		} finally {
139
			log.info(String.format("%s: finalising queue, remaining items: %s, nulls: %s", threadId, queue.size(), nullCounter));
140
			final boolean res = queue.offer(DownloadServiceImpl.END_QUEUE_STRING, 5, TimeUnit.SECONDS);
141
			log.info("put terminator in queue: " + res);
142
		}
143

    
144
		log.info("CLOSED THREAD " + threadId);
145
		report.setStatus(true);
146
		return report;
147
	}
148

    
149
	public void doDownload(final long threadId, final DownloadReportMap report, final DownloadItem di) {
150
		try {
151
			if (getSleepTimeMs() > 0) {
152
				log.debug(threadId + ": I will sleep for " + getSleepTimeMs() + " ms, as requested...");
153
				Thread.sleep(getSleepTimeMs());
154
			}
155

    
156
			final URL toDownload = followURL(threadId, new URL(di.getUrl()), report, di);
157
			final ObjectStoreRecord record = new ObjectStoreRecord();
158
			final ObjectStoreFile metadata = new ObjectStoreFile();
159
			metadata.setObjectID(di.getFileName());
160
			metadata.setMetadataRelatedID(di.getIdItemMetadata());
161
			metadata.setAccessProtocol(this.protocol);
162
			metadata.setMimeType(this.mimeType);
163
			metadata.setDownloadedURL(di.getOriginalUrl());
164
			record.setFileMetadata(metadata);
165

    
166
			log.debug(threadId + ": opening connection " + toDownload);
167
			final URLConnection connection = toDownload.openConnection();
168
			// set user-agent
169
			connection.setRequestProperty("User-Agent", userAgent);
170
			connection.setConnectTimeout(getConnectTimeoutMs());
171
			connection.setReadTimeout(getReadTimeoutMs());
172
			connection.setRequestProperty("User-Agent", "OpenAIRE aggregation system/2.2.5 ; (OpenAccess aggregator; mailto:helpdesk@openaire.eu) ");
173

    
174
			log.debug(threadId + ": getting input stream from " + toDownload);
175
			record.setInputStream(connection.getInputStream());
176
			log.debug(threadId + ": feeding object from  " + toDownload + " into objectstore ...");
177
			objectStore.feedObjectRecord(record);
178
			report.addDowload();
179
			log.debug(threadId + ": saved object " + metadata.toJSON());
180
		} catch (final Throwable e) {
181
			log.debug(threadId + ": error downloading Item: " + di.toJSON(), e);
182
			DownloadServiceFeeder.reportException(report, di, e);
183
		}
184
	}
185

    
186
	private URL followURL(final long threadId, final URL inputURL, final DownloadReportMap report, final DownloadItem di) throws IOException {
187

    
188
		final String ptrcl = inputURL.getProtocol();
189
		if (ptrcl.startsWith("file")) {
190
			log.debug("the protocol is File, returning " + inputURL);
191
			return inputURL;
192
		}
193
		HttpURLConnection conn = (HttpURLConnection) inputURL.openConnection();
194
		conn.setInstanceFollowRedirects(true);  // you still need to handle redirect manually.
195
		conn.setReadTimeout(getReadTimeoutMs());
196
		conn.setConnectTimeout(getConnectTimeoutMs());
197
		// set user-agent
198
		conn.setRequestProperty("User-Agent", userAgent);
199

    
200
		String location = inputURL.toString();
201
		log.debug(threadId + " : followURL connecting  " + inputURL);
202
		conn.connect();
203
		log.debug(threadId + " : followURL connected  " + inputURL);
204
		int responseCode = conn.getResponseCode();
205
		log.debug(threadId + " : followURL " + inputURL + ", response code: " + responseCode);
206
		if ((responseCode >= 300) && (responseCode < 400)) {
207
			location = conn.getHeaderFields().get("Location").get(0);
208
			conn.disconnect();
209
			log.debug(threadId + " : followURL disconnected  " + inputURL);
210
		}
211

    
212
		if (!location.equals(inputURL.toString())) return new URL(location);
213
		return inputURL;
214
	}
215

    
216
	private boolean checkIfExists(final DownloadItem di) {
217
		try {
218
			return objectStore.deliverObject(di.getFileName()) != null;
219
		} catch (final ObjectStoreServiceException e) {
220
			log.debug(e.getMessage());
221
			return false;
222
		}
223
	}
224

    
225
	/**
226
	 * Sets the object store.
227
	 *
228
	 * @param objectStore
229
	 *            the objectStore to set
230
	 */
231
	public void setObjectStore(final ObjectStore objectStore) {
232
		this.objectStore = objectStore;
233
	}
234

    
235
	/**
236
	 * Sets the queue.
237
	 *
238
	 * @param queue
239
	 *            the queue to set
240
	 */
241
	public void setQueue(final BlockingQueue<String> queue) {
242
		this.queue = queue;
243
	}
244

    
245
	/**
246
	 * Sets the protocol.
247
	 *
248
	 * @param protocol
249
	 *            the protocol to set
250
	 */
251
	public void setProtocol(final Protocols protocol) {
252
		this.protocol = protocol;
253
	}
254

    
255
	/**
256
	 * Sets the mime type.
257
	 *
258
	 * @param mimeType
259
	 *            the mimeType to set
260
	 */
261
	public void setMimeType(final String mimeType) {
262
		this.mimeType = mimeType;
263
	}
264

    
265
	public Function<String, DownloadItem> getConverter() {
266
		return converter;
267
	}
268

    
269
	public void setConverter(final Function<String, DownloadItem> converter) {
270
		this.converter = converter;
271
	}
272

    
273
	public int getReadTimeoutMs() {
274
		return readTimeoutMs;
275
	}
276

    
277
	public void setReadTimeoutMs(final int readTimeoutMs) {
278
		this.readTimeoutMs = readTimeoutMs;
279
	}
280

    
281
	public int getConnectTimeoutMs() {
282
		return connectTimeoutMs;
283
	}
284

    
285
	public void setConnectTimeoutMs(final int connectTimeoutMs) {
286
		this.connectTimeoutMs = connectTimeoutMs;
287
	}
288

    
289
	public int getSleepTimeMs() {
290
		return sleepTimeMs;
291
	}
292

    
293
	public void setSleepTimeMs(final int sleepTimeMs) {
294
		this.sleepTimeMs = sleepTimeMs;
295
	}
296
}
    (1-1/1)