Project

General

Profile

« Previous | Next » 

Revision 50563

MERGE: branch updated with trunk changes through r50560

View differences:

MongoMDStore.java
15 15
import com.mongodb.BasicDBObject;
16 16
import com.mongodb.DBObject;
17 17
import com.mongodb.QueryBuilder;
18
import com.mongodb.client.FindIterable;
19
import com.mongodb.client.ListIndexesIterable;
20
import com.mongodb.client.MongoCollection;
21
import com.mongodb.client.MongoDatabase;
18
import com.mongodb.client.*;
22 19
import com.mongodb.client.model.IndexOptions;
23 20
import eu.dnetlib.data.mdstore.modular.RecordParser;
24 21
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
25 22
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
23
import eu.dnetlib.enabling.tools.DnetStreamSupport;
26 24
import eu.dnetlib.rmi.data.DocumentNotFoundException;
27 25
import eu.dnetlib.rmi.data.MDStoreServiceException;
28 26
import org.apache.commons.lang3.StringUtils;
......
163 161
		return deliver(from, until, recordFilter, new SerializeMongoRecordId(), false);
164 162
	}
165 163

  
166
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer, boolean noCursorTimeout) {
164
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer) {
167 165
		try {
168 166
			final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
169 167

  
170
			return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer, BULK_SIZE, noCursorTimeout);
168
			return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer, BULK_SIZE);
171 169
		} catch(Throwable e) {
172 170
			throw new RuntimeException(e);
173 171
		}
......
178 176
			return null;
179 177
		}
180 178
		try {
181
			Long l =  Long.valueOf(s);
182
			if(l.longValue() == 0 || l.longValue() == -1 ) return null;
183
			else return l;
179
			return Long.valueOf(s);
184 180
		} catch (NumberFormatException e) {
185 181
			throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e);
186 182
		}
......
188 184

  
189 185
	@Override
190 186
	public Iterable<String> iterate() {
191
		Stream<DBObject> inputStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(collection.find().iterator(), Spliterator.ORDERED), false);
192
		return inputStream.map(it -> (String) it.get("body")).collect(Collectors.toList());
187
		return () -> {
188
            final MongoCursor<DBObject> i = collection.find().noCursorTimeout(true).batchSize(BULK_SIZE).iterator();
189
            return DnetStreamSupport.generateStreamFromIterator(i)
190
					.map(it -> (String) it.get("body"))
191
					.iterator();
192
		};
193 193
	}
194 194

  
195 195
	@Override

Also available in: Unified diff