Project

General

Profile

« Previous | Next » 

Revision 45885

more java8 stuff

View differences:

modules/dnet-data-services/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/MDStoreRetriever.java
1 1
package eu.dnetlib.data.mdstore.modular;
2 2

  
3
import com.google.common.collect.Iterables;
3
import java.util.List;
4

  
4 5
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
5 6
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
6 7
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
8
import eu.dnetlib.enabling.tools.DnetStreamSupport;
7 9
import eu.dnetlib.rmi.common.ResultSet;
8 10
import eu.dnetlib.rmi.data.DocumentNotFoundException;
9 11
import eu.dnetlib.rmi.data.MDStoreServiceException;
......
28 30
	}
29 31

  
30 32
	public Iterable<String> deliver(final String format, final String layout, final String interpretation) throws MDStoreServiceException {
31
		try {
32
			return Iterables.concat(Iterables.transform(dao.listMDStores(format, layout, interpretation), mdId -> {
33
		return () -> getMdIds(format, layout, interpretation).stream().map(mdId -> {
33 34
				log.debug("bulk deliver of mdId: " + mdId);
34 35
				try {
35 36
					return dao.readMDStore(mdId).iterate();
36 37
				} catch (MDStoreServiceException e) {
37 38
					throw new RuntimeException(e);
38 39
				}
39
			}));
40
		} catch (RuntimeException e) {
41
			throw new MDStoreServiceException(e);
40
			}).flatMap(i -> DnetStreamSupport.generateStreamFromIterator(i.iterator())).iterator();
41
	}
42

  
43
	private List<String> getMdIds(final String format, final String layout, final String interpretation) {
44
		try {
45
			return dao.listMDStores(format, layout, interpretation);
46
		} catch (MDStoreServiceException e) {
47
			throw new RuntimeException(e);
42 48
		}
43 49
	}
44 50

  
modules/dnet-data-services/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoResultSetListener.java
25 25
	private MongoCollection<DBObject> collection;
26 26
	private int total = 0;
27 27
	private int count = 0;
28
	private int batchSize;
28 29

  
29
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Pattern filter, final Function<DBObject, String> serializer)
30
	public MongoResultSetListener(final MongoCollection<DBObject> collection, final Pattern filter, final Function<DBObject, String> serializer, final int batchSize)
30 31
			throws MDStoreServiceException {
31 32
		log.debug("Creating resultset");
32 33
		this.collection = collection;
33 34
		this.filter = filter;
34 35
		this.serializer = serializer;
36
		this.batchSize = batchSize;
35 37
		createCursor();
36
		if (currentCursor == null)
38
		if (currentCursor == null) {
37 39
			throw new MDStoreServiceException("Error on creating resultset the iterator is null");
38

  
40
		}
39 41
	}
40 42

  
41 43
	private void createCursor() throws MDStoreServiceException {
42 44
		try {
43 45
			if (filter != null) {
44
				Bson query = Filters.regex("body", filter);
46
				final Bson query = Filters.regex("body", filter);
45 47
				total = (int) collection.count(query);
46
				currentCursor = collection.find(query).sort(sortByIdAsc).iterator();
48
				currentCursor = collection.find(query).batchSize(batchSize).sort(sortByIdAsc).iterator();
47 49
			} else {
48 50
				total = (int) collection.count();
49
				currentCursor = collection.find().sort(sortByIdAsc).iterator();
51
				currentCursor = collection.find().batchSize(batchSize).sort(sortByIdAsc).iterator();
50 52
			}
51 53
		} catch (Throwable e) {
52 54
			throw new MDStoreServiceException("Error on initialize the cursor");
modules/dnet-data-services/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoMDStore.java
15 15
import com.mongodb.BasicDBObject;
16 16
import com.mongodb.DBObject;
17 17
import com.mongodb.QueryBuilder;
18
import com.mongodb.client.FindIterable;
19
import com.mongodb.client.ListIndexesIterable;
20
import com.mongodb.client.MongoCollection;
21
import com.mongodb.client.MongoDatabase;
18
import com.mongodb.client.*;
22 19
import eu.dnetlib.data.mdstore.modular.RecordParser;
23 20
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
24 21
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
22
import eu.dnetlib.enabling.tools.DnetStreamSupport;
25 23
import eu.dnetlib.rmi.data.DocumentNotFoundException;
26 24
import org.apache.commons.lang3.StringUtils;
27 25
import org.apache.commons.logging.Log;
......
41 39

  
42 40
	private RecordParser recordParser;
43 41

  
42

  
43

  
44 44
	public MongoMDStore(final String id,
45 45
			final MongoCollection<DBObject> collection,
46 46
			final RecordParser recordParser,
......
146 146
		return deliver(from, until, recordFilter, new SerializeMongoRecordId());
147 147
	}
148 148

  
149
	public ResultSetListener<String> deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer) {
149
	private ResultSetListener<String> deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer) {
150 150
		try {
151 151
			ensureIndices();
152 152
			final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
153 153

  
154
			return new MongoResultSetListener(collection, filter, serializer);
154
			return new MongoResultSetListener(collection, filter, serializer, BULK_SIZE);
155 155
		} catch (Throwable e) {
156 156
			throw new RuntimeException(e);
157 157
		}
......
159 159

  
160 160
	@Override
161 161
	public Iterable<String> iterate() {
162
		Stream<DBObject> inputStream = StreamSupport.stream(Spliterators.spliteratorUnknownSize(collection.find().iterator(), Spliterator.ORDERED), false);
163
		return inputStream.map(it -> (String) it.get("body")).collect(Collectors.toList());
162
		return () -> {
163
			final MongoCursor<DBObject> i = collection.find().batchSize(BULK_SIZE).iterator();
164
			return DnetStreamSupport.generateStreamFromIterator(i)
165
					.map(it -> (String) it.get("body"))
166
					.iterator();
167
		};
164 168
	}
165 169

  
166 170
	@Override

Also available in: Unified diff