Project

General

Profile

1
package eu.dnetlib.data.download.rmi;
2

    
3
import java.util.List;
4
import java.util.concurrent.BlockingQueue;
5
import java.util.concurrent.ExecutorService;
6
import java.util.concurrent.Executors;
7
import java.util.concurrent.Future;
8
import javax.annotation.Resource;
9

    
10
import com.google.common.base.Function;
11
import com.google.common.base.Joiner;
12
import com.google.common.collect.Lists;
13
import com.google.common.collect.Queues;
14
import eu.dnetlib.data.download.DownloadPluginEnumeratorImpl;
15
import eu.dnetlib.data.download.DownloadReport;
16
import eu.dnetlib.data.download.DownloadReportMap;
17
import eu.dnetlib.data.download.DownloadServiceImpl;
18
import eu.dnetlib.data.download.worker.DownloadWorker;
19
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
20
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
21
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
22
import eu.dnetlib.data.objectstore.rmi.Protocols;
23
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
24
import org.apache.commons.logging.Log;
25
import org.apache.commons.logging.LogFactory;
26
import org.springframework.beans.factory.annotation.Autowired;
27

    
28
/**
29
 * The Class DownloadServiceFeeder.
30
 */
31
public class DownloadServiceFeeder {
32

    
33
	/** The Constant log. */
34
	private static final Log log = LogFactory.getLog(DownloadServiceFeeder.class);
35
	private static final int MAX_NUM_FOUND = 10;
36

    
37
	/** The download plugin enumerator. */
38
	@Resource
39
	DownloadPluginEnumeratorImpl downloadPluginEnumerator;
40

    
41
	/** The result set client factory. */
42
	@Resource
43
	private ResultSetClientFactory resultSetClientFactory;
44

    
45
	/** The object store dao. */
46
	@Autowired
47
	private ObjectStoreDao objectStoreDao;
48

    
49
	public static void reportException(final DownloadReportMap report, final DownloadItem di, final Throwable e) {
50
		final String className = e.getClass().getName();
51
		if (!report.containsKey(className)) {
52
			final DownloadReport dr = new DownloadReport();
53
			dr.setStackTrace(Joiner.on("\tat ").join(e.getStackTrace()));
54
			if (di != null) {
55
				dr.setDownloadItem(di);
56
			}
57
			report.put(className, dr);
58
		} else {
59
			report.get(className).incrementError();
60
		}
61
	}
62

    
63
	/**
64
	 * Download and feed file into the objectStore .
65
	 *
66
	 * @param epr
67
	 *            the end-point reference of the result-set of Serialized DownloadItem
68
	 * @param plugin
69
	 *            the plugin used to retrieve the correct URL
70
	 * @param objectStoreID
71
	 *            the object store id to store the data
72
	 * @param protocol
73
	 *            the protocol used to download the file
74
	 * @param mimeType
75
	 *            the mime type of the Files
76
	 * @param numberOfThreads
77
	 *            the number of threads to use for download at the same time
78
	 * @throws DownloadServiceException
79
	 *             the download service exception
80
	 * @throws ObjectStoreServiceException
81
	 */
82
	public DownloadReportMap download(final String epr,
83
			final String plugin,
84
			final String objectStoreID,
85
			final String protocol,
86
			final String mimeType,
87
			final int numberOfThreads,
88
			final String basePath,
89
			final List<String> regularExpression,
90
			final int connectTimeoutMs,
91
			final int readTimeoutMs, final int sleepTimeMs) throws DownloadServiceException, ObjectStoreServiceException {
92
		final DownloadPlugin downloadPlugin = downloadPluginEnumerator.get(plugin);
93
		if ((basePath != null) && (basePath.isEmpty() == false)) {
94
			downloadPlugin.setBasePath(basePath);
95
		}
96

    
97
		final Iterable<String> urlInfo = resultSetClientFactory.getClient(epr);
98
		final BlockingQueue<String> itemsQueue = Queues.newArrayBlockingQueue(1024);
99
		final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
100
		final ObjectStore objStore = objectStoreDao.getObjectStore(objectStoreID);
101

    
102
		final List<Future<DownloadReportMap>> responses = Lists.newArrayList();
103
		final DownloadReportMap pluginReport = new DownloadReportMap();
104
		pluginReport.setStatus(true);
105

    
106
		if (regularExpression != null) {
107
			downloadPlugin.setRegularExpression(regularExpression);
108
		}
109

    
110
		for (int i = 0; i < numberOfThreads; i++) {
111
			responses.add(executor
112
					.submit(new DownloadWorker(itemsQueue, objStore, Protocols.valueOf(protocol), mimeType, connectTimeoutMs, readTimeoutMs, sleepTimeMs,
113
					new Function<String, DownloadItem>() {
114

    
115
				@Override
116
				public DownloadItem apply(final String input) {
117
					if (input == null) {
118
						log.error("Input is null");
119
						return null;
120
					}
121
					if (input.equals(DownloadServiceImpl.END_QUEUE_STRING)) return DownloadServiceImpl.END_QUEUE;
122

    
123
					DownloadItem di = null;
124
					try {
125
						di = DownloadItem.newObjectfromJSON(input);
126

    
127
						if (downloadPlugin.retrieveUrl(di) == null) {
128
							di.setUrl(null);
129
							di.setOriginalUrl(null);
130
						}
131
						return di;
132
					} catch (Throwable e) {
133
						reportException(pluginReport, di, e);
134
						log.error("Exception on transform item :" + input, e);
135
						return null;
136
					}
137
				}
138
			})));
139
		}
140

    
141
		int i = 0;
142
		int null_found = 0;
143
		if (urlInfo != null) {
144
			for (final String downloadItem : urlInfo) {
145
				if (downloadItem != null) {
146
					null_found = 0;
147
					if ((i++ % 1000) == 0) {
148
						log.debug("Read " + i);
149
					}
150
					try {
151
						itemsQueue.put(downloadItem);
152
					} catch (final Exception e) {
153
						log.error("An error occurred while populating the download items queue: " + Joiner.on("\tat ").join(e.getStackTrace()));
154
					}
155
				} else {
156
					if (null_found++ > MAX_NUM_FOUND) {
157
						break;
158
					}
159
				}
160

    
161
			}
162
		}
163

    
164
		try {
165
			itemsQueue.put(DownloadServiceImpl.END_QUEUE_STRING);
166
		} catch (final InterruptedException e) {
167
			log.error("An error occurred adding the loop terminator: " + Joiner.on("\tat ").join(e.getStackTrace()));
168
		}
169

    
170
		final DownloadReportMap resultMap = getDownloadReportMap(responses, pluginReport);
171
		executor.shutdown();
172
		return resultMap;
173
	}
174

    
175
	private DownloadReportMap getDownloadReportMap(final List<Future<DownloadReportMap>> responses, final DownloadReportMap pluginReport) {
176
		final DownloadReportMap resultMap = new DownloadReportMap();
177
		resultMap.setStatus(true);
178

    
179
		for (final Future<DownloadReportMap> currentResponse : responses) {
180
			try {
181
				final DownloadReportMap currentMap = currentResponse.get();
182

    
183
				mergeReport(resultMap, currentMap);
184

    
185
				log.info("Status " + currentMap.getStatus());
186
				resultMap.setStatus(resultMap.getStatus() && currentMap.getStatus());
187
				resultMap.setTotalDownloaded(currentMap.getTotalDownloaded() + resultMap.getTotalDownloaded());
188

    
189
			} catch (final Exception e) {
190
				log.error(e);
191
				resultMap.setStatus(false);
192
			}
193
		}
194
		mergeReport(resultMap, pluginReport);
195
		return resultMap;
196
	}
197

    
198
	private void mergeReport(final DownloadReportMap resultMap, final DownloadReportMap currentMap) {
199
		for (final String key : currentMap.keySet()) {
200
			if (!resultMap.containsKey(key)) {
201
				resultMap.put(key, currentMap.get(key));
202
			} else {
203
				final DownloadReport currentReport = currentMap.get(key);
204
				resultMap.get(key).incrementError(currentReport.getNumberOfOccurrences());
205
			}
206
			resultMap.setTotalDownloaded(resultMap.getTotalDownloaded() + currentMap.getTotalDownloaded());
207
			resultMap.setStatus(resultMap.getStatus() & currentMap.getStatus());
208
		}
209
	}
210
}
(2-2/2)