Project

General

Profile

« Previous | Next » 

Revision 62288

fixed a problem with blocked threads

View differences:

modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoMDStore.java
1 1
package eu.dnetlib.data.mdstore.modular.mongodb;
2 2

  
3
import java.util.*;
4
import java.util.concurrent.*;
3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Set;
7
import java.util.concurrent.ArrayBlockingQueue;
8
import java.util.concurrent.BlockingQueue;
9
import java.util.concurrent.Callable;
10
import java.util.concurrent.ExecutionException;
11
import java.util.concurrent.ExecutorService;
12
import java.util.concurrent.Executors;
13
import java.util.concurrent.Future;
14
import java.util.concurrent.TimeUnit;
5 15
import java.util.regex.Pattern;
6 16

  
17
import org.apache.commons.lang3.StringUtils;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.bson.conversions.Bson;
21
import org.springframework.beans.factory.annotation.Required;
22

  
7 23
import com.google.common.base.Function;
8 24
import com.google.common.collect.Iterables;
9 25
import com.google.common.collect.Iterators;
......
17 33
import com.mongodb.client.MongoCollection;
18 34
import com.mongodb.client.MongoDatabase;
19 35
import com.mongodb.client.model.IndexOptions;
36

  
20 37
import eu.dnetlib.data.mdstore.DocumentNotFoundException;
21 38
import eu.dnetlib.data.mdstore.MDStoreServiceException;
22 39
import eu.dnetlib.data.mdstore.modular.MDFormatDescription;
23 40
import eu.dnetlib.data.mdstore.modular.RecordParser;
24 41
import eu.dnetlib.data.mdstore.modular.connector.MDStore;
25 42
import eu.dnetlib.enabling.resultset.ResultSetListener;
26
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
27
import org.apache.commons.lang3.StringUtils;
28
import org.apache.commons.logging.Log;
29
import org.apache.commons.logging.LogFactory;
30
import org.bson.conversions.Bson;
31
import org.springframework.beans.factory.annotation.Required;
32 43

  
33 44
public class MongoMDStore implements MDStore {
34 45

  
......
37 48

  
38 49
	private final boolean discardRecords;
39 50
	private String id;
40
	private MongoDatabase mongoDatabase;
51
	private final MongoDatabase mongoDatabase;
41 52
	private MongoCollection<DBObject> collection;
42 53
	private MongoCollection<DBObject> discardedCollection;
43 54
	private List<MDFormatDescription> mdformats;
......
48 59
	private final IndexOptions options = new IndexOptions().background(true);
49 60

  
50 61
	public MongoMDStore(final String id,
51
			final MongoCollection<DBObject> collection,
52
			final RecordParser recordParser,
53
			final boolean discardRecords,
54
			final MongoDatabase mongoDatabase) {
62
		final MongoCollection<DBObject> collection,
63
		final RecordParser recordParser,
64
		final boolean discardRecords,
65
		final MongoDatabase mongoDatabase) {
55 66
		this.id = id;
56 67
		this.mongoDatabase = mongoDatabase;
57 68
		this.collection = collection;
......
77 88
		int countStored = 0;
78 89
		final Callable<Integer> writer = () -> {
79 90
			final MongoBulkWritesManager bulkWritesManager =
80
					new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
91
				new MongoBulkWritesManager(collection, discardedCollection, mdformats, BULK_SIZE, recordParser, discardRecords);
81 92
			int count = 0;
82 93
			while (true) {
83 94
				try {
84
					final Object record = queue.take();
95
					final Object record = queue.poll(1, TimeUnit.HOURS);
96
					if (record == null) {
97
						log.fatal("retrieved a null object, probably the feeding is failed");
98
						throw new IllegalStateException("retrieved a null object, probably the feeding wf is failed");
99
					}
85 100
					if (record.equals(sentinel)) {
86 101
						bulkWritesManager.flushBulks();
87 102
						break;
......
97 112
			return count;
98 113
		};
99 114
		final ExecutorService executorService = Executors.newSingleThreadExecutor();
100
		Future<Integer> storedCountInt = executorService.submit(writer);
115
		final Future<Integer> storedCountInt = executorService.submit(writer);
101 116

  
102 117
		try {
103 118
			log.info("feeding mdstore " + id);
104 119
			if (records != null) {
105 120
				for (final String record : records) {
106
					queue.put(record);
121
					if (record != null) {
122
						queue.put(record);
123
					}
107 124
				}
108 125
			}
109 126
			queue.put(sentinel);
......
145 162
	 */
146 163
	public void replace(final String grep, final String replace) {
147 164
		final Pattern regex = Pattern.compile(grep, Pattern.MULTILINE);
148
		BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(regex).get();
165
		final BasicDBObject query = (BasicDBObject) QueryBuilder.start("body").regex(regex).get();
149 166
		final FindIterable<DBObject> matches = collection.find(query, DBObject.class);
150
		//final DBCursor matches = collection.find(QueryBuilder.start("body").regex(regex).get());
151
		if (log.isDebugEnabled())
167
		// final DBCursor matches = collection.find(QueryBuilder.start("body").regex(regex).get());
168
		if (log.isDebugEnabled()) {
152 169
			log.debug("FOUND: " + Lists.newArrayList(matches).size());
170
		}
153 171

  
154 172
		for (final DBObject match : matches) {
155 173
			final DBObject o = new BasicDBObject(match.toMap());
......
169 187
	}
170 188

  
171 189
	public ResultSetListener deliver(final String from, final String until, final String recordFilter, final Function<DBObject, String> serializer)
172
			throws MDStoreServiceException {
173
		final Pattern filter = (recordFilter != null) && (recordFilter.length() > 0) ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
190
		throws MDStoreServiceException {
191
		final Pattern filter = recordFilter != null && recordFilter.length() > 0 ? Pattern.compile(recordFilter, Pattern.MULTILINE) : null;
174 192

  
175
		return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer);	}
193
		return new MongoResultSetListener(collection, parseLong(from), parseLong(until), filter, serializer);
194
	}
176 195

  
177 196
	private Long parseLong(final String s) throws MDStoreServiceException {
178
		if (StringUtils.isBlank(s)) {
179
			return null;
180
		}
197
		if (StringUtils.isBlank(s)) { return null; }
181 198
		try {
182 199
			return Long.valueOf(s);
183
		} catch (NumberFormatException e) {
200
		} catch (final NumberFormatException e) {
184 201
			throw new MDStoreServiceException("Invalid date, expected java.lang.Long, or null", e);
185 202
		}
186 203
	}
......
198 215
	@Override
199 216
	public String getRecord(final String recordId) throws DocumentNotFoundException {
200 217
		final DBObject obj = collection.find(new BasicDBObject("id", recordId)).first();
201
		if (obj == null || !obj.containsField("body")) throw new DocumentNotFoundException(String.format(
202
				"The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
218
		if (obj == null || !obj.containsField("body")) {
219
			throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
220
		}
203 221
		final String body = (String) obj.get("body");
204
		if (body.trim().length() == 0) throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'",
205
				recordId, id));
222
		if (body.trim().length() == 0) {
223
			throw new DocumentNotFoundException(String.format("The document with id '%s' does not exist in mdstore: '%s'", recordId, id));
224
		}
206 225
		return new SerializeMongoRecord().apply(obj);
207 226
	}
208 227

  
......
210 229
	public List<String> deliver(final String mdId, final int pageSize, final int offset, final Map<String, String> queryParam) {
211 230
		final QueryBuilder query = QueryBuilder.start();
212 231

  
213
		for (String key : queryParam.keySet()) {
232
		for (final String key : queryParam.keySet()) {
214 233
			query.and(key).regex(Pattern.compile(queryParam.get(key), Pattern.LITERAL));
215 234
		}
216 235

  
217
		FindIterable<DBObject> dbObjects = offset > 0
218
				? collection.find((Bson) query.get()).limit(pageSize).skip(offset)
219
				: collection.find((Bson) query.get()).limit(pageSize);
236
		final FindIterable<DBObject> dbObjects = offset > 0
237
			? collection.find((Bson) query.get()).limit(pageSize).skip(offset)
238
			: collection.find((Bson) query.get()).limit(pageSize);
220 239

  
221

  
222 240
		queryParam.put("count", "" + collection.count((Bson) query.get()));
223 241

  
224 242
		final List<String> result = new ArrayList<>();
modules/cnr-mongo-mdstore/trunk/src/test/java/eu/dnetlib/data/mdstore/modular/mongodb/SimpleBlockingQueueThreadTest.java
1
package eu.dnetlib.data.mdstore.modular.mongodb;
2

  
3
import static org.junit.Assert.assertEquals;
4

  
5
import java.util.concurrent.ArrayBlockingQueue;
6
import java.util.concurrent.BlockingQueue;
7
import java.util.concurrent.Callable;
8
import java.util.concurrent.ExecutionException;
9
import java.util.concurrent.ExecutorService;
10
import java.util.concurrent.Executors;
11
import java.util.concurrent.Future;
12
import java.util.concurrent.TimeUnit;
13

  
14
import org.junit.Ignore;
15
import org.junit.Test;
16

  
17
public class SimpleBlockingQueueThreadTest {
18

  
19
	final ExecutorService executorService = Executors.newSingleThreadExecutor();
20

  
21
	@Test
22
	@Ignore
23
	public void test_1() {
24
		final int res = feed(1, 2, 3, 5);
25
		assertEquals(4, res);
26
		System.out.println("END");
27
	}
28

  
29
	@Test
30
	@Ignore
31
	public void test_2() throws InterruptedException {
32
		try {
33
			// The term 0 produces a division by 0 exception
34
			final int res = feed(1, 2, 0, 5);
35
			assertEquals(4, res);
36
		} catch (final Throwable e) {
37
			Thread.sleep(15000);
38
		}
39
		System.out.println("END");
40
	}
41

  
42
	public int feed(final int... records) {
43

  
44
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<>(100);
45
		final Object sentinel = new Object();
46
		int countStored = 0;
47
		final Callable<Integer> writer = () -> {
48
			int count = 0;
49
			while (true) {
50
				try {
51
					final Object record = queue.poll(10, TimeUnit.SECONDS);
52
					if (record == null) {
53
						System.err.println("- thread: retrieved a null object, probably the feeding is failed");
54
						throw new IllegalStateException("retrieved a null object, probably the feeding wf is failed");
55
					}
56
					if (record.equals(sentinel)) {
57
						System.out.println("- thread: sentinel found");
58
						break;
59
					}
60
					count++;
61
					System.out.println("- thread: processing object: " + record);
62
				} catch (final InterruptedException e) {
63
					System.err.println("- thread: Error - " + e.getMessage());
64
					throw new IllegalStateException(e);
65
				}
66
			}
67
			System.out.println("- thread: Total processed: " + count);
68
			return count;
69
		};
70
		final Future<Integer> storedCountInt = executorService.submit(writer);
71

  
72
		try {
73
			for (final int record : records) {
74
				queue.put(100 / record);
75
				Thread.sleep(4000);
76
			}
77
			queue.put(sentinel);
78
			countStored = storedCountInt.get().intValue();
79
		} catch (final InterruptedException | ExecutionException e) {
80
			System.err.println("Error: " + e.getMessage());
81
			throw new IllegalStateException(e);
82
		}
83

  
84
		return countStored;
85
	}
86

  
87
}

Also available in: Unified diff