Project

General

Profile

1
package eu.dnetlib.data.objectstore;
2

    
3
import java.util.List;
4
import java.util.concurrent.BlockingQueue;
5
import java.util.concurrent.ExecutorService;
6
import java.util.concurrent.Executors;
7
import java.util.concurrent.Future;
8
import java.util.function.Function;
9

    
10
import com.google.common.base.Joiner;
11
import com.google.common.collect.Lists;
12
import com.google.common.collect.Queues;
13
import eu.dnetlib.data.objectstore.connector.ObjectStore;
14
import eu.dnetlib.data.objectstore.connector.ObjectStoreDao;
15
import eu.dnetlib.data.objectstore.worker.DownloadWorker;
16
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
17
import eu.dnetlib.rmi.common.ResultSet;
18
import eu.dnetlib.rmi.data.DownloadItem;
19
import eu.dnetlib.rmi.data.DownloadPlugin;
20
import eu.dnetlib.rmi.data.ObjectStoreServiceException;
21
import eu.dnetlib.rmi.data.Protocols;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24
import org.springframework.beans.factory.annotation.Autowired;
25

    
26
/**
27
 * The Class DownloadServiceFeeder.
28
 */
29
public class DownloadServiceFeeder {
30

    
31
	/**
32
	 * The Constant log.
33
	 */
34
	private static final Log log = LogFactory.getLog(DownloadServiceFeeder.class);
35

    
36
	/**
37
	 * The download plugin enumerator.
38
	 */
39
	@Autowired
40
	DownloadPluginEnumeratorImpl downloadPluginEnumerator;
41

    
42
	/**
43
	 * The result set client factory.
44
	 */
45
	@Autowired
46
	private ResultSetClient resultSetClient;
47

    
48
	/**
49
	 * The object store dao.
50
	 */
51
	@Autowired
52
	private ObjectStoreDao objectStoreDao;
53

    
54
	public static void reportException(final DownloadReportMap report, final DownloadItem di, final Throwable e) {
55
		final String className = e.getClass().getName();
56
		if (!report.containsKey(className)) {
57
			final DownloadReport dr = new DownloadReport();
58
			dr.setStackTrace(Joiner.on("\tat ").join(e.getStackTrace()));
59
			if (di != null) {
60
				dr.setDownloadItem(di);
61
			}
62
			report.put(className, dr);
63
		} else {
64
			report.get(className).incrementError();
65
		}
66
	}
67

    
68
	/**
69
	 * Download and feed file into the objectStore .
70
	 *
71
	 * @param epr             the end-point reference of the result-set of Serialized DownloadItem
72
	 * @param plugin          the plugin used to retrieve the correct URL
73
	 * @param objectStoreID   the object store id to store the data
74
	 * @param protocol        the protocol used to download the file
75
	 * @param mimeType        the mime type of the Files
76
	 * @param numberOfThreads the number of threads to use for download at the same time
77
	 * @throws ObjectStoreServiceException
78
	 */
79
	public DownloadReportMap download(final String epr,
80
			final String plugin,
81
			final String objectStoreID,
82
			final String protocol,
83
			final String mimeType,
84
			final int numberOfThreads,
85
			final String basePath,
86
			final List<String> regularExpression,
87
			final int connectTimeoutMs,
88
			final int readTimeoutMs, final int sleepTimeMs) throws ObjectStoreServiceException {
89
		final DownloadPlugin downloadPlugin = downloadPluginEnumerator.get(plugin);
90
		if ((basePath != null) && (basePath.isEmpty() == false)) {
91
			downloadPlugin.setBasePath(basePath);
92
		}
93

    
94
		final Iterable<String> urlInfo = resultSetClient.iter(ResultSet.fromJson(epr), String.class);
95
		final BlockingQueue<String> itemsQueue = Queues.newArrayBlockingQueue(numberOfThreads * 5);
96
		final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
97
		final ObjectStore objStore = objectStoreDao.getObjectStore(objectStoreID);
98

    
99
		final List<Future<DownloadReportMap>> responses = Lists.newArrayList();
100
		final DownloadReportMap pluginReport = new DownloadReportMap();
101
		pluginReport.setStatus(true);
102

    
103
		if (regularExpression != null) {
104
			downloadPlugin.setRegularExpression(regularExpression);
105
		}
106
		final Function<String, DownloadItem> applyDowunloadPlugin = input -> {
107
			if (input == null) {
108
				log.error("Input is null");
109
				return null;
110
			}
111
			if (input.equals(DownloadIntoObjectStoreAction.END_QUEUE_STRING)) return DownloadIntoObjectStoreAction.END_QUEUE;
112

    
113
			DownloadItem di = null;
114
			try {
115
				di = DownloadItem.newObjectfromJSON(input);
116

    
117
				if (downloadPlugin.retrieveUrl(di) == null) {
118
					di.setUrl(null);
119
					di.setOriginalUrl(null);
120
				}
121
				return di;
122
			} catch (Throwable e) {
123
				reportException(pluginReport, di, e);
124
				log.error("Exception on transform item :" + input, e);
125
				return null;
126
			}
127
		};
128

    
129
		for (int i = 0; i < numberOfThreads; i++) {
130
			responses.add(executor
131
					.submit(new DownloadWorker(itemsQueue, objStore, Protocols.valueOf(protocol), mimeType, connectTimeoutMs, readTimeoutMs, sleepTimeMs,
132
							applyDowunloadPlugin
133
					)));
134
		}
135

    
136
		int i = 0;
137
		if (urlInfo != null) {
138
			for (final String downloadItem : urlInfo) {
139
				if (downloadItem != null) {
140
					if ((i++ % 1000) == 0) {
141
						log.debug("Read " + i);
142
					}
143
					try {
144
						itemsQueue.put(downloadItem);
145
					} catch (final Exception e) {
146
						log.error("An error occurred while populating the download items queue: " + Joiner.on("\tat ").join(e.getStackTrace()));
147
					}
148
				}
149
			}
150
		}
151

    
152
		try {
153
			itemsQueue.put(DownloadIntoObjectStoreAction.END_QUEUE_STRING);
154
		} catch (final InterruptedException e) {
155
			log.error("An error occurred adding the loop terminator: " + Joiner.on("\tat ").join(e.getStackTrace()));
156
		}
157

    
158
		final DownloadReportMap resultMap = getDownloadReportMap(responses, pluginReport);
159
		executor.shutdown();
160
		return resultMap;
161
	}
162

    
163
	private DownloadReportMap getDownloadReportMap(final List<Future<DownloadReportMap>> responses, final DownloadReportMap pluginReport) {
164
		final DownloadReportMap resultMap = new DownloadReportMap();
165
		resultMap.setStatus(true);
166

    
167
		for (final Future<DownloadReportMap> currentResponse : responses) {
168
			try {
169
				final DownloadReportMap currentMap = currentResponse.get();
170

    
171
				mergeReport(resultMap, currentMap);
172

    
173
				log.info("Status " + currentMap.getStatus());
174
				resultMap.setStatus(resultMap.getStatus() && currentMap.getStatus());
175
				resultMap.setTotalDownloaded(currentMap.getTotalDownloaded() + resultMap.getTotalDownloaded());
176

    
177
			} catch (final Exception e) {
178
				log.error(e);
179
				resultMap.setStatus(false);
180
			}
181
		}
182
		mergeReport(resultMap, pluginReport);
183
		return resultMap;
184
	}
185

    
186
	private void mergeReport(final DownloadReportMap resultMap, final DownloadReportMap currentMap) {
187
		for (final String key : currentMap.keySet()) {
188
			if (!resultMap.containsKey(key)) {
189
				resultMap.put(key, currentMap.get(key));
190
			} else {
191
				final DownloadReport currentReport = currentMap.get(key);
192
				resultMap.get(key).incrementError(currentReport.getNumberOfOccurrences());
193
			}
194
			resultMap.setTotalDownloaded(resultMap.getTotalDownloaded() + currentMap.getTotalDownloaded());
195
			resultMap.setStatus(resultMap.getStatus() & currentMap.getStatus());
196
		}
197
	}
198
}
(8-8/21)