Project

General

Profile

1
package eu.dnetlib.data.download.rmi;
2

    
3
import java.util.List;
4
import java.util.concurrent.ArrayBlockingQueue;
5
import java.util.concurrent.BlockingQueue;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.Future;
9

    
10
import javax.annotation.Resource;
11

    
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.springframework.beans.factory.annotation.Autowired;
15

    
16
import com.google.common.base.Function;
17
import com.google.common.base.Joiner;
18
import com.google.common.collect.Lists;
19

    
20
import eu.dnetlib.data.download.DownloadPluginEnumeratorImpl;
21
import eu.dnetlib.data.download.DownloadReport;
22
import eu.dnetlib.data.download.DownloadReportMap;
23
import eu.dnetlib.data.download.DownloadServiceImpl;
24
import eu.dnetlib.data.download.worker.DownloadWorker;
25
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
26
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
27
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
28
import eu.dnetlib.data.objectstore.rmi.Protocols;
29
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
30

    
31
/**
32
 * The Class DownloadServiceFeeder.
33
 */
34
public class DownloadServiceFeeder {
35

    
36
	/** The Constant log. */
37
	private static final Log log = LogFactory.getLog(DownloadServiceFeeder.class);
38

    
39
	/** The download plugin enumerator. */
40
	@Resource
41
	DownloadPluginEnumeratorImpl downloadPluginEnumerator;
42

    
43
	/** The result set client factory. */
44
	@Resource
45
	private ResultSetClientFactory resultSetClientFactory;
46

    
47
	/** The object store dao. */
48
	@Autowired
49
	private ObjectStoreDao objectStoreDao;
50

    
51
	/**
52
	 * Download and feed file into the objectStore .
53
	 *
54
	 * @param epr
55
	 *            the end-point reference of the result-set of Serialized DownloadItem
56
	 * @param plugin
57
	 *            the plugin used to retrieve the correct URL
58
	 * @param objectStoreID
59
	 *            the object store id to store the data
60
	 * @param protocol
61
	 *            the protocol used to download the file
62
	 * @param mimeType
63
	 *            the mime type of the Files
64
	 * @param numberOfThreads
65
	 *            the number of threads to use for download at the same time
66
	 * @throws DownloadServiceException
67
	 *             the download service exception
68
	 * @throws ObjectStoreServiceException
69
	 */
70
	public DownloadReportMap download(final String epr,
71
			final String plugin,
72
			final String objectStoreID,
73
			final String protocol,
74
			final String mimeType,
75
			final int numberOfThreads,
76
			final String basePath,
77
			final List<String> regularExpression
78
			) throws DownloadServiceException, ObjectStoreServiceException {
79
		final DownloadPlugin downloadPlugin = downloadPluginEnumerator.get(plugin);
80
		if ((basePath != null) && (basePath.isEmpty() == false)) {
81
			downloadPlugin.setBasePath(basePath);
82
		}
83

    
84
		final Iterable<String> urlInfo = resultSetClientFactory.getClient(epr);
85
		final BlockingQueue<String> itemsQueue = new ArrayBlockingQueue<String>(1024);
86
		final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
87
		final ObjectStore objStore = objectStoreDao.getObjectStore(objectStoreID);
88

    
89
		final List<Future<DownloadReportMap>> responses = Lists.newArrayList();
90

    
91
		if (regularExpression != null) {
92
			downloadPlugin.setRegularExpression(regularExpression);
93
		}
94

    
95
		for (int i = 0; i < numberOfThreads; i++) {
96
			responses.add(executor.submit(new DownloadWorker(itemsQueue, objStore, Protocols.valueOf(protocol), mimeType, new Function<String, DownloadItem>() {
97

    
98
				@Override
99
				public DownloadItem apply(final String input) {
100
					if (input == null) {
101
						log.error("Input is null");
102
						return null;
103
					}
104
					if (input.equals(DownloadServiceImpl.END_QUEUE_STRING)) return DownloadServiceImpl.END_QUEUE;
105
					try {
106
						final DownloadItem di = DownloadItem.newObjectfromJSON(input);
107

    
108
						if (downloadPlugin.retrieveUrl(di) == null) {
109
							di.setUrl(null);
110
							di.setOriginalUrl(null);
111
						}
112
						return di;
113
					} catch (Throwable e) {
114
						log.error("Exception on trasform item :" + input, e);
115
						return null;
116
					}
117
				}
118
			})));
119
		}
120

    
121
		int i = 0;
122
		if (urlInfo != null) {
123
			for (final String downloadItem : urlInfo) {
124
				if (downloadItem != null) {
125
					if ((i++ % 1000) == 0) {
126
						log.debug("Read " + i);
127
					}
128
					try {
129
						itemsQueue.put(downloadItem);
130
					} catch (final Exception e) {
131
						log.error("An error occurred while populating the download items queue: " + Joiner.on("\tat ").join(e.getStackTrace()));
132
					}
133
				}
134
			}
135
		}
136

    
137
		try {
138
			itemsQueue.put(DownloadServiceImpl.END_QUEUE_STRING);
139
		} catch (final InterruptedException e) {
140
			log.error("An error occurred adding the loop terminator: " + Joiner.on("\tat ").join(e.getStackTrace()));
141
		}
142

    
143
		final DownloadReportMap resultMap = new DownloadReportMap();
144
		resultMap.setStatus(true);
145

    
146
		for (final Future<DownloadReportMap> currentResponses : responses) {
147
			try {
148
				final DownloadReportMap currentMap = currentResponses.get();
149
				for (final String key : currentMap.keySet()) {
150
					if (!resultMap.containsKey(key)) {
151
						resultMap.put(key, currentMap.get(key));
152
						resultMap.setTotalDownloaded(currentMap.getTotalDownloaded());
153
						resultMap.setStatus(currentMap.getStatus());
154
					} else {
155
						final DownloadReport currentReport = currentMap.get(key);
156
						resultMap.get(key).incrementError(currentReport.getNumberOfOccurrences());
157
					}
158
				}
159
				log.info("Status " + currentMap.getStatus());
160
				resultMap.setStatus(resultMap.getStatus() && currentMap.getStatus());
161
				resultMap.setTotalDownloaded(currentMap.getTotalDownloaded() + resultMap.getTotalDownloaded());
162

    
163
			} catch (final Exception e) {
164
				log.error(e);
165
				resultMap.setStatus(false);
166
			}
167
		}
168

    
169
		executor.shutdown();
170
		return resultMap;
171

    
172
	}
173
}
(2-2/2)