1 |
42157
|
sandro.lab
|
package eu.dnetlib.data.objectstore;
|
2 |
26600
|
sandro.lab
|
|
3 |
31882
|
sandro.lab
|
import java.util.List;
|
4 |
33251
|
sandro.lab
|
import java.util.concurrent.BlockingQueue;
|
5 |
|
|
import java.util.concurrent.ExecutorService;
|
6 |
|
|
import java.util.concurrent.Executors;
|
7 |
|
|
import java.util.concurrent.Future;
|
8 |
42157
|
sandro.lab
|
import java.util.function.Function;
|
9 |
26600
|
sandro.lab
|
import javax.annotation.Resource;
|
10 |
|
|
|
11 |
33912
|
claudio.at
|
import com.google.common.base.Joiner;
|
12 |
31882
|
sandro.lab
|
import com.google.common.collect.Lists;
|
13 |
40523
|
claudio.at
|
import com.google.common.collect.Queues;
|
14 |
42157
|
sandro.lab
|
import eu.dnetlib.data.objectstore.connector.ObjectStore;
|
15 |
|
|
import eu.dnetlib.data.objectstore.connector.ObjectStoreDao;
|
16 |
|
|
import eu.dnetlib.data.objectstore.worker.DownloadWorker;
|
17 |
|
|
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
|
18 |
|
|
import eu.dnetlib.rmi.common.ResultSet;
|
19 |
42170
|
sandro.lab
|
import eu.dnetlib.rmi.data.DownloadItem;
|
20 |
|
|
import eu.dnetlib.rmi.data.DownloadPlugin;
|
21 |
|
|
import eu.dnetlib.rmi.data.ObjectStoreServiceException;
|
22 |
|
|
import eu.dnetlib.rmi.data.Protocols;
|
23 |
40523
|
claudio.at
|
import org.apache.commons.logging.Log;
|
24 |
|
|
import org.apache.commons.logging.LogFactory;
|
25 |
|
|
import org.springframework.beans.factory.annotation.Autowired;
|
26 |
26600
|
sandro.lab
|
|
27 |
|
|
/**
|
28 |
|
|
* The Class DownloadServiceFeeder.
|
29 |
|
|
*/
|
30 |
|
|
public class DownloadServiceFeeder {
|
31 |
|
|
|
32 |
42157
|
sandro.lab
|
/**
|
33 |
|
|
* The Constant log.
|
34 |
|
|
*/
|
35 |
26600
|
sandro.lab
|
private static final Log log = LogFactory.getLog(DownloadServiceFeeder.class);
|
36 |
|
|
|
37 |
42157
|
sandro.lab
|
/**
|
38 |
|
|
* The download plugin enumerator.
|
39 |
|
|
*/
|
40 |
26600
|
sandro.lab
|
@Resource
|
41 |
33251
|
sandro.lab
|
DownloadPluginEnumeratorImpl downloadPluginEnumerator;
|
42 |
26600
|
sandro.lab
|
|
43 |
42157
|
sandro.lab
|
/**
|
44 |
|
|
* The result set client factory.
|
45 |
|
|
*/
|
46 |
26600
|
sandro.lab
|
@Resource
|
47 |
42157
|
sandro.lab
|
private ResultSetClient resultSetClient;
|
48 |
26600
|
sandro.lab
|
|
49 |
42157
|
sandro.lab
|
/**
|
50 |
|
|
* The object store dao.
|
51 |
|
|
*/
|
52 |
26600
|
sandro.lab
|
@Autowired
|
53 |
|
|
private ObjectStoreDao objectStoreDao;
|
54 |
|
|
|
55 |
42157
|
sandro.lab
|
public static void reportException(final DownloadReportMap report, final DownloadItem di, final Throwable e) {
|
56 |
|
|
final String className = e.getClass().getName();
|
57 |
|
|
if (!report.containsKey(className)) {
|
58 |
|
|
final DownloadReport dr = new DownloadReport();
|
59 |
|
|
dr.setStackTrace(Joiner.on("\tat ").join(e.getStackTrace()));
|
60 |
|
|
if (di != null) {
|
61 |
|
|
dr.setDownloadItem(di);
|
62 |
|
|
}
|
63 |
|
|
report.put(className, dr);
|
64 |
|
|
} else {
|
65 |
|
|
report.get(className).incrementError();
|
66 |
|
|
}
|
67 |
|
|
}
|
68 |
|
|
|
69 |
26600
|
sandro.lab
|
/**
|
70 |
|
|
* Download and feed file into the objectStore .
|
71 |
31408
|
sandro.lab
|
*
|
72 |
42157
|
sandro.lab
|
* @param epr the end-point reference of the result-set of Serialized DownloadItem
|
73 |
|
|
* @param plugin the plugin used to retrieve the correct URL
|
74 |
|
|
* @param objectStoreID the object store id to store the data
|
75 |
|
|
* @param protocol the protocol used to download the file
|
76 |
|
|
* @param mimeType the mime type of the Files
|
77 |
|
|
* @param numberOfThreads the number of threads to use for download at the same time
|
78 |
37002
|
sandro.lab
|
* @throws ObjectStoreServiceException
|
79 |
26600
|
sandro.lab
|
*/
|
80 |
32283
|
sandro.lab
|
public DownloadReportMap download(final String epr,
|
81 |
26600
|
sandro.lab
|
final String plugin,
|
82 |
|
|
final String objectStoreID,
|
83 |
|
|
final String protocol,
|
84 |
|
|
final String mimeType,
|
85 |
33251
|
sandro.lab
|
final int numberOfThreads,
|
86 |
37568
|
sandro.lab
|
final String basePath,
|
87 |
41998
|
alessia.ba
|
final List<String> regularExpression,
|
88 |
|
|
final int connectTimeoutMs,
|
89 |
42170
|
sandro.lab
|
final int readTimeoutMs, final int sleepTimeMs) throws ObjectStoreServiceException {
|
90 |
26600
|
sandro.lab
|
final DownloadPlugin downloadPlugin = downloadPluginEnumerator.get(plugin);
|
91 |
36413
|
sandro.lab
|
if ((basePath != null) && (basePath.isEmpty() == false)) {
|
92 |
33251
|
sandro.lab
|
downloadPlugin.setBasePath(basePath);
|
93 |
|
|
}
|
94 |
26600
|
sandro.lab
|
|
95 |
42157
|
sandro.lab
|
final Iterable<String> urlInfo = resultSetClient.iter(ResultSet.fromJson(epr), String.class);
|
96 |
42170
|
sandro.lab
|
final BlockingQueue<String> itemsQueue = Queues.newArrayBlockingQueue(numberOfThreads * 5);
|
97 |
33912
|
claudio.at
|
final ExecutorService executor = Executors.newFixedThreadPool(numberOfThreads);
|
98 |
|
|
final ObjectStore objStore = objectStoreDao.getObjectStore(objectStoreID);
|
99 |
26600
|
sandro.lab
|
|
100 |
33912
|
claudio.at
|
final List<Future<DownloadReportMap>> responses = Lists.newArrayList();
|
101 |
40523
|
claudio.at
|
final DownloadReportMap pluginReport = new DownloadReportMap();
|
102 |
|
|
pluginReport.setStatus(true);
|
103 |
31882
|
sandro.lab
|
|
104 |
38924
|
claudio.at
|
if (regularExpression != null) {
|
105 |
37568
|
sandro.lab
|
downloadPlugin.setRegularExpression(regularExpression);
|
106 |
|
|
}
|
107 |
|
|
|
108 |
26600
|
sandro.lab
|
for (int i = 0; i < numberOfThreads; i++) {
|
109 |
42157
|
sandro.lab
|
final Function<String, DownloadItem> applyDowunloadPlugin = input -> {
|
110 |
|
|
if (input == null) {
|
111 |
|
|
log.error("Input is null");
|
112 |
|
|
return null;
|
113 |
|
|
}
|
114 |
|
|
if (input.equals(DownloadIntoObjectStoreAction.END_QUEUE_STRING)) return DownloadIntoObjectStoreAction.END_QUEUE;
|
115 |
33251
|
sandro.lab
|
|
116 |
42157
|
sandro.lab
|
DownloadItem di = null;
|
117 |
|
|
try {
|
118 |
|
|
di = DownloadItem.newObjectfromJSON(input);
|
119 |
40523
|
claudio.at
|
|
120 |
42157
|
sandro.lab
|
if (downloadPlugin.retrieveUrl(di) == null) {
|
121 |
|
|
di.setUrl(null);
|
122 |
|
|
di.setOriginalUrl(null);
|
123 |
33251
|
sandro.lab
|
}
|
124 |
42157
|
sandro.lab
|
return di;
|
125 |
|
|
} catch (Throwable e) {
|
126 |
|
|
reportException(pluginReport, di, e);
|
127 |
|
|
log.error("Exception on transform item :" + input, e);
|
128 |
|
|
return null;
|
129 |
33251
|
sandro.lab
|
}
|
130 |
42157
|
sandro.lab
|
};
|
131 |
|
|
responses.add(executor
|
132 |
|
|
.submit(new DownloadWorker(itemsQueue, objStore, Protocols.valueOf(protocol), mimeType, connectTimeoutMs, readTimeoutMs, sleepTimeMs,
|
133 |
|
|
applyDowunloadPlugin
|
134 |
|
|
)));
|
135 |
26600
|
sandro.lab
|
}
|
136 |
33251
|
sandro.lab
|
|
137 |
|
|
int i = 0;
|
138 |
38924
|
claudio.at
|
if (urlInfo != null) {
|
139 |
37586
|
sandro.lab
|
for (final String downloadItem : urlInfo) {
|
140 |
|
|
if (downloadItem != null) {
|
141 |
|
|
if ((i++ % 1000) == 0) {
|
142 |
|
|
log.debug("Read " + i);
|
143 |
|
|
}
|
144 |
|
|
try {
|
145 |
|
|
itemsQueue.put(downloadItem);
|
146 |
|
|
} catch (final Exception e) {
|
147 |
|
|
log.error("An error occurred while populating the download items queue: " + Joiner.on("\tat ").join(e.getStackTrace()));
|
148 |
|
|
}
|
149 |
33251
|
sandro.lab
|
}
|
150 |
26600
|
sandro.lab
|
}
|
151 |
|
|
}
|
152 |
|
|
|
153 |
|
|
try {
|
154 |
42157
|
sandro.lab
|
itemsQueue.put(DownloadIntoObjectStoreAction.END_QUEUE_STRING);
|
155 |
33912
|
claudio.at
|
} catch (final InterruptedException e) {
|
156 |
|
|
log.error("An error occurred adding the loop terminator: " + Joiner.on("\tat ").join(e.getStackTrace()));
|
157 |
26600
|
sandro.lab
|
}
|
158 |
31882
|
sandro.lab
|
|
159 |
40523
|
claudio.at
|
final DownloadReportMap resultMap = getDownloadReportMap(responses, pluginReport);
|
160 |
40501
|
claudio.at
|
executor.shutdown();
|
161 |
|
|
return resultMap;
|
162 |
40523
|
claudio.at
|
}
|
163 |
40501
|
claudio.at
|
|
164 |
40523
|
claudio.at
|
private DownloadReportMap getDownloadReportMap(final List<Future<DownloadReportMap>> responses, final DownloadReportMap pluginReport) {
|
165 |
33912
|
claudio.at
|
final DownloadReportMap resultMap = new DownloadReportMap();
|
166 |
33251
|
sandro.lab
|
resultMap.setStatus(true);
|
167 |
31882
|
sandro.lab
|
|
168 |
40512
|
claudio.at
|
for (final Future<DownloadReportMap> currentResponse : responses) {
|
169 |
33251
|
sandro.lab
|
try {
|
170 |
40512
|
claudio.at
|
final DownloadReportMap currentMap = currentResponse.get();
|
171 |
|
|
|
172 |
40523
|
claudio.at
|
mergeReport(resultMap, currentMap);
|
173 |
40512
|
claudio.at
|
|
174 |
33251
|
sandro.lab
|
log.info("Status " + currentMap.getStatus());
|
175 |
|
|
resultMap.setStatus(resultMap.getStatus() && currentMap.getStatus());
|
176 |
|
|
resultMap.setTotalDownloaded(currentMap.getTotalDownloaded() + resultMap.getTotalDownloaded());
|
177 |
31882
|
sandro.lab
|
|
178 |
33912
|
claudio.at
|
} catch (final Exception e) {
|
179 |
33251
|
sandro.lab
|
log.error(e);
|
180 |
|
|
resultMap.setStatus(false);
|
181 |
|
|
}
|
182 |
|
|
}
|
183 |
40523
|
claudio.at
|
mergeReport(resultMap, pluginReport);
|
184 |
33251
|
sandro.lab
|
return resultMap;
|
185 |
26600
|
sandro.lab
|
}
|
186 |
40523
|
claudio.at
|
|
187 |
|
|
private void mergeReport(final DownloadReportMap resultMap, final DownloadReportMap currentMap) {
|
188 |
|
|
for (final String key : currentMap.keySet()) {
|
189 |
|
|
if (!resultMap.containsKey(key)) {
|
190 |
|
|
resultMap.put(key, currentMap.get(key));
|
191 |
|
|
} else {
|
192 |
|
|
final DownloadReport currentReport = currentMap.get(key);
|
193 |
|
|
resultMap.get(key).incrementError(currentReport.getNumberOfOccurrences());
|
194 |
|
|
}
|
195 |
42117
|
claudio.at
|
resultMap.setTotalDownloaded(resultMap.getTotalDownloaded() + currentMap.getTotalDownloaded());
|
196 |
|
|
resultMap.setStatus(resultMap.getStatus() & currentMap.getStatus());
|
197 |
40523
|
claudio.at
|
}
|
198 |
|
|
}
|
199 |
26600
|
sandro.lab
|
}
|