Project

General

Profile

1
package eu.dnetlib.data.objectstore.filesystem;
2

    
3
import java.util.List;
4

    
5
import com.google.common.collect.Lists;
6
import com.mongodb.DBObject;
7
import com.mongodb.client.FindIterable;
8
import com.mongodb.client.MongoCollection;
9
import com.mongodb.client.MongoCursor;
10
import com.mongodb.client.model.Filters;
11
import com.mongodb.client.model.Sorts;
12
import eu.dnetlib.enabling.resultset.ResultSet;
13
import eu.dnetlib.enabling.resultset.ResultSetAware;
14
import eu.dnetlib.enabling.resultset.ResultSetListener;
15
import eu.dnetlib.miscutils.collections.MappedCollection;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.bson.conversions.Bson;
19

    
20
/**
21
 * The listener interface for receiving fileSystemObjectStoreResultSet events.
22
 * The class that is interested in processing a fileSystemObjectStoreResultSet
23
 * event implements this interface, and the object created
24
 * with that class is registered with a component using the
25
 * component's <code>addFileSystemObjectStoreResultSetListener<code> method. When
26
 * the fileSystemObjectStoreResultSet event occurs, that object's appropriate
27
 * method is invoked.
28
 *
29
 * @author sandro
30
 */
31
public class FileSystemObjectStoreResultSetListener implements ResultSetListener, ResultSetAware {
32

    
33

    
34
	/** The Constant log. */
35
	private static final Log log = LogFactory.getLog(FileSystemObjectStoreResultSetListener.class); // NOPMD by marko on 11/24/08 5:02 PM
36

    
37

    
38
	/** The from date. */
39
	private Long fromDate;
40

    
41
	/** The until date. */
42
	private Long untilDate;
43

    
44
	/** The records. */
45
	private List<String> records;
46

    
47
	/** The object store id. */
48
	private String objectStoreID;
49

    
50

    
51
	/** The mongo collection. */
52
	private MongoCollection<DBObject> mongoCollection;
53

    
54
	/** The base uri. */
55
	private String baseURI;
56

    
57
	/**
58
	 * The base path
59
	 */
60
	private String basePath;
61

    
62
	/** The current size. */
63
	private int currentSize = -1;
64

    
65
	/** The current cursor. */
66
	private MongoCursor<DBObject> currentCursor;
67

    
68
	/** The cursor position. */
69
	private long cursorPosition;
70

    
71
	/**
72
	 * {@inheritDoc}
73
	 * @see eu.dnetlib.enabling.resultset.TypedResultSetListener#getResult(int, int)
74
	 */
75
	@Override
76
	public List<String> getResult(final int from, final int to) {
77
		if (log.isDebugEnabled()) {
78
			log.debug(String.format("ObjectStoreId :%s, from: %d, to: %d", objectStoreID, from, to));
79
		}
80
		if (records != null) {
81
			List<String> ids = Lists.newArrayList();
82
			for (int i = from-1; i < Math.min(records.size(),to); i++) {
83
				ids.add(records.get(i));
84
			}
85
			Bson q = Filters.in("id", ids);
86
			FindIterable<DBObject> res = getMongoCollection().find(q);
87
			return MappedCollection.listMap(res, ObjectStoreFileUtility.asJSON(getBaseURI(), getObjectStoreID(), getBasePath()));
88
		} else if ((fromDate != null) && (untilDate != null)) {
89
			if ((currentCursor == null) || (cursorPosition > from)) {
90
				createCurrentCursor();
91
			}
92
			while (cursorPosition < from) {
93
				currentCursor.next();
94
				cursorPosition++;
95
			}
96
			List<DBObject> result = Lists.newArrayList();
97
			for (int i = from; i <= to; i++) {
98
				if (currentCursor.hasNext()) {
99
					result.add(currentCursor.next());
100
					cursorPosition++;
101
				}
102
			}
103
			return MappedCollection.listMap(result, ObjectStoreFileUtility.asJSON(getBaseURI(), getObjectStoreID(), getBasePath()));
104
		}
105

    
106
		throw new IllegalArgumentException("Missing parameters on Delivery must provide either from, to, or ObjectStoreIDs");
107
	}
108

    
109
	/**
110
	 * Creates the current cursor.
111
	 */
112
	private void createCurrentCursor() {
113
		Bson timestampQuery = Filters.and(Filters.gt("timestamp", fromDate.doubleValue()), Filters.lt("timestamp", untilDate.doubleValue()));
114
		if (currentCursor != null) {
115
			currentCursor.close();
116
		}
117
		currentCursor = getMongoCollection().find(timestampQuery).sort(Sorts.orderBy(Filters.eq("_id", 1))).iterator();
118
		cursorPosition = 1;
119

    
120
	}
121

    
122
	/**
123
	 * {@inheritDoc}
124
	 * @see eu.dnetlib.enabling.resultset.TypedResultSetListener#getSize()
125
	 */
126
	@Override
127
	public int getSize() {
128
		if (currentSize == -1) {
129
			currentSize = calculateSize();
130
		}
131
		return Math.max(0, currentSize - 1);
132
	}
133

    
134
	/**
135
	 * Calculate size.
136
	 *
137
	 * @return the int
138
	 */
139
	private int calculateSize() {
140
		if (records != null) {
141
			Bson query = Filters.in("id", records);
142
			return (int) getMongoCollection().count(query);
143
		} else if ((fromDate != null) && (untilDate != null)) {
144
			Bson timestampQuery = Filters.and(Filters.gt("timestamp", fromDate.doubleValue()), Filters.lt("timestamp", untilDate.doubleValue()));
145
			return (int) getMongoCollection().count(timestampQuery);
146
		}
147
		return 0;
148
	}
149

    
150

    
151
	/**
152
	 * {@inheritDoc}
153
	 * @see eu.dnetlib.enabling.resultset.ResultSetAware#setResultSet(eu.dnetlib.enabling.resultset.ResultSet)
154
	 */
155
	@Override
156
	public void setResultSet(final ResultSet resultSet) {
157
		resultSet.close();
158
	}
159

    
160

    
161
	/**
162
	 * Gets the from date.
163
	 *
164
	 * @return the fromDate
165
	 */
166
	public Long getFromDate() {
167
		return fromDate;
168
	}
169

    
170

    
171
	/**
172
	 * Sets the from date.
173
	 *
174
	 * @param fromDate the fromDate to set
175
	 */
176
	public FileSystemObjectStoreResultSetListener setFromDate(final Long fromDate) {
177
		this.fromDate = fromDate;
178
		return this;
179
	}
180

    
181

    
182
	/**
183
	 * Gets the until date.
184
	 *
185
	 * @return the untilDate
186
	 */
187
	public Long getUntilDate() {
188
		return untilDate;
189
	}
190

    
191

    
192
	/**
193
	 * Sets the until date.
194
	 *
195
	 * @param untilDate the untilDate to set
196
	 */
197
	public FileSystemObjectStoreResultSetListener setUntilDate(final Long untilDate) {
198
		this.untilDate = untilDate;
199
		return this;
200
	}
201

    
202

    
203
	/**
204
	 * Gets the records.
205
	 *
206
	 * @return the records
207
	 */
208
	public List<String> getRecords() {
209
		return records;
210
	}
211

    
212

    
213
	/**
214
	 * Sets the records.
215
	 *
216
	 * @param records the records to set
217
	 */
218
	public void setRecords(final List<String> records) {
219
		this.records = records;
220
	}
221

    
222

    
223
	/**
224
	 * Gets the object store id.
225
	 *
226
	 * @return the objectStoreID
227
	 */
228
	public String getObjectStoreID() {
229
		return objectStoreID;
230
	}
231

    
232

    
233
	/**
234
	 * Sets the object store id.
235
	 *
236
	 * @param objectStoreID the objectStoreID to set
237
	 */
238
	public void setObjectStoreID(final String objectStoreID) {
239
		this.objectStoreID = objectStoreID;
240
	}
241

    
242

    
243

    
244

    
245

    
246
	/**
247
	 * Gets the base uri.
248
	 *
249
	 * @return the baseURI
250
	 */
251
	public String getBaseURI() {
252
		return baseURI;
253
	}
254

    
255

    
256
	/**
257
	 * Sets the base uri.
258
	 *
259
	 * @param baseURI the baseURI to set
260
	 */
261
	public void setBaseURI(final String baseURI) {
262
		this.baseURI = baseURI;
263
	}
264

    
265

    
266
	/**
267
	 * Gets the current size.
268
	 *
269
	 * @return the currentSize
270
	 */
271
	public int getCurrentSize() {
272
		return currentSize;
273
	}
274

    
275

    
276
	/**
277
	 * Sets the current size.
278
	 *
279
	 * @param currentSize the currentSize to set
280
	 */
281
	public void setCurrentSize(final int currentSize) {
282
		this.currentSize = currentSize;
283
	}
284

    
285

    
286
	/**
287
	 * Gets the current cursor.
288
	 *
289
	 * @return the currentCursor
290
	 */
291
	public MongoCursor<DBObject> getCurrentCursor() {
292
		return currentCursor;
293
	}
294

    
295

    
296
	/**
297
	 * Sets the current cursor.
298
	 *
299
	 * @param currentCursor the currentCursor to set
300
	 */
301
	public void setCurrentCursor(final MongoCursor<DBObject> currentCursor) {
302
		this.currentCursor = currentCursor;
303
	}
304

    
305

    
306
	/**
307
	 * Gets the cursor position.
308
	 *
309
	 * @return the cursorPosition
310
	 */
311
	public long getCursorPosition() {
312
		return cursorPosition;
313
	}
314

    
315

    
316
	/**
317
	 * Sets the cursor position.
318
	 *
319
	 * @param cursorPosition the cursorPosition to set
320
	 */
321
	public void setCursorPosition(final long cursorPosition) {
322
		this.cursorPosition = cursorPosition;
323
	}
324

    
325
	/**
326
	 * Gets the mongo collection.
327
	 *
328
	 * @return the mongo collection
329
	 */
330
	public MongoCollection<DBObject> getMongoCollection() {
331
		return mongoCollection;
332
	}
333

    
334
	/**
335
	 * Sets the mongo collection.
336
	 *
337
	 * @param mongoCollection the new mongo collection
338
	 */
339
	public void setMongoCollection(final MongoCollection<DBObject> mongoCollection) {
340
		this.mongoCollection = mongoCollection;
341
	}
342

    
343
	public String getBasePath() {
344
		return basePath;
345
	}
346

    
347
	public void setBasePath(final String basePath) {
348
		this.basePath = basePath;
349
	}
350
}
(3-3/8)