Project

General

Profile

1
package eu.dnetlib.data.download.worker;
2

    
3
import java.io.IOException;
4
import java.net.HttpURLConnection;
5
import java.net.URL;
6
import java.util.concurrent.BlockingQueue;
7
import java.util.concurrent.Callable;
8

    
9
import com.google.common.base.Function;
10
import com.google.common.base.Joiner;
11
import eu.dnetlib.data.download.DownloadReport;
12
import eu.dnetlib.data.download.DownloadReportMap;
13
import eu.dnetlib.data.download.DownloadServiceImpl;
14
import eu.dnetlib.data.download.rmi.DownloadItem;
15
import eu.dnetlib.data.objectstore.modular.ObjectStoreRecord;
16
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
17
import eu.dnetlib.data.objectstore.rmi.ObjectStoreFile;
18
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
19
import eu.dnetlib.data.objectstore.rmi.Protocols;
20
import org.apache.commons.lang.StringUtils;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23

    
24
/**
25
 * The Class DownloadWorker is a worker responsible to download the data into the object store.
26
 */
27
public class DownloadWorker implements Callable<DownloadReportMap> {
28

    
29
	/**
30
	 * The Constant log.
31
	 */
32
	private static final Log log = LogFactory.getLog(DownloadWorker.class);
33

    
34
	/**
35
	 * The queue.
36
	 */
37
	private BlockingQueue<String> queue = null;
38

    
39
	/**
40
	 * The object store.
41
	 */
42
	private ObjectStore objectStore = null;
43

    
44
	/**
45
	 * The protocol.
46
	 */
47
	private Protocols protocol;
48

    
49
	/**
50
	 * The mime type.
51
	 */
52
	private String mimeType;
53

    
54
	private Function<String, DownloadItem> converter;
55

    
56
	/**
57
	 * Instantiates a new download worker.
58
	 *
59
	 * @param queue
60
	 *            the queue
61
	 * @param objectStore
62
	 *            the object store
63
	 * @param protocol
64
	 *            the protocol
65
	 * @param mimeType
66
	 *            the mime type
67
	 */
68
	public DownloadWorker(final BlockingQueue<String> queue, final ObjectStore objectStore, final Protocols protocol, final String mimeType,
69
			final Function<String, DownloadItem> converter) {
70
		this.setConverter(converter);
71
		this.setQueue(queue);
72
		this.setObjectStore(objectStore);
73
		this.setMimeType(mimeType);
74
		this.setProtocol(protocol);
75
	}
76

    
77
	/*
78
	 * (non-Javadoc)
79
	 *
80
	 * @see java.lang.Runnable#run()
81
	 */
82
	@Override
83
	public DownloadReportMap call() throws Exception {
84
		final DownloadReportMap report = new DownloadReportMap();
85
		final long threadId = Thread.currentThread().getId();
86
		try {
87

    
88
			String takedObject = queue.take();
89
			DownloadItem di = getConverter().apply(takedObject);
90
			if (log.isDebugEnabled()) {
91
				log.debug(takedObject);
92
			}
93

    
94
			while (DownloadServiceImpl.END_QUEUE != di) {
95

    
96
				log.debug(threadId + ": Reading " + takedObject);
97

    
98
				if (di == null) {
99
					log.info("the current download item is Null, skipping");
100
				}
101
				if ((di != null) && StringUtils.isNotBlank(di.getUrl()) && !checkIfExists(di)) {
102
					try {
103
						final URL toDownload = followURL(new URL(di.getUrl()), report, di);
104
						final ObjectStoreRecord record = new ObjectStoreRecord();
105
						final ObjectStoreFile metadata = new ObjectStoreFile();
106
						metadata.setObjectID(di.getFileName());
107
						metadata.setMetadataRelatedID(di.getIdItemMetadata());
108
						metadata.setAccessProtocol(this.protocol);
109
						metadata.setMimeType(this.mimeType);
110
						metadata.setDownloadedURL(di.getOriginalUrl());
111
						record.setFileMetadata(metadata);
112
						record.setInputStream(toDownload.openStream());
113
						objectStore.feedObjectRecord(record);
114
						report.addDowload();
115
						log.debug("Saved object " + metadata.toJSON());
116
					} catch (final Throwable e) {
117
						log.error("An error occurred processing the Download Item: " + di.toJSON(), e);
118
						reportException(report, di, e);
119
					}
120
				}
121
				takedObject = queue.take();
122
				log.debug(threadId + ": Next Object From Queue  " + takedObject);
123
				di = getConverter().apply(takedObject);
124
				if (log.isDebugEnabled()) {
125
					log.debug(takedObject);
126
				}
127
			}
128
			queue.put(DownloadServiceImpl.END_QUEUE_STRING);
129
		} catch (final Exception e) {
130
			log.error("An error occured : " + Joiner.on("\tat ").join(e.getStackTrace()));
131
			reportException(report, null, e);
132
			report.setStatus(false);
133
			return report;
134
		}
135

    
136
		log.info("CLOSED THREAD " + threadId);
137
		report.setStatus(true);
138
		return report;
139
	}
140

    
141
	private void reportException(final DownloadReportMap report, final DownloadItem di, final Throwable e) {
142
		final String className = e.getClass().getName();
143
		if (!report.containsKey(className)) {
144
			final DownloadReport dr = new DownloadReport();
145
			dr.setStackTrace(Joiner.on("\tat ").join(e.getStackTrace()));
146
			if (di != null) {
147
				dr.setRecordId(di.getIdItemMetadata());
148
			}
149
			report.put(className, dr);
150
		} else {
151
			report.get(className).incrementError();
152
		}
153
	}
154

    
155
	private URL followURL(final URL inputURL, final DownloadReportMap report, final DownloadItem di) throws IOException {
156

    
157
		final String ptrcl = inputURL.getProtocol();
158
		if (ptrcl.startsWith("file")) {
159
			log.debug("the protocol is File, returning " + inputURL);
160
			return inputURL;
161
		}
162

    
163
		HttpURLConnection conn;
164

    
165
		conn = (HttpURLConnection) inputURL.openConnection();
166
		conn.setInstanceFollowRedirects(true);  // you still need to handle redirect manully.
167
		HttpURLConnection.setFollowRedirects(true);
168
		String location = inputURL.toString();
169
		if ((conn.getResponseCode() >= 300) && (conn.getResponseCode() < 400)) {
170
			location = conn.getHeaderFields().get("Location").get(0);
171
			conn.disconnect();
172
		}
173

    
174
		if (!location.equals(inputURL.toString())) return new URL(location);
175
		return inputURL;
176
	}
177

    
178
	private boolean checkIfExists(final DownloadItem di) {
179
		try {
180
			return objectStore.deliverObject(di.getFileName()) != null;
181
		} catch (final ObjectStoreServiceException e) {
182
			log.debug(e.getMessage());
183
			return false;
184
		}
185
	}
186

    
187
	/**
188
	 * Sets the object store.
189
	 *
190
	 * @param objectStore
191
	 *            the objectStore to set
192
	 */
193
	public void setObjectStore(final ObjectStore objectStore) {
194
		this.objectStore = objectStore;
195
	}
196

    
197
	/**
198
	 * Sets the queue.
199
	 *
200
	 * @param queue
201
	 *            the queue to set
202
	 */
203
	public void setQueue(final BlockingQueue<String> queue) {
204
		this.queue = queue;
205
	}
206

    
207
	/**
208
	 * Sets the protocol.
209
	 *
210
	 * @param protocol
211
	 *            the protocol to set
212
	 */
213
	public void setProtocol(final Protocols protocol) {
214
		this.protocol = protocol;
215
	}
216

    
217
	/**
218
	 * Sets the mime type.
219
	 *
220
	 * @param mimeType
221
	 *            the mimeType to set
222
	 */
223
	public void setMimeType(final String mimeType) {
224
		this.mimeType = mimeType;
225
	}
226

    
227
	public Function<String, DownloadItem> getConverter() {
228
		return converter;
229
	}
230

    
231
	public void setConverter(final Function<String, DownloadItem> converter) {
232
		this.converter = converter;
233
	}
234
}
    (1-1/1)