Project

General

Profile

1 42157 sandro.lab
package eu.dnetlib.data.objectstore;
2 26600 sandro.lab
3 31882 sandro.lab
import java.util.List;
4 33251 sandro.lab
import java.util.concurrent.BlockingQueue;
5
import java.util.concurrent.ExecutorService;
6
import java.util.concurrent.Executors;
7
import java.util.concurrent.Future;
8 42157 sandro.lab
import java.util.function.Function;
9 26600 sandro.lab
import javax.annotation.Resource;
10
11 33912 claudio.at
import com.google.common.base.Joiner;
12 31882 sandro.lab
import com.google.common.collect.Lists;
13 40523 claudio.at
import com.google.common.collect.Queues;
14 42157 sandro.lab
import eu.dnetlib.data.objectstore.connector.ObjectStore;
15
import eu.dnetlib.data.objectstore.connector.ObjectStoreDao;
16
import eu.dnetlib.data.objectstore.worker.DownloadWorker;
17
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
18
import eu.dnetlib.rmi.common.ResultSet;
19 42170 sandro.lab
import eu.dnetlib.rmi.data.DownloadItem;
20
import eu.dnetlib.rmi.data.DownloadPlugin;
21
import eu.dnetlib.rmi.data.ObjectStoreServiceException;
22
import eu.dnetlib.rmi.data.Protocols;
23 40523 claudio.at
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.springframework.beans.factory.annotation.Autowired;
26 26600 sandro.lab
27
/**
28
 * The Class DownloadServiceFeeder.
29
 */
30
public class DownloadServiceFeeder {
31
32 42157 sandro.lab
	/**
33
	 * The Constant log.
34
	 */
35 26600 sandro.lab
	private static final Log log = LogFactory.getLog(DownloadServiceFeeder.class);
36
37 42157 sandro.lab
	/**
38
	 * The download plugin enumerator.
39
	 */
40 26600 sandro.lab
	@Resource
41 33251 sandro.lab
	DownloadPluginEnumeratorImpl downloadPluginEnumerator;
42 26600 sandro.lab
43 42157 sandro.lab
	/**
44
	 * The result set client factory.
45
	 */
46 26600 sandro.lab
	@Resource
47 42157 sandro.lab
	private ResultSetClient resultSetClient;
48 26600 sandro.lab
49 42157 sandro.lab
	/**
50
	 * The object store dao.
51
	 */
52 26600 sandro.lab
	@Autowired
53
	private ObjectStoreDao objectStoreDao;
54
55 42157 sandro.lab
	public static void reportException(final DownloadReportMap report, final DownloadItem di, final Throwable e) {
56
		final String className = e.getClass().getName();
57
		if (!report.containsKey(className)) {
58
			final DownloadReport dr = new DownloadReport();
59
			dr.setStackTrace(Joiner.on("\tat ").join(e.getStackTrace()));
60
			if (di != null) {
61
				dr.setDownloadItem(di);
62
			}
63
			report.put(className, dr);
64
		} else {
65
			report.get(className).incrementError();
66
		}
67
	}
68
69 26600 sandro.lab
	/**
70
	 * Download and feed file into the objectStore .
71 31408 sandro.lab
	 *
72 42157 sandro.lab
	 * @param epr             the end-point reference of the result-set of Serialized DownloadItem
73
	 * @param plugin          the plugin used to retrieve the correct URL
74
	 * @param objectStoreID   the object store id to store the data
75
	 * @param protocol        the protocol used to download the file
76
	 * @param mimeType        the mime type of the Files
77
	 * @param numberOfThreads the number of threads to use for download at the same time
78 37002 sandro.lab
	 * @throws ObjectStoreServiceException
79 26600 sandro.lab
	 */
80 32283 sandro.lab
	public DownloadReportMap download(final String epr,
81 26600 sandro.lab
			final String plugin,
82
			final String objectStoreID,
83
			final String protocol,
84
			final String mimeType,
85 33251 sandro.lab
			final int numberOfThreads,
86 37568 sandro.lab
			final String basePath,
87 41998 alessia.ba
			final List<String> regularExpression,
88
			final int connectTimeoutMs,
89 42170 sandro.lab
			final int readTimeoutMs, final int sleepTimeMs) throws ObjectStoreServiceException {
90 26600 sandro.lab
		final DownloadPlugin downloadPlugin = downloadPluginEnumerator.get(plugin);
91 36413 sandro.lab
		if ((basePath != null) && (basePath.isEmpty() == false)) {
92 33251 sandro.lab
			downloadPlugin.setBasePath(basePath);
93
		}
94 26600 sandro.lab
95 42157 sandro.lab
		final Iterable<String> urlInfo = resultSetClient.iter(ResultSet.fromJson(epr), String.class);
96 42170 sandro.lab
		final BlockingQueue<String> itemsQueue = Queues.newArrayBlockingQueue(numberOfThreads * 5);
97 33912 claudio.at
		final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
98
		final ObjectStore objStore = objectStoreDao.getObjectStore(objectStoreID);
99 26600 sandro.lab
100 33912 claudio.at
		final List<Future<DownloadReportMap>> responses = Lists.newArrayList();
101 40523 claudio.at
		final DownloadReportMap pluginReport = new DownloadReportMap();
102
		pluginReport.setStatus(true);
103 31882 sandro.lab
104 38924 claudio.at
		if (regularExpression != null) {
105 37568 sandro.lab
			downloadPlugin.setRegularExpression(regularExpression);
106
		}
107
108 26600 sandro.lab
		for (int i = 0; i < numberOfThreads; i++) {
109 42157 sandro.lab
			final Function<String, DownloadItem> applyDowunloadPlugin = input -> {
110
				if (input == null) {
111
					log.error("Input is null");
112
					return null;
113
				}
114
				if (input.equals(DownloadIntoObjectStoreAction.END_QUEUE_STRING)) return DownloadIntoObjectStoreAction.END_QUEUE;
115 33251 sandro.lab
116 42157 sandro.lab
				DownloadItem di = null;
117
				try {
118
					di = DownloadItem.newObjectfromJSON(input);
119 40523 claudio.at
120 42157 sandro.lab
					if (downloadPlugin.retrieveUrl(di) == null) {
121
						di.setUrl(null);
122
						di.setOriginalUrl(null);
123 33251 sandro.lab
					}
124 42157 sandro.lab
					return di;
125
				} catch (Throwable e) {
126
					reportException(pluginReport, di, e);
127
					log.error("Exception on transform item :" + input, e);
128
					return null;
129 33251 sandro.lab
				}
130 42157 sandro.lab
			};
131
			responses.add(executor
132
					.submit(new DownloadWorker(itemsQueue, objStore, Protocols.valueOf(protocol), mimeType, connectTimeoutMs, readTimeoutMs, sleepTimeMs,
133
							applyDowunloadPlugin
134
					)));
135 26600 sandro.lab
		}
136 33251 sandro.lab
137
		int i = 0;
138 38924 claudio.at
		if (urlInfo != null) {
139 37586 sandro.lab
			for (final String downloadItem : urlInfo) {
140
				if (downloadItem != null) {
141
					if ((i++ % 1000) == 0) {
142
						log.debug("Read " + i);
143
					}
144
					try {
145
						itemsQueue.put(downloadItem);
146
					} catch (final Exception e) {
147
						log.error("An error occurred while populating the download items queue: " + Joiner.on("\tat ").join(e.getStackTrace()));
148
					}
149 33251 sandro.lab
				}
150 26600 sandro.lab
			}
151
		}
152
153
		try {
154 42157 sandro.lab
			itemsQueue.put(DownloadIntoObjectStoreAction.END_QUEUE_STRING);
155 33912 claudio.at
		} catch (final InterruptedException e) {
156
			log.error("An error occurred adding the loop terminator: " + Joiner.on("\tat ").join(e.getStackTrace()));
157 26600 sandro.lab
		}
158 31882 sandro.lab
159 40523 claudio.at
		final DownloadReportMap resultMap = getDownloadReportMap(responses, pluginReport);
160 40501 claudio.at
		executor.shutdown();
161
		return resultMap;
162 40523 claudio.at
	}
163 40501 claudio.at
164 40523 claudio.at
	private DownloadReportMap getDownloadReportMap(final List<Future<DownloadReportMap>> responses, final DownloadReportMap pluginReport) {
165 33912 claudio.at
		final DownloadReportMap resultMap = new DownloadReportMap();
166 33251 sandro.lab
		resultMap.setStatus(true);
167 31882 sandro.lab
168 40512 claudio.at
		for (final Future<DownloadReportMap> currentResponse : responses) {
169 33251 sandro.lab
			try {
170 40512 claudio.at
				final DownloadReportMap currentMap = currentResponse.get();
171
172 40523 claudio.at
				mergeReport(resultMap, currentMap);
173 40512 claudio.at
174 33251 sandro.lab
				log.info("Status " + currentMap.getStatus());
175
				resultMap.setStatus(resultMap.getStatus() && currentMap.getStatus());
176
				resultMap.setTotalDownloaded(currentMap.getTotalDownloaded() + resultMap.getTotalDownloaded());
177 31882 sandro.lab
178 33912 claudio.at
			} catch (final Exception e) {
179 33251 sandro.lab
				log.error(e);
180
				resultMap.setStatus(false);
181
			}
182
		}
183 40523 claudio.at
		mergeReport(resultMap, pluginReport);
184 33251 sandro.lab
		return resultMap;
185 26600 sandro.lab
	}
186 40523 claudio.at
187
	private void mergeReport(final DownloadReportMap resultMap, final DownloadReportMap currentMap) {
188
		for (final String key : currentMap.keySet()) {
189
			if (!resultMap.containsKey(key)) {
190
				resultMap.put(key, currentMap.get(key));
191
			} else {
192
				final DownloadReport currentReport = currentMap.get(key);
193
				resultMap.get(key).incrementError(currentReport.getNumberOfOccurrences());
194
			}
195 42117 claudio.at
			resultMap.setTotalDownloaded(resultMap.getTotalDownloaded() + currentMap.getTotalDownloaded());
196
			resultMap.setStatus(resultMap.getStatus() & currentMap.getStatus());
197 40523 claudio.at
		}
198
	}
199 26600 sandro.lab
}