Project

General

Profile

1 26600 sandro.lab
package eu.dnetlib.data.download.worker;
2
3 32258 sandro.lab
import java.io.IOException;
4
import java.net.HttpURLConnection;
5 26600 sandro.lab
import java.net.URL;
6
import java.util.concurrent.BlockingQueue;
7 31882 sandro.lab
import java.util.concurrent.Callable;
8 26600 sandro.lab
9 32020 sandro.lab
import com.google.common.base.Function;
10 32283 sandro.lab
import com.google.common.base.Joiner;
11
import eu.dnetlib.data.download.DownloadReport;
12
import eu.dnetlib.data.download.DownloadReportMap;
13 33251 sandro.lab
import eu.dnetlib.data.download.DownloadServiceImpl;
14 26600 sandro.lab
import eu.dnetlib.data.download.rmi.DownloadItem;
15
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
16
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
17
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
18 31831 sandro.lab
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
19 26600 sandro.lab
import eu.dnetlib.data.objectstore.rmi.Protocols;
20 40412 claudio.at
import org.apache.commons.lang.StringUtils;
21 40366 claudio.at
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23 26600 sandro.lab
24
/**
25
 * The Class DownloadWorker is a worker responsible to download the data into the object store.
26
 */
27 32283 sandro.lab
public class DownloadWorker implements Callable<DownloadReportMap> {
28 26600 sandro.lab
29 33251 sandro.lab
	/**
30
	 * The Constant log.
31
	 */
32
	private static final Log log = LogFactory.getLog(DownloadWorker.class);
33 26600 sandro.lab
34 33251 sandro.lab
	/**
35
	 * The queue.
36
	 */
37
	private BlockingQueue<String> queue = null;
38 26600 sandro.lab
39 33251 sandro.lab
	/**
40
	 * The object store.
41
	 */
42
	private ObjectStore objectStore = null;
43 26600 sandro.lab
44 33251 sandro.lab
	/**
45
	 * The protocol.
46
	 */
47
	private Protocols protocol;
48 26600 sandro.lab
49 33251 sandro.lab
	/**
50
	 * The mime type.
51
	 */
52
	private String mimeType;
53 26600 sandro.lab
54 33251 sandro.lab
	private Function<String, DownloadItem> converter;
55 32020 sandro.lab
56 33251 sandro.lab
	/**
57
	 * Instantiates a new download worker.
58
	 *
59
	 * @param queue
60
	 *            the queue
61
	 * @param objectStore
62
	 *            the object store
63
	 * @param protocol
64
	 *            the protocol
65
	 * @param mimeType
66
	 *            the mime type
67
	 */
68
	public DownloadWorker(final BlockingQueue<String> queue, final ObjectStore objectStore, final Protocols protocol, final String mimeType,
69
			final Function<String, DownloadItem> converter) {
70
		this.setConverter(converter);
71
		this.setQueue(queue);
72
		this.setObjectStore(objectStore);
73
		this.setMimeType(mimeType);
74
		this.setProtocol(protocol);
75
	}
76 26600 sandro.lab
77 33251 sandro.lab
	/*
78
	 * (non-Javadoc)
79 36413 sandro.lab
	 *
80 33251 sandro.lab
	 * @see java.lang.Runnable#run()
81
	 */
82
	@Override
83
	public DownloadReportMap call() throws Exception {
84 33912 claudio.at
		final DownloadReportMap report = new DownloadReportMap();
85 40412 claudio.at
		final long threadId = Thread.currentThread().getId();
86 33251 sandro.lab
		try {
87 33905 sandro.lab
88 33912 claudio.at
			String takedObject = queue.take();
89 33905 sandro.lab
			DownloadItem di = getConverter().apply(takedObject);
90
			if (log.isDebugEnabled()) {
91
				log.debug(takedObject);
92
			}
93
94 33251 sandro.lab
			while (DownloadServiceImpl.END_QUEUE != di) {
95 36413 sandro.lab
96 40412 claudio.at
				log.debug(threadId + ": Reading " + takedObject);
97 36413 sandro.lab
98 33251 sandro.lab
				if (di == null) {
99 33905 sandro.lab
					log.info("the current download item is Null, skipping");
100 33251 sandro.lab
				}
101 40412 claudio.at
				if ((di != null) && StringUtils.isNotBlank(di.getUrl()) && !checkIfExists(di)) {
102 33251 sandro.lab
					try {
103 33912 claudio.at
						final URL toDownload = followURL(new URL(di.getUrl()), report, di);
104
						final ObjectStoreRecord record = new ObjectStoreRecord();
105
						final ObjectStoreFile metadata = new ObjectStoreFile();
106 33251 sandro.lab
						metadata.setObjectID(di.getFileName());
107
						metadata.setMetadataRelatedID(di.getIdItemMetadata());
108
						metadata.setAccessProtocol(this.protocol);
109
						metadata.setMimeType(this.mimeType);
110
						metadata.setDownloadedURL(di.getOriginalUrl());
111
						record.setFileMetadata(metadata);
112
						record.setInputStream(toDownload.openStream());
113
						objectStore.feedObjectRecord(record);
114
						report.addDowload();
115
						log.debug("Saved object " + metadata.toJSON());
116 36413 sandro.lab
					} catch (final Throwable e) {
117 40412 claudio.at
						log.error("An error occurred processing the Download Item: " + di.toJSON(), e);
118 40366 claudio.at
						reportException(report, di, e);
119 33251 sandro.lab
					}
120
				}
121 33912 claudio.at
				takedObject = queue.take();
122 40412 claudio.at
				log.debug(threadId + ": Next Object From Queue  " + takedObject);
123 33905 sandro.lab
				di = getConverter().apply(takedObject);
124 33912 claudio.at
				if (log.isDebugEnabled()) {
125
					log.debug(takedObject);
126
				}
127 33251 sandro.lab
			}
128
			queue.put(DownloadServiceImpl.END_QUEUE_STRING);
129 33912 claudio.at
		} catch (final Exception e) {
130 33905 sandro.lab
			log.error("An error occured : " + Joiner.on("\tat ").join(e.getStackTrace()));
131 40412 claudio.at
			reportException(report, null, e);
132 33251 sandro.lab
			report.setStatus(false);
133
			return report;
134
		}
135 26600 sandro.lab
136 40412 claudio.at
		log.info("CLOSED THREAD " + threadId);
137 33251 sandro.lab
		report.setStatus(true);
138
		return report;
139
	}
140 26600 sandro.lab
141 40366 claudio.at
	private void reportException(final DownloadReportMap report, final DownloadItem di, final Throwable e) {
142 33912 claudio.at
		final String className = e.getClass().getName();
143 33251 sandro.lab
		if (!report.containsKey(className)) {
144 33912 claudio.at
			final DownloadReport dr = new DownloadReport();
145 33251 sandro.lab
			dr.setStackTrace(Joiner.on("\tat ").join(e.getStackTrace()));
146 40412 claudio.at
			if (di != null) {
147
				dr.setRecordId(di.getIdItemMetadata());
148
			}
149 33251 sandro.lab
			report.put(className, dr);
150
		} else {
151
			report.get(className).incrementError();
152
		}
153
	}
154 32258 sandro.lab
155 36413 sandro.lab
	private URL followURL(final URL inputURL, final DownloadReportMap report, final DownloadItem di) throws IOException {
156 32258 sandro.lab
157 33912 claudio.at
		final String ptrcl = inputURL.getProtocol();
158 33251 sandro.lab
		if (ptrcl.startsWith("file")) {
159 33905 sandro.lab
			log.debug("the protocol is File, returning " + inputURL);
160 33251 sandro.lab
			return inputURL;
161
		}
162 32283 sandro.lab
163 33251 sandro.lab
		HttpURLConnection conn;
164 32258 sandro.lab
165 36413 sandro.lab
		conn = (HttpURLConnection) inputURL.openConnection();
166
		conn.setInstanceFollowRedirects(true);  // you still need to handle redirect manully.
167
		HttpURLConnection.setFollowRedirects(true);
168
		String location = inputURL.toString();
169
		if ((conn.getResponseCode() >= 300) && (conn.getResponseCode() < 400)) {
170
			location = conn.getHeaderFields().get("Location").get(0);
171
			conn.disconnect();
172
		}
173 32258 sandro.lab
174 36413 sandro.lab
		if (!location.equals(inputURL.toString())) return new URL(location);
175
		return inputURL;
176 33251 sandro.lab
	}
177 32258 sandro.lab
178 33251 sandro.lab
	private boolean checkIfExists(final DownloadItem di) {
179
		try {
180
			return objectStore.deliverObject(di.getFileName()) != null;
181 33912 claudio.at
		} catch (final ObjectStoreServiceException e) {
182 33920 claudio.at
			log.debug(e.getMessage());
183 33251 sandro.lab
			return false;
184
		}
185
	}
186 31831 sandro.lab
187 33251 sandro.lab
	/**
188
	 * Sets the object store.
189
	 *
190
	 * @param objectStore
191
	 *            the objectStore to set
192
	 */
193
	public void setObjectStore(final ObjectStore objectStore) {
194
		this.objectStore = objectStore;
195
	}
196 26600 sandro.lab
197 33251 sandro.lab
	/**
198
	 * Sets the queue.
199
	 *
200
	 * @param queue
201
	 *            the queue to set
202
	 */
203
	public void setQueue(final BlockingQueue<String> queue) {
204
		this.queue = queue;
205
	}
206 26600 sandro.lab
207 33251 sandro.lab
	/**
208
	 * Sets the protocol.
209
	 *
210
	 * @param protocol
211
	 *            the protocol to set
212
	 */
213
	public void setProtocol(final Protocols protocol) {
214
		this.protocol = protocol;
215
	}
216 26600 sandro.lab
217 33251 sandro.lab
	/**
218
	 * Sets the mime type.
219
	 *
220
	 * @param mimeType
221
	 *            the mimeType to set
222
	 */
223
	public void setMimeType(final String mimeType) {
224
		this.mimeType = mimeType;
225
	}
226 26600 sandro.lab
227 33251 sandro.lab
	public Function<String, DownloadItem> getConverter() {
228
		return converter;
229
	}
230 32020 sandro.lab
231 33251 sandro.lab
	public void setConverter(final Function<String, DownloadItem> converter) {
232
		this.converter = converter;
233
	}
234 26600 sandro.lab
}