Revision 41652
Added by Alessia Bardi about 8 years ago
modules/dnet-oai-store-service/trunk/src/main/java/eu/dnetlib/data/oai/store/mongo/MongoCursor.java | ||
---|---|---|
1 |
package eu.dnetlib.data.oai.store.mongo; |
|
2 |
|
|
3 |
import java.util.Iterator; |
|
4 |
|
|
5 |
import com.mongodb.DBCursor; |
|
6 |
import com.mongodb.DBObject; |
|
7 |
|
|
8 |
import eu.dnetlib.data.information.oai.publisher.info.RecordInfo; |
|
9 |
import eu.dnetlib.data.oai.store.Cursor; |
|
10 |
import eu.dnetlib.miscutils.functional.UnaryFunction; |
|
11 |
|
|
12 |
public class MongoCursor implements Cursor { |
|
13 |
|
|
14 |
/** |
|
15 |
* Underlying mongo cursor. |
|
16 |
*/ |
|
17 |
private DBCursor dbCursor; |
|
18 |
/** |
|
19 |
* Function to apply to records before delivering. |
|
20 |
*/ |
|
21 |
private UnaryFunction<String, String> function; |
|
22 |
|
|
23 |
/** |
|
24 |
* true if the RecordInfo returned by this Cursor must include the record body, false otherwise. |
|
25 |
*/ |
|
26 |
private boolean bodyIncluded; |
|
27 |
|
|
28 |
private RecordInfoGenerator recordInfoGenerator; |
|
29 |
private MetadataExtractor metadataExtractor; |
|
30 |
private ProvenanceExtractor provenanceExtractor; |
|
31 |
|
|
32 |
/** |
|
33 |
* |
|
34 |
* {@inheritDoc} |
|
35 |
* |
|
36 |
* @see eu.dnetlib.data.information.oai.publisher.store.Cursor#count() |
|
37 |
*/ |
|
38 |
@Override |
|
39 |
public int count() { |
|
40 |
return this.dbCursor.count(); |
|
41 |
} |
|
42 |
|
|
43 |
/** |
|
44 |
* |
|
45 |
* {@inheritDoc} |
|
46 |
* |
|
47 |
* @see java.lang.Iterable#iterator() |
|
48 |
*/ |
|
49 |
@Override |
|
50 |
public Iterator<RecordInfo> iterator() { |
|
51 |
|
|
52 |
return new Iterator<RecordInfo>() { |
|
53 |
|
|
54 |
@Override |
|
55 |
public boolean hasNext() { |
|
56 |
return dbCursor.hasNext(); |
|
57 |
} |
|
58 |
|
|
59 |
@Override |
|
60 |
public RecordInfo next() { |
|
61 |
DBObject res = dbCursor.next(); |
|
62 |
RecordInfo info = recordInfoGenerator.transformDBObject(res, bodyIncluded); |
|
63 |
if ((function != null) && bodyIncluded && (info != null)) { |
|
64 |
info.setMetadata(function.evaluate(info.getMetadata())); |
|
65 |
} |
|
66 |
return info; |
|
67 |
} |
|
68 |
|
|
69 |
@Override |
|
70 |
public void remove() { |
|
71 |
throw new UnsupportedOperationException(); |
|
72 |
} |
|
73 |
|
|
74 |
}; |
|
75 |
} |
|
76 |
|
|
77 |
public MongoCursor() { |
|
78 |
super(); |
|
79 |
} |
|
80 |
|
|
81 |
public MongoCursor(final DBCursor dbCursor, final boolean bodyIncluded, final RecordInfoGenerator recordInfoGenerator, |
|
82 |
final MetadataExtractor metadataExtractor) { |
|
83 |
this(dbCursor, null, bodyIncluded, recordInfoGenerator, metadataExtractor); |
|
84 |
} |
|
85 |
|
|
86 |
public MongoCursor(final DBCursor dbCursor, final UnaryFunction<String, String> function, final boolean bodyIncluded, |
|
87 |
final RecordInfoGenerator recordInfoGenerator, final MetadataExtractor metadataExtractor) { |
|
88 |
super(); |
|
89 |
this.dbCursor = dbCursor; |
|
90 |
this.function = function; |
|
91 |
this.bodyIncluded = bodyIncluded; |
|
92 |
this.recordInfoGenerator = recordInfoGenerator; |
|
93 |
this.metadataExtractor = metadataExtractor; |
|
94 |
} |
|
95 |
|
|
96 |
public UnaryFunction<String, String> getFunction() { |
|
97 |
return function; |
|
98 |
} |
|
99 |
|
|
100 |
public void setFunction(final UnaryFunction<String, String> function) { |
|
101 |
this.function = function; |
|
102 |
} |
|
103 |
|
|
104 |
public DBCursor getDbCursor() { |
|
105 |
return dbCursor; |
|
106 |
} |
|
107 |
|
|
108 |
public void setDbCursor(final DBCursor dbCursor) { |
|
109 |
this.dbCursor = dbCursor; |
|
110 |
} |
|
111 |
|
|
112 |
@Override |
|
113 |
public boolean isBodyIncluded() { |
|
114 |
return this.bodyIncluded; |
|
115 |
} |
|
116 |
|
|
117 |
@Override |
|
118 |
public void setBodyIncluded(final boolean bodyIncluded) { |
|
119 |
this.bodyIncluded = bodyIncluded; |
|
120 |
} |
|
121 |
|
|
122 |
public RecordInfoGenerator getRecordInfoGenerator() { |
|
123 |
return recordInfoGenerator; |
|
124 |
} |
|
125 |
|
|
126 |
public void setRecordInfoGenerator(final RecordInfoGenerator recordInfoGenerator) { |
|
127 |
this.recordInfoGenerator = recordInfoGenerator; |
|
128 |
} |
|
129 |
|
|
130 |
public MetadataExtractor getMetadataExtractor() { |
|
131 |
return metadataExtractor; |
|
132 |
} |
|
133 |
|
|
134 |
public void setMetadataExtractor(final MetadataExtractor metadataExtractor) { |
|
135 |
this.metadataExtractor = metadataExtractor; |
|
136 |
} |
|
137 |
|
|
138 |
public ProvenanceExtractor getProvenanceExtractor() { |
|
139 |
return provenanceExtractor; |
|
140 |
} |
|
141 |
|
|
142 |
public void setProvenanceExtractor(final ProvenanceExtractor provenanceExtractor) { |
|
143 |
this.provenanceExtractor = provenanceExtractor; |
|
144 |
} |
|
145 |
|
|
146 |
} |
modules/dnet-oai-store-service/trunk/src/test/java/eu/dnetlib/data/oai/store/parser/MongoQueryParserTest.java | ||
---|---|---|
8 | 8 |
import com.mongodb.util.JSON; |
9 | 9 |
import org.apache.commons.logging.Log; |
10 | 10 |
import org.apache.commons.logging.LogFactory; |
11 |
import org.bson.conversions.Bson; |
|
11 | 12 |
import org.bson.types.ObjectId; |
12 | 13 |
import org.joda.time.format.DateTimeFormat; |
13 | 14 |
import org.joda.time.format.DateTimeFormatter; |
... | ... | |
18 | 19 |
|
19 | 20 |
public class MongoQueryParserTest { |
20 | 21 |
|
22 |
private static final Log log = LogFactory.getLog(MongoQueryParserTest.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
21 | 23 |
private final MongoQueryParser mongoParser = new MongoQueryParser(); |
22 |
private static final Log log = LogFactory.getLog(MongoQueryParserTest.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
23 | 24 |
|
24 | 25 |
@Test |
25 | 26 |
public void testParseEq() { |
26 |
DBObject expected = new BasicDBObject("set", "CEDIASManuscripts");
|
|
27 |
DBObject o = this.mongoParser.parse("set = \"CEDIASManuscripts\"");
|
|
27 |
Bson expected = new BasicDBObject("set", "CEDIASManuscripts");
|
|
28 |
Bson o = this.mongoParser.parse("set = \"CEDIASManuscripts\"");
|
|
28 | 29 |
assertEquals(expected, o); |
29 | 30 |
|
30 | 31 |
} |
31 | 32 |
|
32 | 33 |
@Test |
33 | 34 |
public void testParseNeq() { |
34 |
DBObject expected = new BasicDBObject("set", new BasicDBObject("$ne", "CEDIASManuscripts"));
|
|
35 |
DBObject o = this.mongoParser.parse("set <> \"CEDIASManuscripts\"");
|
|
35 |
Bson expected = new BasicDBObject("set", new BasicDBObject("$ne", "CEDIASManuscripts"));
|
|
36 |
Bson o = this.mongoParser.parse("set <> \"CEDIASManuscripts\"");
|
|
36 | 37 |
assertEquals(expected, o); |
37 | 38 |
} |
38 | 39 |
|
39 | 40 |
@Test |
40 | 41 |
public void testParseAnd() { |
41 |
DBObject expected = new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject("set", new BasicDBObject("$ne", "CEDIASManuscripts")), |
|
42 |
BasicDBObject expected = new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject("set", new BasicDBObject("$ne", "CEDIASManuscripts")),
|
|
42 | 43 |
new BasicDBObject("pippo", new BasicDBObject("$gt", "x")))); |
43 |
DBObject o = this.mongoParser.parse("set <> \"CEDIASManuscripts\" AND pippo > x");
|
|
44 |
Bson o = this.mongoParser.parse("set <> \"CEDIASManuscripts\" AND pippo > x");
|
|
44 | 45 |
log.info(o); |
45 | 46 |
assertEquals(expected, o); |
46 | 47 |
} |
47 | 48 |
|
48 | 49 |
@Test |
49 | 50 |
public void testParseOr() { |
50 |
DBObject expected = new BasicDBObject("$or", Lists.newArrayList(new BasicDBObject("set", new BasicDBObject("$ne", "CEDIASManuscripts")), |
|
51 |
BasicDBObject expected = new BasicDBObject("$or", Lists.newArrayList(new BasicDBObject("set", new BasicDBObject("$ne", "CEDIASManuscripts")),
|
|
51 | 52 |
new BasicDBObject("pippo", new BasicDBObject("$gt", "x")))); |
52 |
DBObject o = this.mongoParser.parse("set <> \"CEDIASManuscripts\" OR pippo > x");
|
|
53 |
Bson o = this.mongoParser.parse("set <> \"CEDIASManuscripts\" OR pippo > x");
|
|
53 | 54 |
log.info(o); |
54 | 55 |
assertEquals(expected, o); |
55 | 56 |
} |
56 | 57 |
|
57 | 58 |
@Test |
58 | 59 |
public void testParseNot() { |
59 |
DBObject expected = new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject("set", "CEDIASManuscripts"), new BasicDBObject("$not", |
|
60 |
BasicDBObject expected = new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject("set", "CEDIASManuscripts"), new BasicDBObject("$not",
|
|
60 | 61 |
new BasicDBObject("pippo", new BasicDBObject("$gt", "x"))))); |
61 |
DBObject o = this.mongoParser.parse("set = \"CEDIASManuscripts\" NOT pippo > x");
|
|
62 |
log.info(o);
|
|
62 |
Bson o = this.mongoParser.parse("set = \"CEDIASManuscripts\" NOT pippo > x");
|
|
63 |
//log.info(o)
|
|
63 | 64 |
assertEquals(expected, o); |
64 | 65 |
} |
65 | 66 |
|
66 | 67 |
@Test |
67 | 68 |
public void testParseStar() { |
68 |
DBObject expected = new BasicDBObject(); |
|
69 |
DBObject o = this.mongoParser.parse("*");
|
|
70 |
DBObject o2 = this.mongoParser.parse("*=*");
|
|
69 |
BasicDBObject expected = new BasicDBObject();
|
|
70 |
Bson o = this.mongoParser.parse("*");
|
|
71 |
Bson o2 = this.mongoParser.parse("*=*");
|
|
71 | 72 |
assertEquals(expected, o); |
72 | 73 |
assertEquals(expected, o2); |
73 | 74 |
} |
74 | 75 |
|
75 | 76 |
@Test |
76 | 77 |
public void testParseStarAnd() { |
77 |
DBObject expected = new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject(), new BasicDBObject("pippo", new BasicDBObject("$gt", "x")))); |
|
78 |
DBObject o = this.mongoParser.parse("* AND pippo > x");
|
|
79 |
DBObject o2 = this.mongoParser.parse("*=* AND pippo > x");
|
|
78 |
BasicDBObject expected = new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject(), new BasicDBObject("pippo", new BasicDBObject("$gt", "x"))));
|
|
79 |
Bson o = this.mongoParser.parse("* AND pippo > x");
|
|
80 |
Bson o2 = this.mongoParser.parse("*=* AND pippo > x");
|
|
80 | 81 |
assertEquals(expected, o); |
81 | 82 |
assertEquals(expected, o2); |
82 | 83 |
} |
83 | 84 |
|
84 | 85 |
@Test |
85 | 86 |
public void testParseIdQuery() { |
86 |
DBObject expected = new BasicDBObject("_id", new BasicDBObject("$gt", new ObjectId("5225e093aabf055637bf2c65"))); |
|
87 |
DBObject o = this.mongoParser.parse("_id > 5225e093aabf055637bf2c65");
|
|
87 |
BasicDBObject expected = new BasicDBObject("_id", new BasicDBObject("$gt", new ObjectId("5225e093aabf055637bf2c65")));
|
|
88 |
Bson o = this.mongoParser.parse("_id > 5225e093aabf055637bf2c65");
|
|
88 | 89 |
assertEquals(expected, o); |
89 | 90 |
} |
90 | 91 |
|
... | ... | |
92 | 93 |
public void testParseUntilDatestamp() { |
93 | 94 |
Date dateTime = this.parseDate("2014-04-03T00:00:00.000Z"); |
94 | 95 |
// { "$and" : [ { "oaftype" : { "$ne" : "person"}} , { "datestamp" : { "$lt" : { "$date" : "2014-04-03T00:00:00.000Z"}}}]} |
95 |
DBObject expected = new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject("oaftype", new BasicDBObject("$ne", "person")), new BasicDBObject( |
|
96 |
BasicDBObject expected = |
|
97 |
new BasicDBObject("$and", Lists.newArrayList(new BasicDBObject("oaftype", new BasicDBObject("$ne", "person")), new BasicDBObject( |
|
96 | 98 |
"datestamp", new BasicDBObject("$lt", dateTime)))); |
97 | 99 |
// System.out.println(expected); |
98 |
DBObject o = this.mongoParser.parse("(oaftype <> \"person\") AND datestamp <= 2014-04-02");
|
|
100 |
Bson o = this.mongoParser.parse("(oaftype <> \"person\") AND datestamp <= 2014-04-02");
|
|
99 | 101 |
assertEquals(expected, o); |
100 | 102 |
// System.out.println(o); |
101 | 103 |
} |
modules/dnet-oai-store-service/trunk/src/main/java/eu/dnetlib/data/oai/store/mongo/DNetOAIMongoCursor.java | ||
---|---|---|
1 |
package eu.dnetlib.data.oai.store.mongo; |
|
2 |
|
|
3 |
import java.util.Iterator; |
|
4 |
|
|
5 |
import com.google.common.collect.Lists; |
|
6 |
import com.mongodb.DBObject; |
|
7 |
import com.mongodb.client.MongoCursor; |
|
8 |
import eu.dnetlib.data.information.oai.publisher.info.RecordInfo; |
|
9 |
import eu.dnetlib.data.oai.store.Cursor; |
|
10 |
import eu.dnetlib.miscutils.functional.UnaryFunction; |
|
11 |
|
|
12 |
public class DNetOAIMongoCursor implements Cursor { |
|
13 |
|
|
14 |
/** |
|
15 |
* Underlying mongo cursor. |
|
16 |
*/ |
|
17 |
private MongoCursor<DBObject> dbCursor; |
|
18 |
private int size = 0; |
|
19 |
/** |
|
20 |
* Function to apply to records before delivering. |
|
21 |
*/ |
|
22 |
private UnaryFunction<String, String> function; |
|
23 |
|
|
24 |
/** |
|
25 |
* true if the RecordInfo returned by this Cursor must include the record body, false otherwise. |
|
26 |
*/ |
|
27 |
private boolean bodyIncluded; |
|
28 |
|
|
29 |
private RecordInfoGenerator recordInfoGenerator; |
|
30 |
private MetadataExtractor metadataExtractor; |
|
31 |
private ProvenanceExtractor provenanceExtractor; |
|
32 |
|
|
33 |
public DNetOAIMongoCursor() { |
|
34 |
super(); |
|
35 |
} |
|
36 |
|
|
37 |
public DNetOAIMongoCursor(final MongoCursor<DBObject> dbCursor, final boolean bodyIncluded, final RecordInfoGenerator recordInfoGenerator, |
|
38 |
final MetadataExtractor metadataExtractor) { |
|
39 |
this(dbCursor, null, bodyIncluded, recordInfoGenerator, metadataExtractor); |
|
40 |
} |
|
41 |
|
|
42 |
public DNetOAIMongoCursor(final MongoCursor<DBObject> dbCursor, final UnaryFunction<String, String> function, final boolean bodyIncluded, |
|
43 |
final RecordInfoGenerator recordInfoGenerator, final MetadataExtractor metadataExtractor) { |
|
44 |
super(); |
|
45 |
this.dbCursor = dbCursor; |
|
46 |
this.size = 0; |
|
47 |
this.function = function; |
|
48 |
this.bodyIncluded = bodyIncluded; |
|
49 |
this.recordInfoGenerator = recordInfoGenerator; |
|
50 |
this.metadataExtractor = metadataExtractor; |
|
51 |
} |
|
52 |
|
|
53 |
/** |
|
54 |
* |
|
55 |
* {@inheritDoc} |
|
56 |
*/ |
|
57 |
@Override |
|
58 |
public int count() { |
|
59 |
//I can do it because MongoCursor are always created from queries with "limit", so I do not expect the creation of the list to explode |
|
60 |
//to not exagerate, I'll get the size only if the current size is 0 |
|
61 |
if (size == 0) |
|
62 |
size = Lists.newArrayList(dbCursor).size(); |
|
63 |
return size; |
|
64 |
} |
|
65 |
|
|
66 |
/** |
|
67 |
* |
|
68 |
* {@inheritDoc} |
|
69 |
* |
|
70 |
* @see java.lang.Iterable#iterator() |
|
71 |
*/ |
|
72 |
@Override |
|
73 |
public Iterator<RecordInfo> iterator() { |
|
74 |
|
|
75 |
return new Iterator<RecordInfo>() { |
|
76 |
|
|
77 |
@Override |
|
78 |
public boolean hasNext() { |
|
79 |
return dbCursor.hasNext(); |
|
80 |
} |
|
81 |
|
|
82 |
@Override |
|
83 |
public RecordInfo next() { |
|
84 |
DBObject res = dbCursor.next(); |
|
85 |
RecordInfo info = recordInfoGenerator.transformDBObject(res, bodyIncluded); |
|
86 |
if ((function != null) && bodyIncluded && (info != null)) { |
|
87 |
info.setMetadata(function.evaluate(info.getMetadata())); |
|
88 |
} |
|
89 |
return info; |
|
90 |
} |
|
91 |
|
|
92 |
@Override |
|
93 |
public void remove() { |
|
94 |
throw new UnsupportedOperationException(); |
|
95 |
} |
|
96 |
|
|
97 |
}; |
|
98 |
} |
|
99 |
|
|
100 |
public UnaryFunction<String, String> getFunction() { |
|
101 |
return function; |
|
102 |
} |
|
103 |
|
|
104 |
public void setFunction(final UnaryFunction<String, String> function) { |
|
105 |
this.function = function; |
|
106 |
} |
|
107 |
|
|
108 |
public MongoCursor<DBObject> getDbCursor() { |
|
109 |
return dbCursor; |
|
110 |
} |
|
111 |
|
|
112 |
public void setDbCursor(final MongoCursor<DBObject> dbCursor) { |
|
113 |
this.dbCursor = dbCursor; |
|
114 |
} |
|
115 |
|
|
116 |
@Override |
|
117 |
public boolean isBodyIncluded() { |
|
118 |
return this.bodyIncluded; |
|
119 |
} |
|
120 |
|
|
121 |
@Override |
|
122 |
public void setBodyIncluded(final boolean bodyIncluded) { |
|
123 |
this.bodyIncluded = bodyIncluded; |
|
124 |
} |
|
125 |
|
|
126 |
public RecordInfoGenerator getRecordInfoGenerator() { |
|
127 |
return recordInfoGenerator; |
|
128 |
} |
|
129 |
|
|
130 |
public void setRecordInfoGenerator(final RecordInfoGenerator recordInfoGenerator) { |
|
131 |
this.recordInfoGenerator = recordInfoGenerator; |
|
132 |
} |
|
133 |
|
|
134 |
public MetadataExtractor getMetadataExtractor() { |
|
135 |
return metadataExtractor; |
|
136 |
} |
|
137 |
|
|
138 |
public void setMetadataExtractor(final MetadataExtractor metadataExtractor) { |
|
139 |
this.metadataExtractor = metadataExtractor; |
|
140 |
} |
|
141 |
|
|
142 |
public ProvenanceExtractor getProvenanceExtractor() { |
|
143 |
return provenanceExtractor; |
|
144 |
} |
|
145 |
|
|
146 |
public void setProvenanceExtractor(final ProvenanceExtractor provenanceExtractor) { |
|
147 |
this.provenanceExtractor = provenanceExtractor; |
|
148 |
} |
|
149 |
|
|
150 |
} |
modules/dnet-oai-store-service/trunk/src/main/java/eu/dnetlib/data/oai/store/mongo/MongoPublisherStore.java | ||
---|---|---|
10 | 10 |
import java.util.zip.ZipEntry; |
11 | 11 |
import java.util.zip.ZipOutputStream; |
12 | 12 |
|
13 |
import org.apache.commons.io.output.ByteArrayOutputStream; |
|
14 |
import org.apache.commons.lang.StringUtils; |
|
15 |
import org.apache.commons.logging.Log; |
|
16 |
import org.apache.commons.logging.LogFactory; |
|
17 |
import org.bson.types.Binary; |
|
18 |
|
|
19 | 13 |
import com.google.common.base.Function; |
20 | 14 |
import com.google.common.base.Predicate; |
21 | 15 |
import com.google.common.base.Stopwatch; |
22 | 16 |
import com.google.common.collect.Iterables; |
17 |
import com.google.common.collect.Lists; |
|
23 | 18 |
import com.google.common.collect.Multimap; |
24 | 19 |
import com.mongodb.BasicDBObject; |
25 | 20 |
import com.mongodb.BasicDBObjectBuilder; |
26 |
import com.mongodb.DBCollection; |
|
27 |
import com.mongodb.DBCursor; |
|
28 | 21 |
import com.mongodb.DBObject; |
29 | 22 |
import com.mongodb.WriteConcern; |
30 |
import com.mongodb.WriteResult; |
|
31 |
|
|
23 |
import com.mongodb.client.FindIterable; |
|
24 |
import com.mongodb.client.ListIndexesIterable; |
|
25 |
import com.mongodb.client.MongoCollection; |
|
26 |
import com.mongodb.client.MongoDatabase; |
|
27 |
import com.mongodb.client.model.Filters; |
|
28 |
import com.mongodb.client.model.IndexOptions; |
|
29 |
import com.mongodb.client.model.Sorts; |
|
30 |
import com.mongodb.client.model.UpdateOptions; |
|
31 |
import com.mongodb.client.result.DeleteResult; |
|
32 |
import com.mongodb.client.result.UpdateResult; |
|
32 | 33 |
import eu.dnetlib.data.information.oai.publisher.OaiPublisherRuntimeException; |
33 | 34 |
import eu.dnetlib.data.information.oai.publisher.PublisherField; |
34 | 35 |
import eu.dnetlib.data.information.oai.publisher.conf.OAIConfigurationReader; |
... | ... | |
40 | 41 |
import eu.dnetlib.data.oai.store.parser.PublisherRecordParser; |
41 | 42 |
import eu.dnetlib.data.oai.store.sets.MongoSetCollection; |
42 | 43 |
import eu.dnetlib.miscutils.functional.UnaryFunction; |
44 |
import org.apache.commons.io.output.ByteArrayOutputStream; |
|
45 |
import org.apache.commons.lang.StringUtils; |
|
46 |
import org.apache.commons.logging.Log; |
|
47 |
import org.apache.commons.logging.LogFactory; |
|
48 |
import org.bson.conversions.Bson; |
|
49 |
import org.bson.types.Binary; |
|
43 | 50 |
|
44 |
public class MongoPublisherStore implements PublisherStore<MongoCursor> { |
|
51 |
public class MongoPublisherStore implements PublisherStore<DNetOAIMongoCursor> {
|
|
45 | 52 |
|
46 | 53 |
private static final Log log = LogFactory.getLog(MongoPublisherStore.class); // NOPMD by marko on 11/24/08 5:02 PM |
47 | 54 |
|
... | ... | |
49 | 56 |
/** Keeps information about the fields to be created in mongo. **/ |
50 | 57 |
private List<PublisherField> mongoFields; |
51 | 58 |
|
52 |
private DBCollection collection;
|
|
53 |
private DBCollection discardedCollection;
|
|
59 |
private MongoCollection<DBObject> collection;
|
|
60 |
private MongoCollection<DBObject> discardedCollection;
|
|
54 | 61 |
|
55 | 62 |
private RecordInfoGenerator recordInfoGenerator; |
56 | 63 |
private MetadataExtractor metadataExtractor; |
... | ... | |
74 | 81 |
|
75 | 82 |
private boolean alwaysNewRecord; |
76 | 83 |
|
84 |
public MongoPublisherStore() { |
|
85 |
super(); |
|
86 |
} |
|
87 |
|
|
88 |
public MongoPublisherStore(final String id, |
|
89 |
final String metadataFormat, |
|
90 |
final String interpretation, |
|
91 |
final String layout, |
|
92 |
final MongoCollection<DBObject> collection, |
|
93 |
final List<PublisherField> mongoFields, |
|
94 |
final MongoQueryParser queryParser, |
|
95 |
final RecordInfoGenerator recordInfoGenerator, |
|
96 |
final String idScheme, |
|
97 |
final String idNamespace, |
|
98 |
final MetadataExtractor metadataExtractor, |
|
99 |
final RecordChangeDetector recordChangeDetector, |
|
100 |
final boolean alwaysNewRecord, |
|
101 |
final MongoDatabase mongodb) { |
|
102 |
super(); |
|
103 |
this.id = id; |
|
104 |
this.metadataFormat = metadataFormat; |
|
105 |
this.interpretation = interpretation; |
|
106 |
this.layout = layout; |
|
107 |
this.collection = collection; |
|
108 |
this.discardedCollection = mongodb.getCollection("discarded-" + collection.getNamespace().getCollectionName(), DBObject.class); |
|
109 |
this.mongoFields = mongoFields; |
|
110 |
this.queryParser = queryParser; |
|
111 |
this.recordInfoGenerator = recordInfoGenerator; |
|
112 |
this.idScheme = idScheme; |
|
113 |
this.idNamespace = idNamespace; |
|
114 |
this.recordChangeDetector = recordChangeDetector; |
|
115 |
this.alwaysNewRecord = alwaysNewRecord; |
|
116 |
} |
|
117 |
|
|
77 | 118 |
@Override |
78 | 119 |
public RecordInfo getRecord(final String recordId) { |
79 |
DBObject query = new BasicDBObject(OAIConfigurationReader.ID_FIELD, recordId);
|
|
80 |
DBObject result = this.collection.findOne(query);
|
|
120 |
Bson query = Filters.eq(OAIConfigurationReader.ID_FIELD, recordId);
|
|
121 |
DBObject result = this.collection.find(query).first();
|
|
81 | 122 |
log.debug(result); |
82 | 123 |
return this.recordInfoGenerator.transformDBObject(result, true); |
83 | 124 |
} |
... | ... | |
93 | 134 |
} |
94 | 135 |
|
95 | 136 |
@Override |
96 |
public MongoCursor getRecords(final String queryString, final boolean bodyIncluded, final int limit) { |
|
97 |
DBCursor cursor = loggedFindByQuery(queryString, limit);
|
|
98 |
return new MongoCursor(cursor, bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
|
|
137 |
public DNetOAIMongoCursor getRecords(final String queryString, final boolean bodyIncluded, final int limit) {
|
|
138 |
FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit);
|
|
139 |
return new DNetOAIMongoCursor(iter.iterator(), bodyIncluded, this.recordInfoGenerator, this.metadataExtractor);
|
|
99 | 140 |
} |
100 | 141 |
|
101 | 142 |
@Override |
102 |
public MongoCursor getRecords(final String queryString, final UnaryFunction<String, String> unaryFunction, final boolean bodyIncluded, final int limit) { |
|
103 |
DBCursor cursor = loggedFindByQuery(queryString, limit); |
|
104 |
return new MongoCursor(cursor, unaryFunction, bodyIncluded, this.recordInfoGenerator, this.metadataExtractor); |
|
143 |
public DNetOAIMongoCursor getRecords(final String queryString, |
|
144 |
final UnaryFunction<String, String> unaryFunction, |
|
145 |
final boolean bodyIncluded, |
|
146 |
final int limit) { |
|
147 |
FindIterable<DBObject> iter = loggedFindByQuery(queryString, limit); |
|
148 |
return new DNetOAIMongoCursor(iter.iterator(), unaryFunction, bodyIncluded, this.recordInfoGenerator, this.metadataExtractor); |
|
105 | 149 |
} |
106 | 150 |
|
107 |
private DBCursor loggedFindByQuery(final String queryString, final int limit) {
|
|
108 |
DBObject query = this.queryParser.parse(queryString);
|
|
151 |
private FindIterable<DBObject> loggedFindByQuery(final String queryString, final int limit) {
|
|
152 |
Bson query = this.queryParser.parse(queryString);
|
|
109 | 153 |
long start = System.currentTimeMillis(); |
110 |
DBCursor cursor = this.collection.find(query).sort(new BasicDBObject("_id", 1)).limit(limit); |
|
154 |
Bson sortByIdAsc = Sorts.orderBy(Sorts.ascending("_id")); |
|
155 |
FindIterable<DBObject> iter = this.collection.find(query).sort(sortByIdAsc).limit(limit); |
|
111 | 156 |
long end = System.currentTimeMillis(); |
112 |
log.debug("Query:" + query + "\ntime to get mongo cursor (ms): " + (end - start));
|
|
113 |
return cursor;
|
|
157 |
log.debug("Query:" + query + "\ntime to get mongo iterable (ms): " + (end - start));
|
|
158 |
return iter;
|
|
114 | 159 |
} |
115 | 160 |
|
116 | 161 |
@Override |
... | ... | |
127 | 172 |
* then you have to specify it in the configuration file of the OAI Publisher: <br> |
128 | 173 |
* <INDEX name="deleted"> |
129 | 174 |
* </p> |
130 |
*
|
|
175 |
* |
|
131 | 176 |
* {@inheritDoc} |
132 |
* |
|
133 |
* @see eu.dnetlib.data.information.oai.publisher.store.PublisherStore#ensureIndices() |
|
177 |
* |
|
134 | 178 |
*/ |
135 | 179 |
@Override |
136 | 180 |
public void ensureIndices() { |
137 |
List<DBObject> indexInfoList = this.collection.getIndexInfo(); |
|
181 |
final ListIndexesIterable<BasicDBObject> indexesIterable = this.collection.listIndexes(BasicDBObject.class); |
|
182 |
final IndexOptions indexOptions = new IndexOptions().background(true); |
|
138 | 183 |
Stopwatch sw = Stopwatch.createUnstarted(); |
139 | 184 |
sw.start(); |
140 | 185 |
// I want to keep the composite indexes that might have been defined manually |
141 | 186 |
log.debug("Ensuring currently defined composite indexes:"); |
142 |
DBObject indexOptions = new BasicDBObject("background", true); |
|
143 |
for (DBObject o : indexInfoList) { |
|
144 |
DBObject fieldIndexed = (DBObject) o.get("key"); |
|
187 |
for (BasicDBObject o : indexesIterable) { |
|
188 |
BasicDBObject fieldIndexed = (BasicDBObject) o.get("key"); |
|
145 | 189 |
if (fieldIndexed.keySet().size() > 1) { |
146 | 190 |
log.debug(o); |
147 | 191 |
this.collection.createIndex(fieldIndexed, indexOptions); |
148 | 192 |
} |
149 | 193 |
} |
194 |
|
|
150 | 195 |
// Indexes on single fields. |
151 | 196 |
for (PublisherField field : this.mongoFields) { |
152 |
DBObject mongoIdx = new BasicDBObject(field.getFieldName(), 1); |
|
197 |
BasicDBObject mongoIdx = new BasicDBObject(field.getFieldName(), 1);
|
|
153 | 198 |
log.debug("Creating index : " + mongoIdx); |
154 | 199 |
this.collection.createIndex(mongoIdx, indexOptions); |
155 | 200 |
} |
... | ... | |
166 | 211 |
* <p> |
167 | 212 |
* The creation is performed on the background |
168 | 213 |
* </p> |
169 |
*
|
|
214 |
* |
|
170 | 215 |
* @param fieldNames |
171 | 216 |
* List of fields to be included in the compound index |
172 | 217 |
* @theStore MongoPublisherStore where to create the index |
... | ... | |
175 | 220 |
if ((fieldNames == null) || fieldNames.isEmpty()) { |
176 | 221 |
log.fatal("No fields specified for the creation of the compound index"); |
177 | 222 |
} |
178 |
DBObject indexOptions = new BasicDBObject("background", true); |
|
179 | 223 |
BasicDBObjectBuilder theIndexBuilder = BasicDBObjectBuilder.start(); |
180 | 224 |
for (String f : fieldNames) { |
181 | 225 |
theIndexBuilder.add(f, 1); |
182 | 226 |
} |
183 |
DBObject theIndex = theIndexBuilder.get();
|
|
227 |
BasicDBObject theIndex = (BasicDBObject) theIndexBuilder.get();
|
|
184 | 228 |
log.info("Creating index " + theIndex + " on " + this.getId()); |
185 |
this.getCollection().createIndex(theIndex, indexOptions);
|
|
229 |
this.getCollection().createIndex(theIndex, new IndexOptions().background(true));
|
|
186 | 230 |
} |
187 | 231 |
|
188 | 232 |
private void dropDiscarded(final String source) { |
... | ... | |
191 | 235 |
discardedCollection.drop(); |
192 | 236 |
} else { |
193 | 237 |
log.debug("Dropping discarded records for source " + source + " from publisherStore " + id); |
194 |
discardedCollection.remove(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source));
|
|
238 |
discardedCollection.deleteMany(Filters.eq(OAIConfigurationReader.SET_FIELD, source));
|
|
195 | 239 |
} |
196 | 240 |
} |
197 | 241 |
|
... | ... | |
205 | 249 |
|
206 | 250 |
@Override |
207 | 251 |
public void run() { |
252 |
//For fast feeding we want to use a collection with unack write concern |
|
253 |
final MongoCollection<DBObject> unackCollection = collection.withWriteConcern(WriteConcern.UNACKNOWLEDGED); |
|
208 | 254 |
while (true) { |
209 | 255 |
try { |
210 | 256 |
Object record = queue.take(); |
211 | 257 |
if (record == sentinel) { |
212 | 258 |
break; |
213 | 259 |
} |
214 |
safeFeedRecord((String) record, source, feedDate); |
|
260 |
safeFeedRecord((String) record, source, feedDate, unackCollection);
|
|
215 | 261 |
} catch (InterruptedException e) { |
216 | 262 |
log.fatal("got exception in background thread", e); |
217 | 263 |
throw new IllegalStateException(e); |
... | ... | |
246 | 292 |
* http://www.openarchives.org/OAI/openarchivesprotocol.html#DeletedRecords: if a repository does keep track of deletions then the |
247 | 293 |
* datestamp of the deleted record must be the date and time that it was deleted. |
248 | 294 |
* </p> |
249 |
*
|
|
295 |
* |
|
250 | 296 |
* @param feedDate |
251 | 297 |
* @param source |
252 | 298 |
*/ |
253 | 299 |
private void setDeletedFlags(final Date feedDate, final String source) { |
300 |
//get the collection with ACKNOWLEDGE Write concern |
|
301 |
final MongoCollection<DBObject> ackCollection = collection.withWriteConcern(WriteConcern.ACKNOWLEDGED); |
|
254 | 302 |
Thread deletedSetter = new Thread(new Runnable() { |
255 | 303 |
|
256 | 304 |
@Override |
257 | 305 |
public void run() { |
258 |
DBObject query = BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, false)
|
|
259 |
.append(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, new BasicDBObject("$lt", feedDate)).get();
|
|
306 |
Bson filter = Filters.and(Filters.eq(OAIConfigurationReader.DELETED_FIELD, false),
|
|
307 |
Filters.lt(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate));
|
|
260 | 308 |
if (!StringUtils.isBlank(source)) { |
261 |
query.put(OAIConfigurationReader.SET_FIELD, source);
|
|
309 |
filter = Filters.and(filter, Filters.eq(OAIConfigurationReader.SET_FIELD, source));
|
|
262 | 310 |
} |
263 |
log.debug("Delete flag query: " + query.toString()); |
|
264 |
DBObject update = new BasicDBObject("$set", BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, true) |
|
265 |
.append(OAIConfigurationReader.DATESTAMP_FIELD, feedDate).append(OAIConfigurationReader.UPDATED_FIELD, true).get()); |
|
311 |
log.debug("Delete flag query: " + filter); |
|
312 |
BasicDBObject update = new BasicDBObject("$set", |
|
313 |
BasicDBObjectBuilder.start(OAIConfigurationReader.DELETED_FIELD, true).append(OAIConfigurationReader.DATESTAMP_FIELD, feedDate) |
|
314 |
.append(OAIConfigurationReader.UPDATED_FIELD, true).get()); |
|
266 | 315 |
log.debug("Updating as: " + update.toString()); |
267 |
WriteResult wr = collection.update(query, update, false, true, WriteConcern.ACKNOWLEDGED);
|
|
268 |
log.debug("Deleted flags set for source: " + source + " #records = " + wr.getN());
|
|
316 |
final UpdateResult updateResult = ackCollection.updateMany(filter, update, new UpdateOptions().upsert(false));
|
|
317 |
log.debug("Deleted flags set for source: " + source + " #records = " + updateResult.getModifiedCount());
|
|
269 | 318 |
} |
270 | 319 |
}); |
271 | 320 |
|
... | ... | |
284 | 333 |
|
285 | 334 |
@Override |
286 | 335 |
public void drop(final String queryString) { |
287 |
DBObject query = this.queryParser.parse(queryString); |
|
288 |
this.collection.remove(query); |
|
336 |
Bson query = this.queryParser.parse(queryString); |
|
337 |
final DeleteResult deleteResult = this.collection.deleteMany(query); |
|
338 |
log.debug("Deleted by query: " + queryString + " #deleted: " + deleteResult.getDeletedCount()); |
|
339 |
|
|
289 | 340 |
} |
290 | 341 |
|
291 | 342 |
@Override |
... | ... | |
296 | 347 |
@Override |
297 | 348 |
public int count(final String queryString) { |
298 | 349 |
if (StringUtils.isBlank(queryString)) return (int) this.collection.count(); |
299 |
DBObject query = this.queryParser.parse(queryString);
|
|
350 |
Bson query = this.queryParser.parse(queryString);
|
|
300 | 351 |
return (int) this.collection.count(query); |
301 | 352 |
} |
302 | 353 |
|
303 | 354 |
public List<String> getDistinctSetNamesFromRecords() { |
304 | 355 |
log.info("Going to ask for all distinct sets in the oaistore " + id + ": this may take a long time..."); |
305 |
@SuppressWarnings("unchecked") |
|
306 |
List<String> distinctSets = this.collection.distinct(OAIConfigurationReader.SET_FIELD); |
|
307 |
return distinctSets; |
|
356 |
return Lists.newArrayList(this.collection.distinct(OAIConfigurationReader.SET_FIELD, String.class)); |
|
308 | 357 |
} |
309 | 358 |
|
310 | 359 |
// ***********************************************************************************************// |
311 | 360 |
// Feed utilities |
312 | 361 |
// ***********************************************************************************************// |
313 |
private boolean safeFeedRecord(final String record, final String source, final Date feedDate) { |
|
362 |
private boolean safeFeedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
|
|
314 | 363 |
try { |
315 |
if (!record.isEmpty()) return feedRecord(record, source, feedDate); |
|
364 |
if (!record.isEmpty()) return feedRecord(record, source, feedDate, unackCollection);
|
|
316 | 365 |
} catch (final Throwable e) { |
317 | 366 |
log.error("Got unhandled exception while parsing record", e); |
318 |
discardedCollection.insert(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record)); |
|
367 |
discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record));
|
|
319 | 368 |
} |
320 | 369 |
return false; |
321 | 370 |
} |
322 | 371 |
|
323 | 372 |
/** |
324 | 373 |
* Feed the record to the store. |
325 |
*
|
|
374 |
* |
|
326 | 375 |
* @return true if the record is new, false otherwise |
327 | 376 |
*/ |
328 |
private boolean feedRecord(final String record, final String source, final Date feedDate) { |
|
377 |
private boolean feedRecord(final String record, final String source, final Date feedDate, final MongoCollection<DBObject> unackCollection) {
|
|
329 | 378 |
PublisherRecordParser parser = new PublisherRecordParser(this.mongoFields); |
330 | 379 |
final Multimap<String, String> recordProperties = parser.parseRecord(record); |
331 | 380 |
String id = ""; |
... | ... | |
334 | 383 |
id = recordProperties.get(OAIConfigurationReader.ID_FIELD).iterator().next(); |
335 | 384 |
oaiID = getOAIIdentifier(id); |
336 | 385 |
if (isNewRecord(oaiID)) { |
337 |
feedNew(oaiID, record, recordProperties, feedDate); |
|
386 |
feedNew(oaiID, record, recordProperties, feedDate, unackCollection);
|
|
338 | 387 |
return true; |
339 | 388 |
} else { |
340 | 389 |
if (isChanged(oaiID, record)) { |
341 |
updateRecord(oaiID, record, recordProperties, feedDate); |
|
390 |
updateRecord(oaiID, record, recordProperties, feedDate, unackCollection);
|
|
342 | 391 |
} else { |
343 | 392 |
// it is not changed, I only have to update the last collection date |
344 |
handleRecord(oaiID, feedDate); |
|
393 |
handleRecord(oaiID, feedDate, unackCollection);
|
|
345 | 394 |
} |
346 | 395 |
} |
347 | 396 |
} else { |
348 | 397 |
log.error("parsed record seems invalid -- no identifier property with name: " + OAIConfigurationReader.ID_FIELD); |
349 |
discardedCollection.insert(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record).append( |
|
398 |
discardedCollection.insertOne(new BasicDBObject(OAIConfigurationReader.SET_FIELD, source).append(OAIConfigurationReader.BODY_FIELD, record).append(
|
|
350 | 399 |
OAIConfigurationReader.DATESTAMP_FIELD, feedDate)); |
351 | 400 |
} |
352 | 401 |
return false; |
353 | 402 |
} |
354 | 403 |
|
355 |
private DBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties) { |
|
356 |
DBObject obj = new BasicDBObject(); |
|
404 |
private BasicDBObject createBasicObject(final String oaiID, final String record, final Multimap<String, String> recordProperties) {
|
|
405 |
BasicDBObject obj = new BasicDBObject();
|
|
357 | 406 |
for (final String key : recordProperties.keySet()) { |
358 | 407 |
if (key.equals(OAIConfigurationReader.ID_FIELD)) { |
359 | 408 |
obj.put(key, oaiID); |
... | ... | |
417 | 466 |
return new Binary(os.toByteArray()); |
418 | 467 |
} |
419 | 468 |
|
420 |
private void feedNew(final String oaiID, final String record, final Multimap<String, String> recordProperties, final Date feedDate) { |
|
469 |
private void feedNew(final String oaiID, |
|
470 |
final String record, |
|
471 |
final Multimap<String, String> recordProperties, |
|
472 |
final Date feedDate, |
|
473 |
final MongoCollection<DBObject> unackCollection) { |
|
421 | 474 |
log.debug("New record received. Assigned oai id: " + oaiID); |
422 | 475 |
DBObject obj = this.createBasicObject(oaiID, record, recordProperties); |
423 | 476 |
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate); |
424 | 477 |
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate); |
425 | 478 |
obj.put(OAIConfigurationReader.UPDATED_FIELD, false); |
426 |
collection.insert(obj, WriteConcern.UNACKNOWLEDGED);
|
|
479 |
unackCollection.insertOne(obj);
|
|
427 | 480 |
this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD)); |
428 | 481 |
} |
429 | 482 |
|
430 |
private void updateRecord(final String oaiID, final String record, final Multimap<String, String> recordProperties, final Date feedDate) { |
|
483 |
private void updateRecord(final String oaiID, |
|
484 |
final String record, |
|
485 |
final Multimap<String, String> recordProperties, |
|
486 |
final Date feedDate, |
|
487 |
final MongoCollection<DBObject> unackCollection) { |
|
431 | 488 |
log.debug("updating record " + oaiID); |
432 |
DBObject obj = this.createBasicObject(oaiID, record, recordProperties); |
|
489 |
BasicDBObject obj = this.createBasicObject(oaiID, record, recordProperties);
|
|
433 | 490 |
obj.put(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, feedDate); |
434 | 491 |
obj.put(OAIConfigurationReader.DATESTAMP_FIELD, feedDate); |
435 | 492 |
obj.put(OAIConfigurationReader.UPDATED_FIELD, true); |
436 |
DBObject oldObj = new BasicDBObject(OAIConfigurationReader.ID_FIELD, oaiID);
|
|
437 |
collection.update(oldObj, obj, true, false);
|
|
493 |
Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
|
|
494 |
unackCollection.updateOne(oldObj, obj, new UpdateOptions().upsert(true));
|
|
438 | 495 |
this.upsertSets(recordProperties.get(OAIConfigurationReader.SET_FIELD)); |
439 | 496 |
} |
440 | 497 |
|
... | ... | |
451 | 508 |
set.setEnabled(true); |
452 | 509 |
String query = "(" + OAIConfigurationReader.SET_FIELD + " = \"" + setSpec + "\") "; |
453 | 510 |
set.setQuery(query); |
454 |
this.mongoSetCollection.upsertSet(set, false, getCollection().getDB().getName());
|
|
511 |
this.mongoSetCollection.upsertSet(set, false, getCollection().getNamespace().getDatabaseName());
|
|
455 | 512 |
} |
456 | 513 |
} |
457 | 514 |
} |
458 | 515 |
} |
459 | 516 |
|
460 |
private void handleRecord(final String oaiID, final Date lastCollectionDate) { |
|
517 |
private void handleRecord(final String oaiID, final Date lastCollectionDate, final MongoCollection<DBObject> unackCollection) {
|
|
461 | 518 |
log.debug("handling unchanged record " + oaiID); |
462 |
DBObject oldObj = new BasicDBObject(OAIConfigurationReader.ID_FIELD, oaiID);
|
|
463 |
DBObject update = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, lastCollectionDate)); |
|
464 |
collection.update(oldObj, update, true, false);
|
|
519 |
Bson oldObj = Filters.eq(OAIConfigurationReader.ID_FIELD, oaiID);
|
|
520 |
BasicDBObject update = new BasicDBObject("$set", new BasicDBObject(OAIConfigurationReader.LAST_COLLECTION_DATE_FIELD, lastCollectionDate));
|
|
521 |
unackCollection.updateOne(oldObj, update, new UpdateOptions().upsert(true));
|
|
465 | 522 |
} |
466 | 523 |
|
467 | 524 |
private boolean isNewRecord(final String oaiIdentifier) { |
468 | 525 |
if (alwaysNewRecord || (collection.count() == 0)) return true; |
469 |
return this.collection.findOne(new BasicDBObject(OAIConfigurationReader.ID_FIELD, oaiIdentifier)) == null;
|
|
526 |
return this.collection.find(Filters.eq(OAIConfigurationReader.ID_FIELD, oaiIdentifier)).first() == null;
|
|
470 | 527 |
} |
471 | 528 |
|
529 |
// ***********************************************************************************************// |
|
530 |
// Setters / Getters / Basic utilities |
|
531 |
// ***********************************************************************************************// |
|
532 |
|
|
472 | 533 |
private boolean isChanged(final String oaiID, final String record) { |
473 | 534 |
RecordInfo oldRecord = getRecord(oaiID); |
474 | 535 |
if (oldRecord == null) return StringUtils.isBlank(record); |
... | ... | |
479 | 540 |
return this.idScheme + ":" + this.idNamespace + ":" + id; |
480 | 541 |
} |
481 | 542 |
|
482 |
// ***********************************************************************************************// |
|
483 |
// Setters / Getters / Basic utilities |
|
484 |
// ***********************************************************************************************// |
|
485 |
|
|
486 | 543 |
@Override |
487 | 544 |
public int hashCode() { |
488 | 545 |
final int prime = 31; |
... | ... | |
519 | 576 |
return true; |
520 | 577 |
} |
521 | 578 |
|
522 |
public MongoPublisherStore() { |
|
523 |
super(); |
|
524 |
} |
|
525 |
|
|
526 |
public MongoPublisherStore(final String id, final String metadataFormat, final String interpretation, final String layout, final DBCollection collection, |
|
527 |
final List<PublisherField> mongoFields, final MongoQueryParser queryParser, final RecordInfoGenerator recordInfoGenerator, final String idScheme, |
|
528 |
final String idNamespace, final MetadataExtractor metadataExtractor, final RecordChangeDetector recordChangeDetector, final boolean alwaysNewRecord) { |
|
529 |
super(); |
|
530 |
this.id = id; |
|
531 |
this.metadataFormat = metadataFormat; |
|
532 |
this.interpretation = interpretation; |
|
533 |
this.layout = layout; |
|
534 |
this.collection = collection; |
|
535 |
this.discardedCollection = collection.getDB().getCollection("discarded-" + collection.getName()); |
|
536 |
this.mongoFields = mongoFields; |
|
537 |
this.queryParser = queryParser; |
|
538 |
this.recordInfoGenerator = recordInfoGenerator; |
|
539 |
this.idScheme = idScheme; |
|
540 |
this.idNamespace = idNamespace; |
|
541 |
this.recordChangeDetector = recordChangeDetector; |
|
542 |
this.alwaysNewRecord = alwaysNewRecord; |
|
543 |
} |
|
544 |
|
|
545 |
public void setId(final String id) { |
|
546 |
this.id = id; |
|
547 |
} |
|
548 |
|
|
549 |
public void setMetadataFormat(final String metadataFormat) { |
|
550 |
this.metadataFormat = metadataFormat; |
|
551 |
} |
|
552 |
|
|
553 |
public void setInterpretation(final String interpretation) { |
|
554 |
this.interpretation = interpretation; |
|
555 |
} |
|
556 |
|
|
557 |
public void setLayout(final String layout) { |
|
558 |
this.layout = layout; |
|
559 |
} |
|
560 |
|
|
561 |
public DBCollection getCollection() { |
|
579 |
public MongoCollection<DBObject> getCollection() { |
|
562 | 580 |
return collection; |
563 | 581 |
} |
564 | 582 |
|
565 |
public void setCollection(final DBCollection collection) {
|
|
583 |
public void setCollection(final MongoCollection<DBObject> collection) {
|
|
566 | 584 |
this.collection = collection; |
567 | 585 |
} |
568 | 586 |
|
... | ... | |
574 | 592 |
this.queryParser = queryParser; |
575 | 593 |
} |
576 | 594 |
|
577 |
public DBCollection getDiscardedCollection() {
|
|
595 |
public MongoCollection<DBObject> getDiscardedCollection() {
|
|
578 | 596 |
return discardedCollection; |
579 | 597 |
} |
580 | 598 |
|
581 |
public void setDiscardedCollection(final DBCollection discardedCollection) {
|
|
599 |
public void setDiscardedCollection(final MongoCollection<DBObject> discardedCollection) {
|
|
582 | 600 |
this.discardedCollection = discardedCollection; |
583 | 601 |
} |
584 | 602 |
|
... | ... | |
627 | 645 |
return this.id; |
628 | 646 |
} |
629 | 647 |
|
648 |
public void setId(final String id) { |
|
649 |
this.id = id; |
|
650 |
} |
|
651 |
|
|
630 | 652 |
@Override |
631 | 653 |
public String getMetadataFormat() { |
632 | 654 |
return this.metadataFormat; |
633 | 655 |
} |
634 | 656 |
|
657 |
public void setMetadataFormat(final String metadataFormat) { |
|
658 |
this.metadataFormat = metadataFormat; |
|
659 |
} |
|
660 |
|
|
635 | 661 |
@Override |
636 | 662 |
public String getInterpretation() { |
637 | 663 |
return this.interpretation; |
638 | 664 |
} |
639 | 665 |
|
666 |
public void setInterpretation(final String interpretation) { |
|
667 |
this.interpretation = interpretation; |
|
668 |
} |
|
669 |
|
|
640 | 670 |
@Override |
641 | 671 |
public String getLayout() { |
642 | 672 |
return this.layout; |
643 | 673 |
} |
644 | 674 |
|
675 |
public void setLayout(final String layout) { |
|
676 |
this.layout = layout; |
|
677 |
} |
|
678 |
|
|
645 | 679 |
public MongoSetCollection getMongoSetCollection() { |
646 | 680 |
return mongoSetCollection; |
647 | 681 |
} |
modules/dnet-oai-store-service/trunk/src/main/java/eu/dnetlib/data/oai/store/mongo/MongoPublisherStoreDAO.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.oai.store.mongo; |
2 | 2 |
|
3 | 3 |
import java.util.List; |
4 |
|
|
5 | 4 |
import javax.annotation.Resource; |
6 | 5 |
|
7 |
import org.apache.commons.logging.Log; |
|
8 |
import org.apache.commons.logging.LogFactory; |
|
9 |
import org.springframework.beans.factory.annotation.Autowired; |
|
10 |
import org.springframework.beans.factory.annotation.Required; |
|
11 |
|
|
6 |
import com.google.common.base.Function; |
|
7 |
import com.google.common.collect.Iterables; |
|
12 | 8 |
import com.google.common.collect.Lists; |
13 |
import com.mongodb.BasicDBObject; |
|
14 | 9 |
import com.mongodb.BasicDBObjectBuilder; |
15 |
import com.mongodb.DB; |
|
16 |
import com.mongodb.DBCollection; |
|
17 |
import com.mongodb.DBCursor; |
|
18 | 10 |
import com.mongodb.DBObject; |
19 |
import com.mongodb.Mongo; |
|
20 |
|
|
11 |
import com.mongodb.MongoClient; |
|
12 |
import com.mongodb.client.FindIterable; |
|
13 |
import com.mongodb.client.MongoCollection; |
|
14 |
import com.mongodb.client.MongoDatabase; |
|
15 |
import com.mongodb.client.model.Filters; |
|
21 | 16 |
import eu.dnetlib.data.information.oai.publisher.OaiPublisherException; |
22 | 17 |
import eu.dnetlib.data.information.oai.publisher.OaiPublisherRuntimeException; |
23 | 18 |
import eu.dnetlib.data.information.oai.publisher.conf.OAIConfigurationReader; |
... | ... | |
27 | 22 |
import eu.dnetlib.data.oai.store.parser.MongoQueryParser; |
28 | 23 |
import eu.dnetlib.data.oai.store.sets.MongoSetCollection; |
29 | 24 |
import eu.dnetlib.miscutils.cache.EhCache; |
25 |
import org.apache.commons.logging.Log; |
|
26 |
import org.apache.commons.logging.LogFactory; |
|
27 |
import org.springframework.beans.factory.annotation.Autowired; |
|
28 |
import org.springframework.beans.factory.annotation.Required; |
|
30 | 29 |
|
31 |
public class MongoPublisherStoreDAO implements PublisherStoreDAO<MongoPublisherStore, MongoCursor> { |
|
30 |
public class MongoPublisherStoreDAO implements PublisherStoreDAO<MongoPublisherStore, DNetOAIMongoCursor> {
|
|
32 | 31 |
|
33 | 32 |
private static final Log log = LogFactory.getLog(MongoPublisherStoreDAO.class); // NOPMD by marko on 11/24/08 5:02 PM |
34 | 33 |
|
35 | 34 |
@Autowired |
36 |
private Mongo publisherMongoServer;
|
|
35 |
private MongoClient publisherMongoClient;
|
|
37 | 36 |
|
38 | 37 |
/** Name of the collection with information about the OAI stores. **/ |
39 | 38 |
private String metadataCollection; |
... | ... | |
65 | 64 |
|
66 | 65 |
private boolean alwaysNewRecord; |
67 | 66 |
|
68 |
protected DB getDB(final String dbName) {
|
|
69 |
return this.publisherMongoServer.getDB(dbName);
|
|
67 |
protected MongoDatabase getDB(final String dbName) {
|
|
68 |
return this.publisherMongoClient.getDatabase(dbName);
|
|
70 | 69 |
} |
71 | 70 |
|
72 | 71 |
@Override |
73 | 72 |
public List<MongoPublisherStore> listPublisherStores(final String dbName) { |
74 |
List<MongoPublisherStore> stores = Lists.newArrayList(); |
|
75 |
DB db = getDB(dbName); |
|
76 |
DBCursor cursor = db.getCollection(this.metadataCollection).find(); |
|
77 |
for (DBObject storeInfo : cursor) { |
|
78 |
stores.add(this.createFromDBObject(storeInfo, db)); |
|
79 |
} |
|
80 |
return stores; |
|
73 |
final MongoDatabase db = getDB(dbName); |
|
74 |
final FindIterable<DBObject> stores = db.getCollection(this.metadataCollection, DBObject.class).find(); |
|
75 |
return Lists.newArrayList( |
|
76 |
Iterables.transform(stores, new Function<DBObject, MongoPublisherStore>() { |
|
77 |
@Override |
|
78 |
public MongoPublisherStore apply(final DBObject storeInfo) { |
|
79 |
return createFromDBObject(storeInfo, db); |
|
80 |
} |
|
81 |
}) |
|
82 |
); |
|
81 | 83 |
} |
82 | 84 |
|
83 | 85 |
@Override |
84 | 86 |
public MongoPublisherStore getStore(final String storeId, final String dbName) { |
85 |
DBObject storeInfo = getDB(dbName).getCollection(this.metadataCollection).findOne(new BasicDBObject("id", storeId));
|
|
87 |
DBObject storeInfo = getDB(dbName).getCollection(this.metadataCollection, DBObject.class).find(Filters.eq("id", storeId)).first();
|
|
86 | 88 |
return this.createFromDBObject(storeInfo, getDB(dbName)); |
87 | 89 |
} |
88 | 90 |
|
... | ... | |
105 | 107 |
@Override |
106 | 108 |
public MongoPublisherStore createStore(final String mdfName, final String mdfInterpretation, final String mdfLayout, final String dbName) |
107 | 109 |
throws OaiPublisherException { |
108 |
DB db = getDB(dbName);
|
|
110 |
MongoDatabase db = getDB(dbName);
|
|
109 | 111 |
DBObject store = createMetadataEntry(mdfName, mdfInterpretation, mdfLayout); |
110 |
DBCollection metadata = db.getCollection(this.metadataCollection);
|
|
111 |
metadata.insert(store); |
|
112 |
MongoCollection<DBObject> metadata = db.getCollection(this.metadataCollection, DBObject.class);
|
|
113 |
metadata.insertOne(store);
|
|
112 | 114 |
MongoPublisherStore theStore = this.createFromDBObject(store, db); |
113 | 115 |
return theStore; |
114 | 116 |
|
... | ... | |
116 | 118 |
|
117 | 119 |
@Override |
118 | 120 |
public boolean deleteStore(final String storeId, final String dbName) { |
119 |
DB db = getDB(dbName); |
|
120 |
DBCollection metadata = db.getCollection(this.metadataCollection); |
|
121 |
DBObject storeInfo = metadata.findOne(new BasicDBObject("id", storeId)); |
|
122 |
if (storeInfo == null) return false; |
|
121 |
|
|
122 |
MongoDatabase db = getDB(dbName); |
|
123 |
MongoCollection<DBObject> metadata = db.getCollection(this.metadataCollection, DBObject.class); |
|
124 |
final DBObject storeDeleted = metadata.findOneAndDelete(Filters.eq("id", storeId)); |
|
125 |
if (storeDeleted == null) return false; |
|
123 | 126 |
else { |
124 | 127 |
db.getCollection(storeId).drop(); |
125 |
metadata.remove(storeInfo); |
|
126 | 128 |
// TODO: should drop entries related to mdPrefix served by the store we are deleting, not all of them. |
127 | 129 |
this.mongoSetCollection.dropOAISets(dbName); |
128 | 130 |
log.debug("Deleted oaistore " + storeId + ", db: " + dbName); |
... | ... | |
132 | 134 |
|
133 | 135 |
@Override |
134 | 136 |
public boolean deleteFromStore(final String storeId, final String dbName, final String set) { |
135 |
DB db = getDB(dbName);
|
|
136 |
DBCollection metadata = db.getCollection(this.metadataCollection);
|
|
137 |
DBObject storeInfo = metadata.findOne(new BasicDBObject("id", storeId));
|
|
137 |
MongoDatabase db = getDB(dbName);
|
|
138 |
MongoCollection<DBObject> metadata = db.getCollection(this.metadataCollection, DBObject.class);
|
|
139 |
DBObject storeInfo = metadata.find(Filters.eq("id", storeId)).first();
|
|
138 | 140 |
if (storeInfo == null) return false; |
139 | 141 |
else { |
140 |
db.getCollection(storeId).remove(new BasicDBObject(OAIConfigurationReader.SET_FIELD, set));
|
|
142 |
db.getCollection(storeId).deleteOne(Filters.eq(OAIConfigurationReader.SET_FIELD, set));
|
|
141 | 143 |
this.mongoSetCollection.dropSet(dbName, set); |
142 | 144 |
log.debug("Deleted set " + set + " from oaistore " + storeId + ", db: " + dbName); |
143 | 145 |
return true; |
... | ... | |
174 | 176 |
|
175 | 177 |
} |
176 | 178 |
|
177 |
private MongoPublisherStore createFromDBObject(final DBObject storeInfo, final DB db) {
|
|
179 |
private MongoPublisherStore createFromDBObject(final DBObject storeInfo, final MongoDatabase db) {
|
|
178 | 180 |
if (storeInfo == null) return null; |
179 | 181 |
String storeId = (String) storeInfo.get("id"); |
180 | 182 |
String mdFormat = (String) storeInfo.get("metadataFormat"); |
... | ... | |
189 | 191 |
} else { |
190 | 192 |
log.debug("Store retreived, cache miss, alwaysNewRecord is" + alwaysNewRecord); |
191 | 193 |
log.fatal("Not using cache to create oaistore from dbObject: " + k); |
192 |
MongoPublisherStore store = new MongoPublisherStore(storeId, mdFormat, mdInterpreation, mdLayout, db.getCollection(storeId), |
|
194 |
MongoPublisherStore store = new MongoPublisherStore(storeId, mdFormat, mdInterpreation, mdLayout, db.getCollection(storeId, DBObject.class),
|
|
193 | 195 |
this.configuration.getFields(mdFormat, mdInterpreation, mdLayout), queryParser, recordInfoGenerator, this.configuration.getIdScheme(), |
194 |
this.configuration.getIdNamespace(), this.metadataExtractor, this.recordChangeDetector, alwaysNewRecord); |
|
196 |
this.configuration.getIdNamespace(), this.metadataExtractor, this.recordChangeDetector, alwaysNewRecord, db);
|
|
195 | 197 |
store.setMongoSetCollection(mongoSetCollection); |
196 | 198 |
mongoOaistoreCache.put(k, store); |
197 | 199 |
return store; |
... | ... | |
266 | 268 |
this.mongoSetCollection = mongoSetCollection; |
267 | 269 |
} |
268 | 270 |
|
269 |
public Mongo getPublisherMongoServer() { |
|
270 |
return publisherMongoServer; |
|
271 |
} |
|
272 |
|
|
273 |
public void setPublisherMongoServer(final Mongo publisherMongoServer) { |
|
274 |
this.publisherMongoServer = publisherMongoServer; |
|
275 |
} |
|
276 |
|
|
277 | 271 |
public boolean isAlwaysNewRecord() { |
278 | 272 |
return alwaysNewRecord; |
279 | 273 |
} |
modules/dnet-oai-store-service/trunk/src/main/java/eu/dnetlib/data/oai/store/parser/MongoQueryParser.java | ||
---|---|---|
6 | 6 |
import com.google.common.collect.Lists; |
7 | 7 |
import com.mongodb.BasicDBObject; |
8 | 8 |
import com.mongodb.BasicDBObjectBuilder; |
9 |
import com.mongodb.DBObject; |
|
10 | 9 |
import eu.dnetlib.data.information.oai.publisher.OaiPublisherRuntimeException; |
11 | 10 |
import eu.dnetlib.data.information.oai.publisher.conf.OAIConfigurationReader; |
12 | 11 |
import eu.dnetlib.functionality.index.parse.Relation; |
... | ... | |
14 | 13 |
import org.apache.commons.lang.StringUtils; |
15 | 14 |
import org.apache.commons.logging.Log; |
16 | 15 |
import org.apache.commons.logging.LogFactory; |
16 |
import org.bson.conversions.Bson; |
|
17 | 17 |
import org.bson.types.ObjectId; |
18 | 18 |
import org.joda.time.DateTime; |
19 | 19 |
import org.joda.time.format.DateTimeFormat; |
... | ... | |
38 | 38 |
* String to parse |
39 | 39 |
* @return DBObject corresponding to the query string |
40 | 40 |
*/ |
41 |
public DBObject parse(final String query) {
|
|
41 |
public Bson parse(final String query) {
|
|
42 | 42 |
log.debug("PARSING: " + query); |
43 | 43 |
if (StringUtils.isBlank(query)) return new BasicDBObject(); |
44 |
DBObject parsed = toMongo(query);
|
|
44 |
Bson parsed = toMongo(query);
|
|
45 | 45 |
log.debug(parsed); |
46 | 46 |
return parsed; |
47 | 47 |
} |
48 | 48 |
|
49 |
private DBObject toMongo(final String query) {
|
|
49 |
private Bson toMongo(final String query) {
|
|
50 | 50 |
CQLParser parser = new CQLParser(); |
51 | 51 |
CQLNode root; |
52 | 52 |
try { |
... | ... | |
59 | 59 |
} |
60 | 60 |
} |
61 | 61 |
|
62 |
private DBObject toMongo(final CQLNode node) {
|
|
62 |
private Bson toMongo(final CQLNode node) {
|
|
63 | 63 |
if (node instanceof CQLTermNode) return doTranslate((CQLTermNode) node); |
64 | 64 |
if (node instanceof CQLBooleanNode) return doTranslate((CQLBooleanNode) node); |
65 | 65 |
|
66 | 66 |
throw new RuntimeException("error choice for CQLNode " + node.getClass()); |
67 | 67 |
} |
68 | 68 |
|
69 |
private DBObject doTranslate(final CQLTermNode termNode) {
|
|
69 |
private Bson doTranslate(final CQLTermNode termNode) {
|
|
70 | 70 |
if (termNode.getTerm().equals("*")) return new BasicDBObject(); |
71 | 71 |
String relation = termNode.getRelation().getBase(); |
72 | 72 |
Relation rel = Relations.get(relation); |
73 | 73 |
return this.handleRelationNode(rel, termNode); |
74 | 74 |
} |
75 | 75 |
|
76 |
private DBObject handleRelationNode(final Relation rel, final CQLTermNode termNode) {
|
|
77 |
DBObject mongoQueryObject = new BasicDBObject(); |
|
76 |
private Bson handleRelationNode(final Relation rel, final CQLTermNode termNode) {
|
|
77 |
BasicDBObject mongoQueryObject = new BasicDBObject();
|
|
78 | 78 |
String term = termNode.getTerm(); |
79 | 79 |
String indexName = termNode.getIndex(); |
80 | 80 |
Object termObj = term; |
... | ... | |
131 | 131 |
* @param date |
132 | 132 |
* @return |
133 | 133 |
*/ |
134 |
private DBObject handleDateRelationNode(final String indexName, final Relation rel, final OAIDate date) {
|
|
135 |
DBObject mongoQueryObject = new BasicDBObject(); |
|
134 |
private Bson handleDateRelationNode(final String indexName, final Relation rel, final OAIDate date) {
|
|
135 |
BasicDBObject mongoQueryObject = new BasicDBObject();
|
|
136 | 136 |
DateTime fromDate = date.date; |
137 | 137 |
switch (rel) { |
138 | 138 |
case EQUAL: |
... | ... | |
176 | 176 |
return mongoQueryObject; |
177 | 177 |
} |
178 | 178 |
|
179 |
private DBObject doTranslate(final CQLBooleanNode node) {
|
|
179 |
private Bson doTranslate(final CQLBooleanNode node) {
|
|
180 | 180 |
if (node instanceof CQLAndNode) return getBooleanQuery("$and", node); |
181 | 181 |
if (node instanceof CQLOrNode) return getBooleanQuery("$or", node); |
182 | 182 |
if (node instanceof CQLNotNode) return getNotQuery((CQLNotNode) node); |
183 | 183 |
throw new RuntimeException("error choice for CQLBooleanNode " + node.getClass()); |
184 | 184 |
} |
185 | 185 |
|
186 |
private DBObject getBooleanQuery(final String mongoOperator, final CQLBooleanNode node) {
|
|
187 |
DBObject left = this.toMongo(node.left);
|
|
188 |
DBObject right = this.toMongo(node.right);
|
|
189 |
BasicDBObject andQuery = new BasicDBObject();
|
|
190 |
List<DBObject> termList = Lists.newArrayList(left, right);
|
|
191 |
andQuery.put(mongoOperator, termList);
|
|
192 |
return andQuery;
|
|
186 |
private Bson getBooleanQuery(final String mongoOperator, final CQLBooleanNode node) {
|
|
187 |
Bson left = this.toMongo(node.left);
|
|
188 |
Bson right = this.toMongo(node.right);
|
|
189 |
BasicDBObject opQuery = new BasicDBObject();
|
|
190 |
List<Bson> termList = Lists.newArrayList(left, right);
|
|
191 |
opQuery.put(mongoOperator, termList);
|
|
192 |
return opQuery;
|
|
193 | 193 |
} |
194 | 194 |
|
195 |
private DBObject getNotQuery(final CQLNotNode node) {
|
|
196 |
DBObject left = this.toMongo(node.left);
|
|
197 |
DBObject right = this.toMongo(node.right);
|
|
198 |
DBObject notRight = new BasicDBObject("$not", right);
|
|
195 |
private Bson getNotQuery(final CQLNotNode node) {
|
|
196 |
Bson left = this.toMongo(node.left);
|
|
197 |
Bson right = this.toMongo(node.right);
|
|
198 |
Bson notRight = new BasicDBObject("$not", right);
|
|
199 | 199 |
BasicDBObject andQuery = new BasicDBObject(); |
200 |
List<DBObject> termList = Lists.newArrayList(left, notRight);
|
|
200 |
List<Bson> termList = Lists.newArrayList(left, notRight);
|
|
201 | 201 |
andQuery.put("$and", termList); |
202 | 202 |
return andQuery; |
203 | 203 |
} |
modules/dnet-oai-store-service/trunk/src/main/java/eu/dnetlib/data/oai/store/sets/MongoSetCollection.java | ||
---|---|---|
3 | 3 |
import java.text.Normalizer; |
4 | 4 |
import java.util.List; |
5 | 5 |
|
6 |
import com.google.common.base.Function; |
|
7 |
import com.google.common.collect.Iterables; |
|
6 | 8 |
import com.google.common.collect.Lists; |
7 |
import com.mongodb.*; |
|
9 |
import com.mongodb.BasicDBObject; |
|
10 |
import com.mongodb.BasicDBObjectBuilder; |
|
11 |
import com.mongodb.DBObject; |
|
12 |
import com.mongodb.MongoClient; |
|
13 |
import com.mongodb.client.FindIterable; |
|
14 |
import com.mongodb.client.MongoCollection; |
|
15 |
import com.mongodb.client.model.Filters; |
|
16 |
import com.mongodb.client.model.FindOneAndReplaceOptions; |
|
17 |
import com.mongodb.client.model.FindOneAndUpdateOptions; |
|
18 |
import com.mongodb.client.model.IndexOptions; |
|
8 | 19 |
import eu.dnetlib.data.information.oai.publisher.info.SetInfo; |
9 | 20 |
import eu.dnetlib.data.information.oai.sets.SetCollection; |
10 | 21 |
import org.apache.commons.lang.StringEscapeUtils; |
11 | 22 |
import org.apache.commons.lang.StringUtils; |
23 |
import org.bson.conversions.Bson; |
|
12 | 24 |
import org.springframework.beans.factory.annotation.Autowired; |
13 | 25 |
|
14 | 26 |
public class MongoSetCollection implements SetCollection { |
15 | 27 |
|
16 | 28 |
public static String DEFAULT_SET = "OTHER"; |
29 |
|
|
17 | 30 |
@Autowired |
18 |
private Mongo publisherMongoServer;
|
|
31 |
private MongoClient publisherMongoClient;
|
|
19 | 32 |
private String setCollection = "sets"; |
20 | 33 |
private String setCountCollection = "setsCount"; |
21 | 34 |
|
... | ... | |
26 | 39 |
|
27 | 40 |
@Override |
28 | 41 |
public List<SetInfo> getAllSets(final boolean enabledOnly, final String dbName) { |
29 |
DBCursor cursor = null;
|
|
42 |
FindIterable<DBObject> iter = null;
|
|
30 | 43 |
if (!enabledOnly) { |
31 |
cursor = this.getSetsCollection(dbName).find();
|
|
44 |
iter = this.getSetsCollection(dbName).find();
|
|
32 | 45 |
} else { |
33 |
DBObject where = new BasicDBObject("enabled", true);
|
|
34 |
cursor = this.getSetsCollection(dbName).find(where);
|
|
46 |
Bson where = Filters.eq("enabled", true);
|
|
47 |
iter = this.getSetsCollection(dbName).find(where);
|
|
35 | 48 |
} |
36 |
List<SetInfo> res = Lists.newArrayList(); |
|
37 |
while (cursor.hasNext()) { |
|
38 |
DBObject obj = cursor.next(); |
|
39 |
res.add(this.getSetFromDBObject(obj)); |
|
40 |
} |
|
41 |
return res; |
|
49 |
return Lists.newArrayList(Iterables.transform(iter, new Function<DBObject, SetInfo>() { |
|
50 |
|
|
51 |
@Override |
|
52 |
public SetInfo apply(final DBObject dbObject) { |
|
53 |
return getSetFromDBObject(dbObject); |
|
54 |
} |
|
55 |
})); |
|
42 | 56 |
} |
43 | 57 |
|
44 | 58 |
@Override |
45 | 59 |
public boolean containSet(final String set, final String dbName) { |
46 |
DBObject query = new BasicDBObject("spec", set);
|
|
47 |
return this.getSetsCollection(dbName).find(query).limit(1).size() != 0;
|
|
60 |
Bson query = Filters.eq("spec", set);
|
|
61 |
return this.getSetsCollection(dbName).count(query) != 0;
|
|
48 | 62 |
} |
49 | 63 |
|
50 | 64 |
@Override |
51 | 65 |
public boolean containEnabledSet(final String set, final String publisherDBName) { |
52 |
DBObject query = new BasicDBObject("spec", set).append("enabled", true);
|
|
53 |
return this.getSetsCollection(publisherDBName).find(query).limit(1).size() != 0;
|
|
66 |
Bson query = Filters.and(Filters.eq("spec", set), Filters.eq("enabled", true));
|
|
67 |
return this.getSetsCollection(publisherDBName).count(query) != 0;
|
|
54 | 68 |
} |
55 | 69 |
|
56 | 70 |
@Override |
57 | 71 |
public String getSetQuery(final String set, final String dbName) { |
58 |
DBObject query = new BasicDBObject("spec", set);
|
|
59 |
DBObject returnField = new BasicDBObject("query", 1); |
|
60 |
DBObject obj = this.getSetsCollection(dbName).findOne(query, returnField);
|
|
72 |
Bson query = Filters.eq("spec", set);
|
|
73 |
BasicDBObject returnField = new BasicDBObject("query", 1);
|
|
74 |
DBObject obj = this.getSetsCollection(dbName).find(query).projection(returnField).first();
|
|
61 | 75 |
return (String) obj.get("query"); |
62 | 76 |
} |
63 | 77 |
|
64 | 78 |
@Override |
65 | 79 |
public int count(final String setSpec, final String mdPrefix, final String dbName) { |
66 |
DBObject query = BasicDBObjectBuilder.start("spec", setSpec).add("mdPrefix", mdPrefix).get();
|
|
67 |
DBObject returnField = new BasicDBObject("count", 1); |
|
68 |
DBObject obj = this.getSetsCountCollection(dbName).findOne(query, returnField);
|
|
80 |
Bson query = Filters.and(Filters.eq("spec", setSpec), Filters.eq("mdPrefix", mdPrefix));
|
|
81 |
BasicDBObject returnField = new BasicDBObject("count", 1);
|
|
82 |
DBObject obj = this.getSetsCountCollection(dbName).find(query).projection(returnField).first();
|
|
69 | 83 |
if (obj == null) return 0; |
70 | 84 |
return (Integer) obj.get("count"); |
71 | 85 |
} |
72 | 86 |
|
73 | 87 |
public void updateCounts(final String setSpec, final String mdPrefix, final int count, final String dbName) { |
74 |
DBObject setCount = BasicDBObjectBuilder.start("spec", setSpec).add("mdPrefix", mdPrefix).get(); |
|
75 |
this.getSetsCountCollection(dbName).update(setCount, new BasicDBObject("$set", new BasicDBObject("count", count)), true, false); |
|
88 |
BasicDBObject countUpdate = new BasicDBObject("$set", new BasicDBObject("count", count)); |
|
89 |
Bson query = Filters.and(Filters.eq("spec", setSpec), Filters.eq("mdPrefix", mdPrefix)); |
|
90 |
//DBObject setCount = BasicDBObjectBuilder.start("spec", setSpec).add("mdPrefix", mdPrefix).get(); |
|
91 |
//this.getSetsCountCollection(dbName).update(setCount, new BasicDBObject("$set", new BasicDBObject("count", count)), true, false); |
|
92 |
this.getSetsCollection(dbName).findOneAndUpdate(query, countUpdate, new FindOneAndUpdateOptions().upsert(true)); |
|
76 | 93 |
} |
77 | 94 |
|
78 | 95 |
public void upsertSet(final SetInfo setInfo, final boolean fromConfiguration, final String dbName) { |
79 | 96 |
DBObject obj = this.getObjectFromSet(setInfo); |
80 | 97 |
obj.put("fromConfiguration", fromConfiguration); |
81 |
this.getSetsCollection(dbName).update(new BasicDBObject("spec", setInfo.getSetSpec()), obj, true, false); |
|
98 |
//this.getSetsCollection(dbName).update(new BasicDBObject("spec", setInfo.getSetSpec()), obj, true, false); |
|
99 |
this.getSetsCollection(dbName).findOneAndReplace(Filters.eq("spec", setInfo.getSetSpec()), obj, new FindOneAndReplaceOptions().upsert(true)); |
|
82 | 100 |
} |
83 | 101 |
|
84 | 102 |
public String normalizeSetSpec(final String setName) { |
... | ... | |
102 | 120 |
} |
103 | 121 |
|
104 | 122 |
public List<SetInfo> getConfiguredSets(final String dbName) { |
105 |
DBObject query = new BasicDBObject("fromConfiguration", true);
|
|
123 |
Bson query = Filters.eq("fromConfiguration", true);
|
|
106 | 124 |
return this.findSets(query, dbName); |
107 | 125 |
} |
108 | 126 |
|
109 | 127 |
public List<SetInfo> getSetsFromData(final String dbName) { |
110 |
DBObject query = new BasicDBObject("fromConfiguration", false);
|
|
128 |
Bson query = Filters.eq("fromConfiguration", false);
|
|
111 | 129 |
return this.findSets(query, dbName); |
112 | 130 |
} |
113 | 131 |
|
... | ... | |
117 | 135 |
} |
118 | 136 |
|
119 | 137 |
public void dropSet(final String dbName, final String setSpec) { |
120 |
DBObject query = BasicDBObjectBuilder.start("spec", setSpec).get();
|
|
121 |
this.getSetsCollection(dbName).remove(query);
|
|
122 |
this.getSetsCountCollection(dbName).remove(query);
|
|
138 |
Bson query = Filters.eq("spec", setSpec);
|
|
139 |
this.getSetsCollection(dbName).deleteMany(query);
|
|
140 |
this.getSetsCountCollection(dbName).deleteMany(query);
|
|
123 | 141 |
} |
124 | 142 |
|
125 | 143 |
public void dropConfigurationSets(final String dbName) { |
126 |
this.getSetsCollection(dbName).remove(new BasicDBObject("fromConfiguration", true));
|
|
144 |
this.getSetsCollection(dbName).deleteMany(Filters.eq("fromConfiguration", true));
|
|
127 | 145 |
} |
128 | 146 |
|
129 |
protected List<SetInfo> findSets(final DBObject query, final String dbName) {
|
|
130 |
DBCursor cursor = this.getSetsCollection(dbName).find(query);
|
|
147 |
protected List<SetInfo> findSets(final Bson query, final String dbName) {
|
|
148 |
final FindIterable<DBObject> sets = this.getSetsCollection(dbName).find(query);
|
|
131 | 149 |
List<SetInfo> res = Lists.newArrayList(); |
132 |
while (cursor.hasNext()) { |
|
133 |
DBObject obj = cursor.next(); |
|
150 |
for (DBObject obj : sets) { |
|
134 | 151 |
res.add(this.getSetFromDBObject(obj)); |
135 | 152 |
} |
136 | 153 |
return res; |
... | ... | |
153 | 170 |
} |
154 | 171 |
|
155 | 172 |
private void ensureIndexesOnSets(final String dbName) { |
156 |
DBObject onBackground = new BasicDBObject("background", true); |
|
157 |
this.getSetsCollection(dbName).createIndex(new BasicDBObject("spec", 1), onBackground); |
|
158 |
this.getSetsCollection(dbName).createIndex(new BasicDBObject("fromConfiguration", 1), onBackground); |
|
173 |
this.getSetsCollection(dbName).createIndex(new BasicDBObject("spec", 1), new IndexOptions().background(true)); |
|
174 |
this.getSetsCollection(dbName).createIndex(new BasicDBObject("fromConfiguration", 1), new IndexOptions().background(true)); |
|
159 | 175 |
} |
160 | 176 |
|
161 | 177 |
private void ensureIndexesOnCount(final String dbName) { |
162 |
DBObject index = BasicDBObjectBuilder.start("spec", 1).add("mdPrefix", 1).get();
|
|
163 |
this.getSetsCountCollection(dbName).createIndex(index, new BasicDBObject("background", true));
|
|
178 |
BasicDBObject index = (BasicDBObject) BasicDBObjectBuilder.start("spec", 1).add("mdPrefix", 1).get();
|
|
179 |
this.getSetsCountCollection(dbName).createIndex(index, new IndexOptions().background(true));
|
|
164 | 180 |
} |
165 | 181 |
|
166 |
public DBCollection getSetsCollection(final String dbName) {
|
|
182 |
public MongoCollection<DBObject> getSetsCollection(final String dbName) {
|
|
167 | 183 |
return this.getCollection(this.setCollection, dbName); |
168 | 184 |
} |
169 | 185 |
|
170 |
public DBCollection getSetsCountCollection(final String dbName) {
|
|
186 |
public MongoCollection<DBObject> getSetsCountCollection(final String dbName) {
|
|
171 | 187 |
return this.getCollection(this.setCountCollection, dbName); |
172 | 188 |
} |
173 | 189 |
|
174 |
private DBCollection getCollection(final String collectionName, final String dbName) {
|
|
175 |
return publisherMongoServer.getDB(dbName).getCollection(collectionName);
|
|
190 |
private MongoCollection<DBObject> getCollection(final String collectionName, final String dbName) {
|
|
191 |
return publisherMongoClient.getDatabase(dbName).getCollection(collectionName, DBObject.class);
|
|
176 | 192 |
} |
Also available in: Unified diff
Using new mongo API included in mongo-java-driver 3.2.2 instead of deprecated API