Project

General

Profile

1
package eu.dnetlib.data.download.rmi;
2

    
3
import java.util.List;
4
import java.util.concurrent.ArrayBlockingQueue;
5
import java.util.concurrent.BlockingQueue;
6
import java.util.concurrent.ExecutorService;
7
import java.util.concurrent.Executors;
8
import java.util.concurrent.Future;
9

    
10
import javax.annotation.Resource;
11

    
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.springframework.beans.factory.annotation.Autowired;
15

    
16
import com.google.common.base.Function;
17
import com.google.common.base.Joiner;
18
import com.google.common.collect.Lists;
19

    
20
import eu.dnetlib.data.download.DownloadPluginEnumeratorImpl;
21
import eu.dnetlib.data.download.DownloadReport;
22
import eu.dnetlib.data.download.DownloadReportMap;
23
import eu.dnetlib.data.download.DownloadServiceImpl;
24
import eu.dnetlib.data.download.worker.DownloadWorker;
25
import eu.dnetlib.data.objectstore.modular.connector.ObjectStore;
26
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
27
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
28
import eu.dnetlib.data.objectstore.rmi.Protocols;
29
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
30

    
31
/**
32
 * The Class DownloadServiceFeeder.
33
 */
34
public class DownloadServiceFeeder {
35

    
36
	/** The Constant log. */
37
	private static final Log log = LogFactory.getLog(DownloadServiceFeeder.class);
38

    
39
	/** The download plugin enumerator. */
40
	@Resource
41
	DownloadPluginEnumeratorImpl downloadPluginEnumerator;
42

    
43
	/** The result set client factory. */
44
	@Resource
45
	private ResultSetClientFactory resultSetClientFactory;
46

    
47
	/** The object store dao. */
48
	@Autowired
49
	private ObjectStoreDao objectStoreDao;
50

    
51
	/**
52
	 * Download and feed file into the objectStore .
53
	 *
54
	 * @param epr
55
	 *            the end-point reference of the result-set of Serialized DownloadItem
56
	 * @param plugin
57
	 *            the plugin used to retrieve the correct URL
58
	 * @param objectStoreID
59
	 *            the object store id to store the data
60
	 * @param protocol
61
	 *            the protocol used to download the file
62
	 * @param mimeType
63
	 *            the mime type of the Files
64
	 * @param numberOfThreads
65
	 *            the number of threads to use for download at the same time
66
	 * @throws DownloadServiceException
67
	 *             the download service exception
68
	 * @throws ObjectStoreServiceException
69
	 */
70
	public DownloadReportMap download(final String epr,
71
			final String plugin,
72
			final String objectStoreID,
73
			final String protocol,
74
			final String mimeType,
75
			final int numberOfThreads,
76
			final String basePath) throws DownloadServiceException, ObjectStoreServiceException {
77
		final DownloadPlugin downloadPlugin = downloadPluginEnumerator.get(plugin);
78
		if ((basePath != null) && (basePath.isEmpty() == false)) {
79
			downloadPlugin.setBasePath(basePath);
80
		}
81

    
82
		final Iterable<String> urlInfo = resultSetClientFactory.getClient(epr);
83
		final BlockingQueue<String> itemsQueue = new ArrayBlockingQueue<String>(1024);
84
		final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
85
		final ObjectStore objStore = objectStoreDao.getObjectStore(objectStoreID);
86

    
87
		final List<Future<DownloadReportMap>> responses = Lists.newArrayList();
88

    
89
		for (int i = 0; i < numberOfThreads; i++) {
90
			responses.add(executor.submit(new DownloadWorker(itemsQueue, objStore, Protocols.valueOf(protocol), mimeType, new Function<String, DownloadItem>() {
91

    
92
				@Override
93
				public DownloadItem apply(final String input) {
94
					if (input == null) {
95
						log.error("Input is null");
96
						return null;
97
					}
98
					if (input.equals(DownloadServiceImpl.END_QUEUE_STRING)) return DownloadServiceImpl.END_QUEUE;
99
					try {
100
						final DownloadItem di = DownloadItem.newObjectfromJSON(input);
101
						if (downloadPlugin.retrieveUrl(di) == null) {
102
							di.setUrl(null);
103
							di.setOriginalUrl(null);
104
						}
105
						return di;
106
					} catch (Exception e) {
107
						log.error("Exception on trasform item :"+input,e);
108
						return null;
109
					}
110
				}
111
			})));
112
		}
113

    
114
		int i = 0;
115
		for (final String downloadItem : urlInfo) {
116
			if (downloadItem != null) {
117
				if ((i++ % 1000) == 0) {
118
					log.debug("Read " + i);
119
				}
120
				try {
121
					itemsQueue.put(downloadItem);
122
				} catch (final Exception e) {
123
					log.error("An error occurred while populating the download items queue: " + Joiner.on("\tat ").join(e.getStackTrace()));
124
				}
125
			}
126
		}
127

    
128
		try {
129
			itemsQueue.put(DownloadServiceImpl.END_QUEUE_STRING);
130
		} catch (final InterruptedException e) {
131
			log.error("An error occurred adding the loop terminator: " + Joiner.on("\tat ").join(e.getStackTrace()));
132
		}
133

    
134
		final DownloadReportMap resultMap = new DownloadReportMap();
135
		resultMap.setStatus(true);
136

    
137
		for (final Future<DownloadReportMap> currentResponses : responses) {
138
			try {
139
				final DownloadReportMap currentMap = currentResponses.get();
140
				for (final String key : currentMap.keySet()) {
141
					if (!resultMap.containsKey(key)) {
142
						resultMap.put(key, currentMap.get(key));
143
						resultMap.setTotalDownloaded(currentMap.getTotalDownloaded());
144
						resultMap.setStatus(currentMap.getStatus());
145
					} else {
146
						final DownloadReport currentReport = currentMap.get(key);
147
						resultMap.get(key).incrementError(currentReport.getNumberOfOccurrences());
148
					}
149
				}
150
				log.info("Status " + currentMap.getStatus());
151
				resultMap.setStatus(resultMap.getStatus() && currentMap.getStatus());
152
				resultMap.setTotalDownloaded(currentMap.getTotalDownloaded() + resultMap.getTotalDownloaded());
153

    
154
			} catch (final Exception e) {
155
				log.error(e);
156
				resultMap.setStatus(false);
157
			}
158
		}
159

    
160
		executor.shutdown();
161
		return resultMap;
162

    
163
	}
164
}
(2-2/2)