Project

General

Profile

1
package eu.dnetlib.data.objectstore;
2

    
3
import java.util.List;
4
import java.util.concurrent.BlockingQueue;
5
import java.util.concurrent.ExecutorService;
6
import java.util.concurrent.Executors;
7
import java.util.concurrent.Future;
8
import java.util.function.Function;
9
import javax.annotation.Resource;
10

    
11
import com.google.common.base.Joiner;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Queues;
14
import eu.dnetlib.data.objectstore.connector.ObjectStore;
15
import eu.dnetlib.data.objectstore.connector.ObjectStoreDao;
16
import eu.dnetlib.data.objectstore.worker.DownloadWorker;
17
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
18
import eu.dnetlib.rmi.common.ResultSet;
19
import eu.dnetlib.rmi.data.DownloadItem;
20
import eu.dnetlib.rmi.data.DownloadPlugin;
21
import eu.dnetlib.rmi.data.ObjectStoreServiceException;
22
import eu.dnetlib.rmi.data.Protocols;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.springframework.beans.factory.annotation.Autowired;
26

    
27
/**
28
 * The Class DownloadServiceFeeder.
29
 */
30
public class DownloadServiceFeeder {
31

    
32
	/**
33
	 * The Constant log.
34
	 */
35
	private static final Log log = LogFactory.getLog(DownloadServiceFeeder.class);
36

    
37
	/**
38
	 * The download plugin enumerator.
39
	 */
40
	@Resource
41
	DownloadPluginEnumeratorImpl downloadPluginEnumerator;
42

    
43
	/**
44
	 * The result set client factory.
45
	 */
46
	@Resource
47
	private ResultSetClient resultSetClient;
48

    
49
	/**
50
	 * The object store dao.
51
	 */
52
	@Autowired
53
	private ObjectStoreDao objectStoreDao;
54

    
55
	public static void reportException(final DownloadReportMap report, final DownloadItem di, final Throwable e) {
56
		final String className = e.getClass().getName();
57
		if (!report.containsKey(className)) {
58
			final DownloadReport dr = new DownloadReport();
59
			dr.setStackTrace(Joiner.on("\tat ").join(e.getStackTrace()));
60
			if (di != null) {
61
				dr.setDownloadItem(di);
62
			}
63
			report.put(className, dr);
64
		} else {
65
			report.get(className).incrementError();
66
		}
67
	}
68

    
69
	/**
70
	 * Download and feed file into the objectStore .
71
	 *
72
	 * @param epr             the end-point reference of the result-set of Serialized DownloadItem
73
	 * @param plugin          the plugin used to retrieve the correct URL
74
	 * @param objectStoreID   the object store id to store the data
75
	 * @param protocol        the protocol used to download the file
76
	 * @param mimeType        the mime type of the Files
77
	 * @param numberOfThreads the number of threads to use for download at the same time
78
	 * @throws ObjectStoreServiceException
79
	 */
80
	public DownloadReportMap download(final String epr,
81
			final String plugin,
82
			final String objectStoreID,
83
			final String protocol,
84
			final String mimeType,
85
			final int numberOfThreads,
86
			final String basePath,
87
			final List<String> regularExpression,
88
			final int connectTimeoutMs,
89
			final int readTimeoutMs, final int sleepTimeMs) throws ObjectStoreServiceException {
90
		final DownloadPlugin downloadPlugin = downloadPluginEnumerator.get(plugin);
91
		if ((basePath != null) && (basePath.isEmpty() == false)) {
92
			downloadPlugin.setBasePath(basePath);
93
		}
94

    
95
		final Iterable<String> urlInfo = resultSetClient.iter(ResultSet.fromJson(epr), String.class);
96
		final BlockingQueue<String> itemsQueue = Queues.newArrayBlockingQueue(numberOfThreads * 5);
97
		final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
98
		final ObjectStore objStore = objectStoreDao.getObjectStore(objectStoreID);
99

    
100
		final List<Future<DownloadReportMap>> responses = Lists.newArrayList();
101
		final DownloadReportMap pluginReport = new DownloadReportMap();
102
		pluginReport.setStatus(true);
103

    
104
		if (regularExpression != null) {
105
			downloadPlugin.setRegularExpression(regularExpression);
106
		}
107

    
108
		for (int i = 0; i < numberOfThreads; i++) {
109
			final Function<String, DownloadItem> applyDowunloadPlugin = input -> {
110
				if (input == null) {
111
					log.error("Input is null");
112
					return null;
113
				}
114
				if (input.equals(DownloadIntoObjectStoreAction.END_QUEUE_STRING)) return DownloadIntoObjectStoreAction.END_QUEUE;
115

    
116
				DownloadItem di = null;
117
				try {
118
					di = DownloadItem.newObjectfromJSON(input);
119

    
120
					if (downloadPlugin.retrieveUrl(di) == null) {
121
						di.setUrl(null);
122
						di.setOriginalUrl(null);
123
					}
124
					return di;
125
				} catch (Throwable e) {
126
					reportException(pluginReport, di, e);
127
					log.error("Exception on transform item :" + input, e);
128
					return null;
129
				}
130
			};
131
			responses.add(executor
132
					.submit(new DownloadWorker(itemsQueue, objStore, Protocols.valueOf(protocol), mimeType, connectTimeoutMs, readTimeoutMs, sleepTimeMs,
133
							applyDowunloadPlugin
134
					)));
135
		}
136

    
137
		int i = 0;
138
		if (urlInfo != null) {
139
			for (final String downloadItem : urlInfo) {
140
				if (downloadItem != null) {
141
					if ((i++ % 1000) == 0) {
142
						log.debug("Read " + i);
143
					}
144
					try {
145
						itemsQueue.put(downloadItem);
146
					} catch (final Exception e) {
147
						log.error("An error occurred while populating the download items queue: " + Joiner.on("\tat ").join(e.getStackTrace()));
148
					}
149
				}
150
			}
151
		}
152

    
153
		try {
154
			itemsQueue.put(DownloadIntoObjectStoreAction.END_QUEUE_STRING);
155
		} catch (final InterruptedException e) {
156
			log.error("An error occurred adding the loop terminator: " + Joiner.on("\tat ").join(e.getStackTrace()));
157
		}
158

    
159
		final DownloadReportMap resultMap = getDownloadReportMap(responses, pluginReport);
160
		executor.shutdown();
161
		return resultMap;
162
	}
163

    
164
	private DownloadReportMap getDownloadReportMap(final List<Future<DownloadReportMap>> responses, final DownloadReportMap pluginReport) {
165
		final DownloadReportMap resultMap = new DownloadReportMap();
166
		resultMap.setStatus(true);
167

    
168
		for (final Future<DownloadReportMap> currentResponse : responses) {
169
			try {
170
				final DownloadReportMap currentMap = currentResponse.get();
171

    
172
				mergeReport(resultMap, currentMap);
173

    
174
				log.info("Status " + currentMap.getStatus());
175
				resultMap.setStatus(resultMap.getStatus() && currentMap.getStatus());
176
				resultMap.setTotalDownloaded(currentMap.getTotalDownloaded() + resultMap.getTotalDownloaded());
177

    
178
			} catch (final Exception e) {
179
				log.error(e);
180
				resultMap.setStatus(false);
181
			}
182
		}
183
		mergeReport(resultMap, pluginReport);
184
		return resultMap;
185
	}
186

    
187
	private void mergeReport(final DownloadReportMap resultMap, final DownloadReportMap currentMap) {
188
		for (final String key : currentMap.keySet()) {
189
			if (!resultMap.containsKey(key)) {
190
				resultMap.put(key, currentMap.get(key));
191
			} else {
192
				final DownloadReport currentReport = currentMap.get(key);
193
				resultMap.get(key).incrementError(currentReport.getNumberOfOccurrences());
194
			}
195
			resultMap.setTotalDownloaded(resultMap.getTotalDownloaded() + currentMap.getTotalDownloaded());
196
			resultMap.setStatus(resultMap.getStatus() & currentMap.getStatus());
197
		}
198
	}
199
}
(8-8/21)