Revision 50563
Added by Alessia Bardi over 6 years ago
MongoMDStore.java | ||
---|---|---|
15 | 15 |
import com.mongodb.BasicDBObject; |
16 | 16 |
import com.mongodb.DBObject; |
17 | 17 |
import com.mongodb.QueryBuilder; |
18 |
import com.mongodb.client.FindIterable; |
|
19 |
import com.mongodb.client.ListIndexesIterable; |
|
20 |
import com.mongodb.client.MongoCollection; |
|
21 |
import com.mongodb.client.MongoDatabase; |
|
18 |
import com.mongodb.client.*; |
|
22 | 19 |
import com.mongodb.client.model.IndexOptions; |
23 | 20 |
import eu.dnetlib.data.mdstore.modular.RecordParser; |
24 | 21 |
import eu.dnetlib.data.mdstore.modular.connector.MDStore; |
25 | 22 |
import eu.dnetlib.enabling.resultset.listener.ResultSetListener; |
23 |
import eu.dnetlib.enabling.tools.DnetStreamSupport; |
|
26 | 24 |
import eu.dnetlib.rmi.data.DocumentNotFoundException; |
27 | 25 |
import eu.dnetlib.rmi.data.MDStoreServiceException; |
28 | 26 |
import org.apache.commons.lang3.StringUtils; |
... | ... | |
163 | 161 |
return deliver(from, until, recordFilter, new SerializeMongoRecordId(), false); |
164 | 162 |
} |
165 | 163 |
|
166 |
public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer, boolean noCursorTimeout) {
|
|
164 |
public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer) { |
|
167 | 165 |
try { |
168 | 166 |
final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null; |
169 | 167 |
|
170 |
return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer, BULK_SIZE, noCursorTimeout);
|
|
168 |
return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer, BULK_SIZE); |
|
171 | 169 |
} catch(Throwable e) { |
172 | 170 |
throw new RuntimeException(e); |
173 | 171 |
} |
... | ... | |
178 | 176 |
return null; |
179 | 177 |
} |
180 | 178 |
try { |
181 |
Long l = Long.valueOf(s); |
|
182 |
if(l.longValue() == 0 || l.longValue() == -1 ) return null; |
|
183 |
else return l; |
|
179 |
return Long.valueOf(s); |
|
184 | 180 |
} catch (NumberFormatException e) { |
185 | 181 |
throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e); |
186 | 182 |
} |
... | ... | |
188 | 184 |
|
189 | 185 |
@Override |
190 | 186 |
public Iterable<String> iterate() { |
191 |
Stream<DBObject> inputStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(collection.find().iterator(), Spliterator.ORDERED), false); |
|
192 |
return inputStream.map(it -> (String) it.get("body")).collect(Collectors.toList()); |
|
187 |
return () -> { |
|
188 |
final MongoCursor<DBObject> i = collection.find().noCursorTimeout(true).batchSize(BULK_SIZE).iterator(); |
|
189 |
return DnetStreamSupport.generateStreamFromIterator(i) |
|
190 |
.map(it -> (String) it.get("body")) |
|
191 |
.iterator(); |
|
192 |
}; |
|
193 | 193 |
} |
194 | 194 |
|
195 | 195 |
@Override |
Also available in: Unified diff
MERGE: branch updated with trunk changes through r50560