1
|
package eu.dnetlib.data.objectstore.filesystem;
|
2
|
|
3
|
import java.util.List;
|
4
|
|
5
|
import com.google.common.collect.Lists;
|
6
|
import com.mongodb.DBObject;
|
7
|
import com.mongodb.client.FindIterable;
|
8
|
import com.mongodb.client.MongoCollection;
|
9
|
import com.mongodb.client.MongoCursor;
|
10
|
import com.mongodb.client.model.Filters;
|
11
|
import com.mongodb.client.model.Sorts;
|
12
|
import eu.dnetlib.enabling.resultset.ResultSet;
|
13
|
import eu.dnetlib.enabling.resultset.ResultSetAware;
|
14
|
import eu.dnetlib.enabling.resultset.ResultSetListener;
|
15
|
import eu.dnetlib.miscutils.collections.MappedCollection;
|
16
|
import org.apache.commons.logging.Log;
|
17
|
import org.apache.commons.logging.LogFactory;
|
18
|
import org.bson.conversions.Bson;
|
19
|
|
20
|
/**
|
21
|
* The listener interface for receiving fileSystemObjectStoreResultSet events.
|
22
|
* The class that is interested in processing a fileSystemObjectStoreResultSet
|
23
|
* event implements this interface, and the object created
|
24
|
* with that class is registered with a component using the
|
25
|
* component's <code>addFileSystemObjectStoreResultSetListener<code> method. When
|
26
|
* the fileSystemObjectStoreResultSet event occurs, that object's appropriate
|
27
|
* method is invoked.
|
28
|
*
|
29
|
* @author sandro
|
30
|
*/
|
31
|
public class FileSystemObjectStoreResultSetListener implements ResultSetListener, ResultSetAware {
|
32
|
|
33
|
|
34
|
/** The Constant log. */
|
35
|
private static final Log log = LogFactory.getLog(FileSystemObjectStoreResultSetListener.class); // NOPMD by marko on 11/24/08 5:02 PM
|
36
|
|
37
|
|
38
|
/** The from date. */
|
39
|
private Long fromDate;
|
40
|
|
41
|
/** The until date. */
|
42
|
private Long untilDate;
|
43
|
|
44
|
/** The records. */
|
45
|
private List<String> records;
|
46
|
|
47
|
/** The object store id. */
|
48
|
private String objectStoreID;
|
49
|
|
50
|
|
51
|
/** The mongo collection. */
|
52
|
private MongoCollection<DBObject> mongoCollection;
|
53
|
|
54
|
/** The base uri. */
|
55
|
private String baseURI;
|
56
|
|
57
|
/**
|
58
|
* The base path
|
59
|
*/
|
60
|
private String basePath;
|
61
|
|
62
|
/** The current size. */
|
63
|
private int currentSize = -1;
|
64
|
|
65
|
/** The current cursor. */
|
66
|
private MongoCursor<DBObject> currentCursor;
|
67
|
|
68
|
/** The cursor position. */
|
69
|
private long cursorPosition;
|
70
|
|
71
|
/**
|
72
|
* {@inheritDoc}
|
73
|
* @see eu.dnetlib.enabling.resultset.TypedResultSetListener#getResult(int, int)
|
74
|
*/
|
75
|
@Override
|
76
|
public List<String> getResult(final int from, final int to) {
|
77
|
if (log.isDebugEnabled()) {
|
78
|
log.debug(String.format("ObjectStoreId :%s, from: %d, to: %d", objectStoreID, from, to));
|
79
|
}
|
80
|
if (records != null) {
|
81
|
List<String> ids = Lists.newArrayList();
|
82
|
for (int i = from-1; i < Math.min(records.size(),to); i++) {
|
83
|
ids.add(records.get(i));
|
84
|
}
|
85
|
Bson q = Filters.in("id", ids);
|
86
|
FindIterable<DBObject> res = getMongoCollection().find(q);
|
87
|
return MappedCollection.listMap(res, ObjectStoreFileUtility.asJSON(getBaseURI(), getObjectStoreID(), getBasePath()));
|
88
|
} else if ((fromDate != null) && (untilDate != null)) {
|
89
|
if ((currentCursor == null) || (cursorPosition > from)) {
|
90
|
createCurrentCursor();
|
91
|
}
|
92
|
while (cursorPosition < from) {
|
93
|
currentCursor.next();
|
94
|
cursorPosition++;
|
95
|
}
|
96
|
List<DBObject> result = Lists.newArrayList();
|
97
|
for (int i = from; i <= to; i++) {
|
98
|
if (currentCursor.hasNext()) {
|
99
|
result.add(currentCursor.next());
|
100
|
cursorPosition++;
|
101
|
}
|
102
|
}
|
103
|
return MappedCollection.listMap(result, ObjectStoreFileUtility.asJSON(getBaseURI(), getObjectStoreID(), getBasePath()));
|
104
|
}
|
105
|
|
106
|
throw new IllegalArgumentException("Missing parameters on Delivery must provide either from, to, or ObjectStoreIDs");
|
107
|
}
|
108
|
|
109
|
/**
|
110
|
* Creates the current cursor.
|
111
|
*/
|
112
|
private void createCurrentCursor() {
|
113
|
Bson timestampQuery = Filters.and(Filters.gt("timestamp", fromDate.doubleValue()), Filters.lt("timestamp", untilDate.doubleValue()));
|
114
|
if (currentCursor != null) {
|
115
|
currentCursor.close();
|
116
|
}
|
117
|
currentCursor = getMongoCollection().find(timestampQuery).sort(Sorts.orderBy(Filters.eq("_id", 1))).iterator();
|
118
|
cursorPosition = 1;
|
119
|
|
120
|
}
|
121
|
|
122
|
/**
|
123
|
* {@inheritDoc}
|
124
|
* @see eu.dnetlib.enabling.resultset.TypedResultSetListener#getSize()
|
125
|
*/
|
126
|
@Override
|
127
|
public int getSize() {
|
128
|
if (currentSize == -1) {
|
129
|
currentSize = calculateSize();
|
130
|
}
|
131
|
return Math.max(0, currentSize - 1);
|
132
|
}
|
133
|
|
134
|
/**
|
135
|
* Calculate size.
|
136
|
*
|
137
|
* @return the int
|
138
|
*/
|
139
|
private int calculateSize() {
|
140
|
if (records != null) {
|
141
|
Bson query = Filters.in("id", records);
|
142
|
return (int) getMongoCollection().count(query);
|
143
|
} else if ((fromDate != null) && (untilDate != null)) {
|
144
|
Bson timestampQuery = Filters.and(Filters.gt("timestamp", fromDate.doubleValue()), Filters.lt("timestamp", untilDate.doubleValue()));
|
145
|
return (int) getMongoCollection().count(timestampQuery);
|
146
|
}
|
147
|
return 0;
|
148
|
}
|
149
|
|
150
|
|
151
|
/**
|
152
|
* {@inheritDoc}
|
153
|
* @see eu.dnetlib.enabling.resultset.ResultSetAware#setResultSet(eu.dnetlib.enabling.resultset.ResultSet)
|
154
|
*/
|
155
|
@Override
|
156
|
public void setResultSet(final ResultSet resultSet) {
|
157
|
resultSet.close();
|
158
|
}
|
159
|
|
160
|
|
161
|
/**
|
162
|
* Gets the from date.
|
163
|
*
|
164
|
* @return the fromDate
|
165
|
*/
|
166
|
public Long getFromDate() {
|
167
|
return fromDate;
|
168
|
}
|
169
|
|
170
|
|
171
|
/**
|
172
|
* Sets the from date.
|
173
|
*
|
174
|
* @param fromDate the fromDate to set
|
175
|
*/
|
176
|
public FileSystemObjectStoreResultSetListener setFromDate(final Long fromDate) {
|
177
|
this.fromDate = fromDate;
|
178
|
return this;
|
179
|
}
|
180
|
|
181
|
|
182
|
/**
|
183
|
* Gets the until date.
|
184
|
*
|
185
|
* @return the untilDate
|
186
|
*/
|
187
|
public Long getUntilDate() {
|
188
|
return untilDate;
|
189
|
}
|
190
|
|
191
|
|
192
|
/**
|
193
|
* Sets the until date.
|
194
|
*
|
195
|
* @param untilDate the untilDate to set
|
196
|
*/
|
197
|
public FileSystemObjectStoreResultSetListener setUntilDate(final Long untilDate) {
|
198
|
this.untilDate = untilDate;
|
199
|
return this;
|
200
|
}
|
201
|
|
202
|
|
203
|
/**
|
204
|
* Gets the records.
|
205
|
*
|
206
|
* @return the records
|
207
|
*/
|
208
|
public List<String> getRecords() {
|
209
|
return records;
|
210
|
}
|
211
|
|
212
|
|
213
|
/**
|
214
|
* Sets the records.
|
215
|
*
|
216
|
* @param records the records to set
|
217
|
*/
|
218
|
public void setRecords(final List<String> records) {
|
219
|
this.records = records;
|
220
|
}
|
221
|
|
222
|
|
223
|
/**
|
224
|
* Gets the object store id.
|
225
|
*
|
226
|
* @return the objectStoreID
|
227
|
*/
|
228
|
public String getObjectStoreID() {
|
229
|
return objectStoreID;
|
230
|
}
|
231
|
|
232
|
|
233
|
/**
|
234
|
* Sets the object store id.
|
235
|
*
|
236
|
* @param objectStoreID the objectStoreID to set
|
237
|
*/
|
238
|
public void setObjectStoreID(final String objectStoreID) {
|
239
|
this.objectStoreID = objectStoreID;
|
240
|
}
|
241
|
|
242
|
|
243
|
|
244
|
|
245
|
|
246
|
/**
|
247
|
* Gets the base uri.
|
248
|
*
|
249
|
* @return the baseURI
|
250
|
*/
|
251
|
public String getBaseURI() {
|
252
|
return baseURI;
|
253
|
}
|
254
|
|
255
|
|
256
|
/**
|
257
|
* Sets the base uri.
|
258
|
*
|
259
|
* @param baseURI the baseURI to set
|
260
|
*/
|
261
|
public void setBaseURI(final String baseURI) {
|
262
|
this.baseURI = baseURI;
|
263
|
}
|
264
|
|
265
|
|
266
|
/**
|
267
|
* Gets the current size.
|
268
|
*
|
269
|
* @return the currentSize
|
270
|
*/
|
271
|
public int getCurrentSize() {
|
272
|
return currentSize;
|
273
|
}
|
274
|
|
275
|
|
276
|
/**
|
277
|
* Sets the current size.
|
278
|
*
|
279
|
* @param currentSize the currentSize to set
|
280
|
*/
|
281
|
public void setCurrentSize(final int currentSize) {
|
282
|
this.currentSize = currentSize;
|
283
|
}
|
284
|
|
285
|
|
286
|
/**
|
287
|
* Gets the current cursor.
|
288
|
*
|
289
|
* @return the currentCursor
|
290
|
*/
|
291
|
public MongoCursor<DBObject> getCurrentCursor() {
|
292
|
return currentCursor;
|
293
|
}
|
294
|
|
295
|
|
296
|
/**
|
297
|
* Sets the current cursor.
|
298
|
*
|
299
|
* @param currentCursor the currentCursor to set
|
300
|
*/
|
301
|
public void setCurrentCursor(final MongoCursor<DBObject> currentCursor) {
|
302
|
this.currentCursor = currentCursor;
|
303
|
}
|
304
|
|
305
|
|
306
|
/**
|
307
|
* Gets the cursor position.
|
308
|
*
|
309
|
* @return the cursorPosition
|
310
|
*/
|
311
|
public long getCursorPosition() {
|
312
|
return cursorPosition;
|
313
|
}
|
314
|
|
315
|
|
316
|
/**
|
317
|
* Sets the cursor position.
|
318
|
*
|
319
|
* @param cursorPosition the cursorPosition to set
|
320
|
*/
|
321
|
public void setCursorPosition(final long cursorPosition) {
|
322
|
this.cursorPosition = cursorPosition;
|
323
|
}
|
324
|
|
325
|
/**
|
326
|
* Gets the mongo collection.
|
327
|
*
|
328
|
* @return the mongo collection
|
329
|
*/
|
330
|
public MongoCollection<DBObject> getMongoCollection() {
|
331
|
return mongoCollection;
|
332
|
}
|
333
|
|
334
|
/**
|
335
|
* Sets the mongo collection.
|
336
|
*
|
337
|
* @param mongoCollection the new mongo collection
|
338
|
*/
|
339
|
public void setMongoCollection(final MongoCollection<DBObject> mongoCollection) {
|
340
|
this.mongoCollection = mongoCollection;
|
341
|
}
|
342
|
|
343
|
public String getBasePath() {
|
344
|
return basePath;
|
345
|
}
|
346
|
|
347
|
public void setBasePath(final String basePath) {
|
348
|
this.basePath = basePath;
|
349
|
}
|
350
|
}
|