Project

General

Profile

« Previous | Next » 

Revision 47234

Returning more detailed counters for mdstore feeding.
Fixed bug when trying to store too large XML: now all the page of XMLs is skipped, so we can go on with the other records (MongoBulkWritesManager)

View differences:

MongoMDStore.java
4 4
import java.util.List;
5 5
import java.util.Spliterator;
6 6
import java.util.Spliterators;
7
import java.util.concurrent.ArrayBlockingQueue;
8
import java.util.concurrent.BlockingQueue;
7
import java.util.concurrent.*;
9 8
import java.util.function.Function;
10 9
import java.util.regex.Pattern;
11 10
import java.util.stream.Collectors;
12 11
import java.util.stream.Stream;
13 12
import java.util.stream.StreamSupport;
14 13

  
14
import com.google.common.collect.Lists;
15 15
import com.mongodb.BasicDBObject;
16 16
import com.mongodb.DBObject;
17 17
import com.mongodb.QueryBuilder;
......
19 19
import com.mongodb.client.ListIndexesIterable;
20 20
import com.mongodb.client.MongoCollection;
21 21
import com.mongodb.client.MongoDatabase;
22
import com.mongodb.client.model.IndexOptions;
22 23
import eu.dnetlib.data.mdstore.modular.RecordParser;
23 24
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
24 25
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
......
42 43

  
43 44
	private RecordParser recordParser;
44 45

  
46
	private static List<String> indices = Lists.newArrayList("id", "timestamp", "originalId");
47
	private final IndexOptions options = new IndexOptions().background(true);
48

  
45 49
	public MongoMDStore(final String id,
46 50
			final MongoCollection<DBObject> collection,
47 51
			final RecordParser recordParser,
......
55 59
		this.discardRecords = discardRecords;
56 60
	}
57 61

  
62

  
63

  
58 64
	@Override
59
	public void feed(final Iterable<String> records, final boolean incremental) {
65
	public int feed(final Iterable<String> records, final boolean incremental) {
60 66
		// TODO: remove incremental from MDStore API. It is used in MDStoreModular. Useless here.
61 67

  
62 68
		ensureIndices();
63 69

  
64 70
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
65 71
		final Object sentinel = new Object();
66
		final Thread background = new Thread(() -> {
67
			final MongoBulkWritesManager bulkWritesManager =
68
					new MongoBulkWritesManager(collection, discardedCollection, BULK_SIZE, recordParser, discardRecords);
69
			int count = 0;
70
			while (true) {
71
				try {
72
					final Object record = queue.take();
73
					if (record == sentinel) {
74
						bulkWritesManager.flushBulks(mongoDatabase);
75
						break;
72
		int countStored = 0;
73
		final Callable<Integer> writer = new Callable<Integer>() {
74
			@Override
75
			public Integer call() throws Exception {
76
				final MongoBulkWritesManager bulkWritesManager =
77
						new MongoBulkWritesManager(collection, discardedCollection, BULK_SIZE, recordParser, discardRecords);
78
				int count = 0;
79
				while (true) {
80
					try {
81
						final Object record = queue.take();
82
						if (record == sentinel) {
83
							bulkWritesManager.flushBulks();
84
							break;
85
						}
86
						count++;
87
						bulkWritesManager.insert((String) record);
88
					} catch (final InterruptedException e) {
89
						log.fatal("got exception in background thread", e);
90
						throw new IllegalStateException(e);
76 91
					}
77
					count++;
78
					bulkWritesManager.insert((String) record);
79
				} catch (final InterruptedException e) {
80
					log.fatal("got exception in background thread", e);
81
					throw new IllegalStateException(e);
82 92
				}
93
				log.debug(String.format("extracted %s records from feeder queue", count));
94
				return count;
83 95
			}
84
			log.debug(String.format("extracted %s records from feeder queue", count));
85
		});
96
		};
86 97

  
87
		background.start();
98
		final ExecutorService executorService = Executors.newSingleThreadExecutor();
99
		Future<Integer> storedCountInt = executorService.submit(writer);
100

  
88 101
		try {
89 102
			log.info("feeding mdstore " + id);
90 103
			if (records != null) {
......
93 106
				}
94 107
			}
95 108
			queue.put(sentinel);
96
			log.info("finished feeding mdstore " + id);
97

  
98
			background.join();
109
			countStored = storedCountInt.get().intValue();
99 110
		} catch (final InterruptedException e) {
111
			log.error("Error on feeding mdstore with id:" + id, e);
100 112
			throw new IllegalStateException(e);
113
		} catch (ExecutionException e) {
114
			log.error("Error on feeding mdstore with id:" + id, e);
115
			throw new IllegalStateException(e);
101 116
		}
102
		// double check
117
		log.info("finished feeding mdstore " + id);
103 118
		ensureIndices();
104 119
		collection.createIndex(new BasicDBObject("id", 1));
120
		return countStored;
105 121
	}
106 122

  
107 123
	public void ensureIndices() {
108
		for (final String key : Arrays.asList("id", "timestamp", "originalId")) {
109
			collection.createIndex(new BasicDBObject(key, 1));
110
		}
124
			for (final String key : indices) {
125
				collection.createIndex(new BasicDBObject(key, 1), options);
126
			}
111 127
	}
112 128

  
113 129
	public boolean isIndexed() {

Also available in: Unified diff