Revision 46417
Added by Claudio Atzori about 7 years ago
MDStoreTransactionManagerImpl.java | ||
---|---|---|
3 | 3 |
import java.time.Duration; |
4 | 4 |
import java.time.LocalDate; |
5 | 5 |
import java.time.ZoneId; |
6 |
import java.util.Collections; |
|
7 | 6 |
import java.util.Date; |
8 | 7 |
import java.util.List; |
8 |
import java.util.Optional; |
|
9 |
import java.util.TreeSet; |
|
9 | 10 |
import java.util.stream.Collectors; |
11 |
import javax.annotation.PostConstruct; |
|
10 | 12 |
|
11 | 13 |
import com.google.common.collect.Lists; |
12 |
import com.mongodb.*; |
|
13 |
import com.mongodb.client.FindIterable; |
|
14 |
import com.google.common.collect.Sets; |
|
15 |
import com.mongodb.BasicDBObject; |
|
16 |
import com.mongodb.WriteConcern; |
|
14 | 17 |
import com.mongodb.client.MongoCollection; |
15 | 18 |
import com.mongodb.client.MongoDatabase; |
16 |
import com.mongodb.client.MongoIterable; |
|
17 | 19 |
import com.mongodb.client.model.Filters; |
18 | 20 |
import com.mongodb.client.model.UpdateOptions; |
19 | 21 |
import eu.dnetlib.miscutils.streams.DnetStreamSupport; |
20 | 22 |
import eu.dnetlib.msro.workers.aggregation.mdstore.*; |
21 |
import eu.dnetlib.msro.workers.aggregation.mdstore.model.MDStoreExpiredInfo; |
|
22 |
import eu.dnetlib.msro.workers.aggregation.mdstore.model.MDStoreManagerInfo; |
|
23 |
import eu.dnetlib.msro.workers.aggregation.mdstore.model.MDStoreTransactionInfo; |
|
23 |
import eu.dnetlib.msro.workers.aggregation.mdstore.model.*; |
|
24 | 24 |
import org.apache.commons.lang3.StringUtils; |
25 | 25 |
import org.apache.commons.logging.Log; |
26 | 26 |
import org.apache.commons.logging.LogFactory; |
27 |
import org.bson.conversions.Bson; |
|
28 | 27 |
import org.springframework.beans.factory.annotation.Autowired; |
29 | 28 |
import org.springframework.stereotype.Component; |
30 | 29 |
|
30 |
import static eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreConstants.*; |
|
31 |
|
|
31 | 32 |
/** |
32 | 33 |
* Created by claudio on 13/03/2017. |
33 | 34 |
*/ |
... | ... | |
45 | 46 |
private int maxTransactions = 1; |
46 | 47 |
|
47 | 48 |
/** |
49 |
* The expired days. |
|
50 |
*/ |
|
51 |
private int expiredDays = 3; |
|
52 |
|
|
53 |
/** |
|
48 | 54 |
* The db. |
49 | 55 |
*/ |
50 | 56 |
@Autowired |
... | ... | |
53 | 59 |
/** |
54 | 60 |
* The manager table. |
55 | 61 |
*/ |
56 |
private MongoCollection<DBObject> managerTable;
|
|
62 |
private MongoCollection<MDStoreManagerInfo> managerTable;
|
|
57 | 63 |
|
58 | 64 |
/** |
59 |
* The expired days. |
|
60 |
*/ |
|
61 |
private int expiredDays; |
|
62 |
|
|
63 |
/** |
|
64 | 65 |
* Bootstrap manager. |
65 | 66 |
*/ |
67 |
@PostConstruct |
|
66 | 68 |
private void bootstrapManager() { |
67 | 69 |
log.debug("Bootstrap Manager start"); |
68 |
final MongoCollection<DBObject> metadataColl = db.getCollection(MDStoreConstants.METADATA_NAME, DBObject.class); |
|
69 |
final FindIterable<DBObject> values = metadataColl.find(); |
|
70 |
this.setManagerTable(db.getCollection(MDStoreConstants.METADATA_MANAGER_TABLE, DBObject.class)); |
|
71 |
for (DBObject object : values) { |
|
72 |
final String id = (String) object.get(MDStoreConstants.MDID); |
|
73 |
String newId = id; |
|
74 |
if (id.contains("_")) { |
|
75 |
newId = StringUtils.substringBefore(id, "_"); |
|
76 |
} |
|
77 |
final DBObject input = BasicDBObjectBuilder.start() |
|
78 |
.add(MDStoreConstants.MDID, id) |
|
79 |
.add(MDStoreConstants.CURRENT_ID, newId) |
|
80 |
.add(MDStoreConstants.EXPIRING, new String[] {}) |
|
81 |
.add(MDStoreConstants.TRANSACTIONS, new String[] {}) |
|
82 |
.get(); |
|
83 |
getManagerTable().insertOne(input); |
|
84 |
log.debug(String.format("Added %s to Metadata Manager data structure", id)); |
|
70 |
final MongoCollection<MDStoreDescription> metadataColl = db.getCollection(METADATA_NAME, MDStoreDescription.class); |
|
71 |
managerTable = db.getCollection(METADATA_MANAGER_TABLE, MDStoreManagerInfo.class); |
|
72 |
metadataColl.find().iterator().forEachRemaining(desc -> { |
|
73 |
getManagerTable().insertOne(asManagerInfo(desc.getMdId())); |
|
74 |
log.debug(String.format("Added %s to Metadata Manager data structure", desc.getMdId())); |
|
75 |
}); |
|
85 | 76 |
|
86 |
} |
|
87 |
final BasicDBObject ensureIndex = new BasicDBObject(); |
|
88 |
ensureIndex.put(MDStoreConstants.MDID, 1); |
|
89 |
log.debug("Create index in MetadaManager "); |
|
90 |
this.getManagerTable().createIndex(ensureIndex); |
|
77 |
log.debug("Create index in MetadaManager"); |
|
78 |
getManagerTable().createIndex(new BasicDBObject(MDStoreConstants.MDID, 1)); |
|
91 | 79 |
} |
92 | 80 |
|
93 |
/** |
|
94 |
* Gets the DBObject describing an mdstore. null if there is no mdstore with the given id. |
|
95 |
* |
|
96 |
* @param mdstoreID |
|
97 |
* @return DBObject or null |
|
98 |
*/ |
|
99 |
private DBObject getMDStoreAsDBObject(String mdstoreID) { |
|
100 |
final BasicDBObject query = new BasicDBObject(); |
|
101 |
query.put(MDStoreConstants.MDID, mdstoreID); |
|
102 |
return getManagerTable().find(query).first(); |
|
81 |
private MDStoreManagerInfo asManagerInfo(final String mdId) { |
|
82 |
return MDStoreManagerInfo.create() |
|
83 |
.setMdId(mdId) |
|
84 |
.setCurrentId(createInternalId(mdId)); |
|
103 | 85 |
} |
104 | 86 |
|
105 | 87 |
/** |
... | ... | |
107 | 89 |
*/ |
108 | 90 |
@Override |
109 | 91 |
public void verifyConsistency() throws MDStoreException { |
110 |
if (this.getManagerTable() == null) {
|
|
92 |
if (getManagerTable() == null) { |
|
111 | 93 |
final int size = DnetStreamSupport.generateStreamFromIterator( |
112 | 94 |
db.listCollectionNames().iterator()) |
113 |
.filter(it -> it.contains(MDStoreConstants.METADATA_MANAGER_TABLE))
|
|
95 |
.filter(it -> it.contains(METADATA_MANAGER_TABLE)) |
|
114 | 96 |
.collect(Collectors.toList()) |
115 | 97 |
.size(); |
116 |
|
|
117 |
if (size == 0) |
|
98 |
if (size == 0) { |
|
118 | 99 |
bootstrapManager(); |
119 |
else { |
|
120 |
this.setManagerTable(db.getCollection(MDStoreConstants.METADATA_MANAGER_TABLE, DBObject.class));
|
|
100 |
} else {
|
|
101 |
managerTable = db.getCollection(METADATA_MANAGER_TABLE, MDStoreManagerInfo.class);
|
|
121 | 102 |
} |
122 | 103 |
} |
123 |
if (this.getManagerTable() == null) throw new MDStoreException("Something bad happen, unable to create managerTable"); |
|
104 |
if (getManagerTable() == null) { |
|
105 |
throw new MDStoreException("Something bad happen, unable to create managerTable"); |
|
106 |
} |
|
124 | 107 |
} |
125 | 108 |
|
126 | 109 |
/** |
... | ... | |
129 | 112 |
* @see MDStoreTransactionManager#createMDStore(String) |
130 | 113 |
*/ |
131 | 114 |
@Override |
132 |
public void createMDStore(final String mdId) throws MDStoreException {
|
|
115 |
public String createMDStore(final String mdId) throws MDStoreException {
|
|
133 | 116 |
log.debug("Creating new mdstore"); |
134 | 117 |
verifyConsistency(); |
118 |
final MDStoreManagerInfo managerInfo = asManagerInfo(mdId); |
|
119 |
getManagerTable().insertOne(managerInfo); |
|
120 |
db.createCollection(managerInfo.getCurrentId()); |
|
121 |
return managerInfo.getCurrentId(); |
|
122 |
} |
|
123 |
|
|
124 |
private String createInternalId(final String mdId) { |
|
135 | 125 |
String newId = mdId; |
136 | 126 |
if (mdId.contains("_")) { |
137 | 127 |
newId = StringUtils.substringBefore(mdId, "_"); |
138 | 128 |
} |
139 |
final BasicDBObject instance = new BasicDBObject(); |
|
140 |
instance.put(MDStoreConstants.MDID, mdId); |
|
141 |
instance.put(MDStoreConstants.CURRENT_ID, newId); |
|
142 |
instance.put(MDStoreConstants.EXPIRING, new String[] {}); |
|
143 |
getManagerTable().insertOne(instance); |
|
129 |
return newId; |
|
144 | 130 |
} |
145 | 131 |
|
146 | 132 |
/** |
... | ... | |
151 | 137 |
@Override |
152 | 138 |
public void dropMDStore(final String mdId) throws MDStoreException { |
153 | 139 |
verifyConsistency(); |
154 |
log.debug("Droping MDStore: " + mdId); |
|
155 |
final BasicDBObject query = new BasicDBObject(); |
|
156 |
query.put(MDStoreConstants.MDID, mdId); |
|
157 |
final DBObject dropped = this.getManagerTable().findOneAndDelete(query); |
|
140 |
log.debug("Dropping MDStore: " + mdId); |
|
141 |
|
|
142 |
final MDStoreManagerInfo dropped = getManagerTable().findOneAndDelete(Filters.eq(MDID, mdId)); |
|
158 | 143 |
garbage(); |
159 |
final String collectionName = (String) dropped.get(MDStoreConstants.CURRENT_ID); |
|
160 |
this.db.getCollection(collectionName).drop(); |
|
161 |
this.db.getCollection(MDStoreConstants.DISCARDED_PREFIX + collectionName).drop(); |
|
144 |
doDrop(dropped.getCurrentId()); |
|
162 | 145 |
} |
163 | 146 |
|
164 | 147 |
/** |
... | ... | |
169 | 152 |
@Override |
170 | 153 |
public String getMDStoreCollection(final String mdId) throws MDStoreException { |
171 | 154 |
verifyConsistency(); |
172 |
final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId); |
|
173 |
if (mdstoreInfo != null) { |
|
174 |
return (String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID); |
|
175 |
} else { |
|
176 |
throw new MDStoreException("unable to find MDStore " + mdId); |
|
177 |
} |
|
155 |
return getInfoForCurrentMdStore(mdId).getCurrentId(); |
|
178 | 156 |
} |
179 | 157 |
|
180 | 158 |
/** |
... | ... | |
186 | 164 |
public String startTransaction(final String mdId, final boolean refresh) throws MDStoreException { |
187 | 165 |
verifyConsistency(); |
188 | 166 |
log.info("Start transaction for metadata store " + mdId); |
189 |
final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId); |
|
190 |
if (mdstoreInfo == null) throw new MDStoreException("Error, unable to find Mdstore with Id " + mdId); |
|
191 |
String idCreation = StringUtils.substringBefore(mdId, "_"); |
|
192 |
idCreation = idCreation + "::" + System.currentTimeMillis(); |
|
167 |
final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId); |
|
168 |
final String transactionId = StringUtils.substringBefore(mdId, "_") + "::" + System.currentTimeMillis(); |
|
193 | 169 |
|
194 |
BasicDBList values; |
|
195 |
if (mdstoreInfo.containsField(MDStoreConstants.TRANSACTIONS)) { |
|
196 |
values = (BasicDBList) mdstoreInfo.get(MDStoreConstants.TRANSACTIONS); |
|
197 |
if (values.size() > getMaxTransactions()) |
|
198 |
throw new MDStoreException(String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), values.size(), mdId)); |
|
199 |
} else { |
|
200 |
values = new BasicDBList(); |
|
170 |
if (info.getTransactions().size() > getMaxTransactions()) { |
|
171 |
throw new MDStoreException( |
|
172 |
String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), info.getTransactions().size(), mdId)); |
|
201 | 173 |
} |
202 |
final BasicDBObject transactionMetadata = new BasicDBObject(); |
|
203 |
transactionMetadata.put(MDStoreConstants.ID, idCreation.toString());
|
|
204 |
transactionMetadata.put(MDStoreConstants.REFRESH, refresh);
|
|
205 |
transactionMetadata.put(MDStoreConstants.DATE, new Date());
|
|
206 |
values.add(transactionMetadata);
|
|
207 |
mdstoreInfo.put(MDStoreConstants.TRANSACTIONS, values); |
|
208 |
getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo);
|
|
209 |
return idCreation.toString();
|
|
174 |
|
|
175 |
info.getTransactions().add(
|
|
176 |
MDStoreTransactionInfo.create()
|
|
177 |
.setId(transactionId)
|
|
178 |
.setRefresh(refresh));
|
|
179 |
|
|
180 |
getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
|
|
181 |
return transactionId;
|
|
210 | 182 |
} |
211 | 183 |
|
212 | 184 |
/** |
213 | 185 |
* {@inheritDoc} |
214 | 186 |
* |
215 |
* @see MDStoreTransactionManager#commit(String, String, |
|
216 |
* MDStore) |
|
187 |
* @see MDStoreTransactionManager#commit(String, String, MDStore) |
|
217 | 188 |
*/ |
218 | 189 |
@Override |
219 |
public boolean commit(final String transactionId, final String mdstoreId, final MDStore current) throws MDStoreException {
|
|
190 |
public boolean commit(final String transactionId, final String mdId, final MDStore current) throws MDStoreException { |
|
220 | 191 |
verifyConsistency(); |
221 |
final DBObject mdstoreInfo = getMDStoreAsDBObject(mdstoreId); |
|
222 |
if (mdstoreInfo == null) { |
|
223 |
throw new MDStoreException("Error, unable to find Mdstore with Id " + mdstoreId); |
|
224 |
} |
|
225 |
final BasicDBList transactions = (BasicDBList) mdstoreInfo.get(MDStoreConstants.TRANSACTIONS); |
|
192 |
final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId); |
|
193 |
final List<MDStoreTransactionInfo> transactions = info.getTransactions(); |
|
226 | 194 |
if (transactions.isEmpty()) { |
227 | 195 |
throw new MDStoreException("Error, unable to find transaction with Id " + transactionId); |
228 | 196 |
} |
229 |
final DBObject transaction = findTransaction(transactions, transactionId); |
|
230 |
final boolean refresh = (Boolean) transaction.get(MDStoreConstants.REFRESH); |
|
231 |
transactions.remove(transaction); |
|
232 |
final String oldId = (String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID); |
|
233 |
if (refresh) { |
|
234 |
mdstoreInfo.put(MDStoreConstants.CURRENT_ID, transactionId); |
|
235 |
final BasicDBList stillUsed = (BasicDBList) mdstoreInfo.get(MDStoreConstants.EXPIRING); |
|
236 |
if (stillUsed.size() == 0) { |
|
237 |
db.getCollection(oldId).drop(); |
|
238 |
db.getCollection(MDStoreConstants.DISCARDED_PREFIX + oldId).drop(); |
|
197 |
final MDStoreTransactionInfo tx = findTransaction(transactions, transactionId); |
|
198 |
transactions.remove(tx); |
|
199 |
final String oldId = info.getCurrentId(); |
|
200 |
if (tx.getRefresh()) { |
|
201 |
info.setCurrentId(transactionId); |
|
202 |
if (info.getExpiring().isEmpty()) { |
|
203 |
doDrop(oldId); |
|
239 | 204 |
} |
240 | 205 |
log.debug("Replaced collection "); |
241 | 206 |
} else { |
242 | 207 |
log.debug("commit incremental "); |
243 | 208 |
updateIncremental(transactionId, oldId); |
244 |
db.getCollection(transactionId).drop(); |
|
245 |
db.getCollection(MDStoreConstants.DISCARDED_PREFIX + transactionId).drop(); |
|
209 |
doDrop(transactionId); |
|
246 | 210 |
} |
247 |
this.getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo);
|
|
211 |
getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
|
|
248 | 212 |
|
249 |
log.info("Committed transaction for metadata store " + mdstoreId);
|
|
213 |
log.info("Committed transaction for metadata store " + mdId); |
|
250 | 214 |
return true; |
251 | 215 |
} |
252 | 216 |
|
... | ... | |
257 | 221 |
* @param transactionId the transaction id |
258 | 222 |
* @return the DB object |
259 | 223 |
*/ |
260 |
private DBObject findTransaction(final BasicDBList transactions, final String transactionId) throws MDStoreException {
|
|
261 |
if (transactions.size() == 0) return null;
|
|
262 |
for (int i = 0; i < transactions.size(); i++) {
|
|
263 |
final BasicDBObject transaction = (BasicDBObject) transactions.get(i);
|
|
264 |
if (transactionId.equals(transaction.getString(MDStoreConstants.ID))) {
|
|
265 |
return transaction;
|
|
266 |
}
|
|
224 |
private MDStoreTransactionInfo findTransaction(final List<MDStoreTransactionInfo> transactions, final String transactionId) throws MDStoreException {
|
|
225 |
final Optional<MDStoreTransactionInfo> tx = transactions.stream()
|
|
226 |
.filter(t -> transactionId.equals(t.getId())).findFirst();
|
|
227 |
if (!tx.isPresent()) {
|
|
228 |
throw new MDStoreException("Error, unable to find transaction with Id " + transactionId);
|
|
229 |
} else {
|
|
230 |
return tx.get();
|
|
267 | 231 |
} |
268 |
throw new MDStoreException("Error, unable to find transaction with Id " + transactionId); |
|
269 | 232 |
} |
270 | 233 |
|
271 | 234 |
/** |
... | ... | |
274 | 237 |
* @see MDStoreTransactionManager#readMdStore(String) |
275 | 238 |
*/ |
276 | 239 |
@Override |
277 |
public String readMdStore(final String mdStoreId) throws MDStoreException {
|
|
240 |
public String readMdStore(final String mdId) throws MDStoreException { |
|
278 | 241 |
verifyConsistency(); |
279 |
final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId);
|
|
280 |
if (mdstoreInfo == null) throw new MDStoreException("Error, unable to find Mdstore with Id " + mdStoreId); |
|
281 |
final String currentId = (String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID);
|
|
282 |
final BasicDBList values = (BasicDBList) mdstoreInfo.get(MDStoreConstants.EXPIRING);
|
|
283 |
updateMdstoreUsed(values, currentId);
|
|
284 |
this.getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo);
|
|
242 |
final MDStoreManagerInfo info = getInfoForCurrentMdStore(mdId);
|
|
243 |
|
|
244 |
final String currentId = info.getCurrentId();
|
|
245 |
final List<MDStoreExpiredInfo> expiring = info.getExpiring();
|
|
246 |
updateMdstoreUsed(expiring, currentId);
|
|
247 |
getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info);
|
|
285 | 248 |
return currentId; |
286 | 249 |
} |
287 | 250 |
|
288 | 251 |
/** |
289 | 252 |
* Update mdstore used. |
290 | 253 |
* |
291 |
* @param values the values
|
|
292 |
* @param mdId the md id
|
|
254 |
* @param expiring the values to be updated
|
|
255 |
* @param mdId the mdstore id
|
|
293 | 256 |
*/ |
294 |
private void updateMdstoreUsed(final BasicDBList values, final String mdId) { |
|
295 |
if (values.size() > 0) { |
|
296 |
for (int i = 0; i < values.size(); i++) { |
|
297 |
final DBObject obj = (DBObject) values.get(i); |
|
298 |
final String id = (String) obj.get(MDStoreConstants.ID); |
|
299 |
if (mdId.equals(id)) { |
|
300 |
obj.put(MDStoreConstants.LAST_READ, new Date()); |
|
301 |
return; |
|
302 |
} |
|
303 |
} |
|
257 |
private void updateMdstoreUsed(final List<MDStoreExpiredInfo> expiring, final String mdId) { |
|
258 |
final Optional<MDStoreExpiredInfo> expiringInfo = expiring.stream() |
|
259 |
.filter(info -> mdId.equals(info.getId())) |
|
260 |
.findFirst(); |
|
261 |
|
|
262 |
if (expiringInfo.isPresent()) { |
|
263 |
expiringInfo.get().setLastRead(new Date()); |
|
264 |
} else { |
|
265 |
expiring.add(MDStoreExpiredInfo.create().setId(mdId)); |
|
304 | 266 |
} |
305 |
final BasicDBObject readStore = new BasicDBObject(); |
|
306 |
readStore.put(MDStoreConstants.ID, mdId); |
|
307 |
readStore.put(MDStoreConstants.LAST_READ, new Date()); |
|
308 |
values.add(readStore); |
|
309 | 267 |
} |
310 | 268 |
|
311 | 269 |
/** |
... | ... | |
313 | 271 |
* |
314 | 272 |
* @return the managerTable |
315 | 273 |
*/ |
316 |
public MongoCollection<DBObject> getManagerTable() {
|
|
274 |
private MongoCollection<MDStoreManagerInfo> getManagerTable() {
|
|
317 | 275 |
return managerTable; |
318 | 276 |
} |
319 | 277 |
|
320 | 278 |
/** |
321 |
* Sets the manager table. |
|
322 |
* |
|
323 |
* @param managerTable the managerTable to set |
|
324 |
*/ |
|
325 |
public void setManagerTable(final MongoCollection<DBObject> managerTable) { |
|
326 |
this.managerTable = managerTable; |
|
327 |
} |
|
328 |
|
|
329 |
/* |
|
330 | 279 |
* (non-Javadoc) |
331 | 280 |
* |
332 |
* @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
|
|
281 |
* @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#getInfoForCurrentMdStore(java.lang.String)
|
|
333 | 282 |
*/ |
334 | 283 |
@Override |
335 |
public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdStoreId) throws MDStoreException {
|
|
284 |
public MDStoreManagerInfo getInfoForCurrentMdStore(final String mdId) throws MDStoreException { |
|
336 | 285 |
verifyConsistency(); |
337 |
final DBObject mdstoreInfo = getMDStoreAsDBObject(mdStoreId); |
|
338 |
if (mdstoreInfo == null) throw new MDStoreException("Error, unable to find Mdstore with Id " + mdStoreId); |
|
339 |
final MDStoreManagerInfo result = new MDStoreManagerInfo(); |
|
340 |
result.setCurrentId((String) mdstoreInfo.get(MDStoreConstants.CURRENT_ID)); |
|
341 |
result.setMdId((String) mdstoreInfo.get(MDStoreConstants.MDID)); |
|
342 |
final BasicDBList values = (BasicDBList) mdstoreInfo.get(MDStoreConstants.EXPIRING); |
|
343 |
for (int i = 0; i < values.size(); i++) { |
|
344 |
final MDStoreExpiredInfo stillused = new MDStoreExpiredInfo(); |
|
345 |
final DBObject value = (DBObject) values.get(i); |
|
346 |
stillused.setId((String) value.get(MDStoreConstants.ID)); |
|
347 |
stillused.setLastRead((Date) value.get(MDStoreConstants.LAST_READ)); |
|
348 |
result.addExpiredItem(stillused); |
|
286 |
final MDStoreManagerInfo managerInfo = getManagerTable().find(Filters.eq(MDID, mdId)).first(); |
|
287 |
if (managerInfo == null) { |
|
288 |
throw new MDStoreException("Error, unable to find MDStore " + mdId); |
|
349 | 289 |
} |
350 |
final BasicDBList transactions = (BasicDBList) mdstoreInfo.get(MDStoreConstants.TRANSACTIONS); |
|
351 |
if (transactions != null) { |
|
352 |
for (int i = 0; i < transactions.size(); i++) { |
|
353 |
final MDStoreTransactionInfo transaction = new MDStoreTransactionInfo(); |
|
354 |
final DBObject value = (DBObject) transactions.get(i); |
|
355 |
final String transactionId = (String) value.get(MDStoreConstants.ID); |
|
356 |
transaction.setId(transactionId); |
|
357 |
transaction.setDate((Date) value.get(MDStoreConstants.DATE)); |
|
358 |
transaction.setRefresh((Boolean) value.get(MDStoreConstants.REFRESH)); |
|
359 |
transaction.setSize(db.getCollection(transactionId).count()); |
|
360 |
result.addTransactionInfo(transaction); |
|
361 |
} |
|
362 |
} |
|
363 |
return result; |
|
290 |
return managerInfo; |
|
364 | 291 |
} |
365 | 292 |
|
366 |
/* |
|
293 |
/**
|
|
367 | 294 |
* (non-Javadoc) |
368 | 295 |
* |
369 |
* @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
|
|
296 |
* @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#dropUsed(java.lang.String, java.lang.String)
|
|
370 | 297 |
*/ |
371 | 298 |
@Override |
372 | 299 |
public Boolean dropUsed(final String mdId, final String idToDrop) throws MDStoreException { |
373 | 300 |
verifyConsistency(); |
374 |
final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId); |
|
375 |
if (mdstoreInfo == null) throw new MDStoreException("Error, unable to find Mdstore with Id " + mdId); |
|
376 |
return dropStore(mdstoreInfo, idToDrop, MDStoreConstants.EXPIRING); |
|
377 |
} |
|
301 |
final MDStoreManagerInfo managerInfo = getInfoForCurrentMdStore(mdId); |
|
378 | 302 |
|
379 |
private boolean dropStore(DBObject mdstoreInfo, final String idToDrop, String transactionListName) throws MDStoreException { |
|
380 |
final BasicDBList transactionList = (BasicDBList) mdstoreInfo.get(transactionListName); |
|
381 |
for (int i = 0; i < transactionList.size(); i++) { |
|
382 |
final DBObject value = (DBObject) transactionList.get(i); |
|
383 |
final String currentUsedId = (String) value.get(MDStoreConstants.ID); |
|
384 |
if (currentUsedId.equals(idToDrop)) { |
|
385 |
db.getCollection(idToDrop).drop(); |
|
386 |
db.getCollection(MDStoreConstants.DISCARDED_PREFIX + idToDrop).drop(); |
|
387 |
transactionList.remove(value); |
|
388 |
mdstoreInfo.put(transactionListName, transactionList); |
|
389 |
this.getManagerTable().findOneAndReplace(new BasicDBObject( |
|
390 |
MDStoreConstants.INTERNAL_MONGO_ID, mdstoreInfo.get(MDStoreConstants.INTERNAL_MONGO_ID)), mdstoreInfo); |
|
391 |
return true; |
|
392 |
} |
|
303 |
final Optional<MDStoreExpiredInfo> info = managerInfo.getExpiring().stream() |
|
304 |
.filter(i -> idToDrop.equals(i.getId())) |
|
305 |
.findFirst(); |
|
306 |
|
|
307 |
if (info.isPresent()) { |
|
308 |
doDrop(idToDrop); |
|
309 |
managerInfo.getExpiring().remove(info.get()); |
|
310 |
|
|
311 |
getManagerTable().findOneAndReplace(Filters.eq(MDID, managerInfo.getMdId()), managerInfo); |
|
312 |
return true; |
|
313 |
} else { |
|
314 |
throw new MDStoreException("Error, unable to drop collection " + idToDrop); |
|
393 | 315 |
} |
394 |
throw new MDStoreException("Error, unable to drop collection " + idToDrop); |
|
395 | 316 |
} |
396 | 317 |
|
397 |
/* |
|
318 |
private void doDrop(final String mdId) { |
|
319 |
db.getCollection(mdId).drop(); |
|
320 |
db.getCollection(DISCARDED_PREFIX + mdId).drop(); |
|
321 |
log.debug("deleted collection " + mdId); |
|
322 |
} |
|
323 |
|
|
324 |
/** |
|
398 | 325 |
* (non-Javadoc) |
399 | 326 |
* |
400 |
* @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#garbage()
|
|
327 |
* @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#garbage()
|
|
401 | 328 |
*/ |
402 | 329 |
@Override |
403 | 330 |
public void garbage() throws MDStoreException { |
404 | 331 |
verifyConsistency(); |
405 | 332 |
log.info("Start garbage collection of MdStore"); |
406 |
final FindIterable<DBObject> it = this.managerTable.find(); |
|
407 | 333 |
int totalDeleted = 0; |
408 |
for (DBObject currentObject : it) {
|
|
334 |
for (MDStoreManagerInfo currentInfo : getManagerTable().find()) {
|
|
409 | 335 |
if (log.isDebugEnabled()) { |
410 |
log.debug("start to check id: " + currentObject.get(MDStoreConstants.CURRENT_ID));
|
|
336 |
log.debug("start to check id: " + currentInfo.getCurrentId());
|
|
411 | 337 |
} |
412 |
garbageExpiring(currentObject, (String) currentObject.get(MDStoreConstants.CURRENT_ID));
|
|
413 |
garbageTransactions(currentObject, (String) currentObject.get(MDStoreConstants.CURRENT_ID));
|
|
414 |
this.getManagerTable().findOneAndReplace(new BasicDBObject(MDStoreConstants.INTERNAL_MONGO_ID, currentObject.get(MDStoreConstants.INTERNAL_MONGO_ID)), currentObject);
|
|
338 |
garbageExpiring(currentInfo);
|
|
339 |
garbageTransactions(currentInfo);
|
|
340 |
getManagerTable().findOneAndReplace(Filters.eq(MDID, currentInfo.getMdId()), currentInfo);
|
|
415 | 341 |
} |
416 | 342 |
|
417 | 343 |
// DELETING Collection that are not in the metadataManager table |
418 |
MongoIterable<String> collections = this.db.listCollectionNames(); |
|
419 |
for (String collection : collections) { |
|
420 |
if ((collection.length() > 30) && (collection.contains(MDStoreConstants.DISCARDED_PREFIX) == false) && (collection.contains("resolved") == false)) { |
|
421 |
final DBObject item = getMetadataObjectForCollections(collection); |
|
422 | 344 |
|
423 |
if (shouldDelete(collection, item)) { |
|
345 |
for (String coll : db.listCollectionNames()) { |
|
346 |
if ((coll.length() > 30) && (!coll.contains(DISCARDED_PREFIX)) && (!coll.contains("resolved"))) { |
|
347 |
final MDStoreManagerInfo info = getMetadataObjectForCollections(coll); |
|
348 |
|
|
349 |
if (shouldDelete(coll, info)) { |
|
424 | 350 |
if (log.isDebugEnabled()) { |
425 |
log.debug("delete collection: " + collection + " from mongo");
|
|
351 |
log.debug("delete collection: " + coll + " from mongo"); |
|
426 | 352 |
} |
427 |
db.getCollection(collection).drop(); |
|
428 |
db.getCollection(MDStoreConstants.DISCARDED_PREFIX + collection).drop(); |
|
353 |
doDrop(coll); |
|
429 | 354 |
if (log.isDebugEnabled()) { |
430 |
log.debug("delete collection: discarded-" + collection + " from mongo");
|
|
355 |
log.debug("delete collection: discarded-" + coll + " from mongo"); |
|
431 | 356 |
} |
432 | 357 |
} |
433 | 358 |
} |
... | ... | |
436 | 361 |
log.info("Complete garbage collection of MdStore, total store deleted: " + totalDeleted); |
437 | 362 |
} |
438 | 363 |
|
439 |
private DBObject getMetadataObjectForCollections(final String collectionName) { |
|
440 |
if (collectionName == null) return null; |
|
441 |
final String tmp = collectionName.contains( |
|
442 |
MDStoreConstants.DISCARDED_PREFIX) == true ? StringUtils.substringAfter(collectionName, MDStoreConstants.DISCARDED_PREFIX) : collectionName; |
|
443 |
final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + MDStoreConstants.MDSTORE_ID_SUFFIX; |
|
364 |
private boolean shouldDelete(final String collection, final MDStoreManagerInfo managerInfo) { |
|
444 | 365 |
|
445 |
//DBObject query = QueryBuilder.start("mdId").is(collectionNameCleaned).get(); |
|
446 |
Bson query = Filters.eq(MDStoreConstants.MDID, collectionNameCleaned); |
|
447 |
return this.managerTable.find(query).first(); |
|
448 |
|
|
449 |
} |
|
450 |
|
|
451 |
private boolean shouldDelete(final String collectionName, final DBObject metadataManagerInstance) { |
|
452 |
log.debug("should delete instance " + metadataManagerInstance); |
|
453 |
if ((metadataManagerInstance == null) || (metadataManagerInstance.get(MDStoreConstants.CURRENT_ID) == null)) { |
|
366 |
if (StringUtils.isBlank(managerInfo.getCurrentId())) { |
|
454 | 367 |
log.debug("the instance has not currentID"); |
455 | 368 |
return true; |
456 | 369 |
} |
457 |
String currentId = (String) metadataManagerInstance.get(MDStoreConstants.CURRENT_ID); |
|
458 |
if (collectionName.equals(currentId)) return false; |
|
459 |
BasicDBList expiringList = (BasicDBList) metadataManagerInstance.get(MDStoreConstants.EXPIRING); |
|
460 |
if (findInList(expiringList, collectionName, MDStoreConstants.ID) == true) return false; |
|
461 |
BasicDBList transactionsList = (BasicDBList) metadataManagerInstance.get(MDStoreConstants.TRANSACTIONS); |
|
462 |
return findInList(transactionsList, collectionName, MDStoreConstants.ID) != true; |
|
370 |
if (collection.equals(managerInfo.getCurrentId())) { |
|
371 |
return false; |
|
372 |
} |
|
373 |
if (managerInfo.getExpiring().stream().anyMatch(i -> collection.equals(i.getId()))) { |
|
374 |
return false; |
|
375 |
} |
|
376 |
return managerInfo.getTransactions().stream().anyMatch(i -> collection.equals(i.getId())); |
|
463 | 377 |
} |
464 | 378 |
|
465 |
private boolean findInList(final BasicDBList list, final String object, final String tagname) { |
|
466 |
if (list == null) return false; |
|
467 |
for (int i = 0; i < list.size(); i++) { |
|
468 |
DBObject currentObject = (DBObject) list.get(i); |
|
469 |
final String id = (String) currentObject.get(tagname); |
|
470 |
if (id.equals(object)) return true; |
|
379 |
private MDStoreManagerInfo getMetadataObjectForCollections(final String collection) throws MDStoreException { |
|
380 |
if (collection == null) { |
|
381 |
throw new MDStoreException("missing collection name"); |
|
471 | 382 |
} |
472 |
return false; |
|
473 |
} |
|
383 |
final String tmp = collection.contains( |
|
384 |
DISCARDED_PREFIX) == true ? StringUtils.substringAfter(collection, DISCARDED_PREFIX) : collection; |
|
385 |
final String collectionNameCleaned = StringUtils.substringBefore(tmp, "::") + MDSTORE_ID_SUFFIX; |
|
474 | 386 |
|
475 |
/** |
|
476 |
* Delete. |
|
477 |
* |
|
478 |
* @param list the list |
|
479 |
* @param toRemove the to remove |
|
480 |
*/ |
|
481 |
private void delete(final BasicDBList list, final List<DBObject> toRemove) { |
|
482 |
for (final DBObject obj : toRemove) { |
|
483 |
if (log.isDebugEnabled()) { |
|
484 |
log.debug("deleting " + obj); |
|
485 |
} |
|
486 |
list.remove(obj); |
|
487 |
} |
|
387 |
return getManagerTable().find(Filters.eq(MDID, collectionNameCleaned)).first(); |
|
488 | 388 |
} |
489 | 389 |
|
490 | 390 |
/** |
491 | 391 |
* Garbage transactions. |
492 | 392 |
* |
493 |
* @param currentObject the current object |
|
494 |
* @param currentId the current id |
|
393 |
* @param currentInfo the current object |
|
495 | 394 |
*/ |
496 |
private void garbageTransactions(final DBObject currentObject, final String currentId) {
|
|
395 |
private void garbageTransactions(final MDStoreManagerInfo currentInfo) {
|
|
497 | 396 |
if (log.isDebugEnabled()) { |
498 | 397 |
log.debug("Start garbage transactions "); |
499 | 398 |
} |
500 | 399 |
|
501 |
final BasicDBList expiring = (BasicDBList) currentObject.get(MDStoreConstants.TRANSACTIONS); |
|
502 |
if ((expiring == null) || (expiring.size() <= getMaxTransactions())) return; |
|
503 |
|
|
504 |
List<DBObject> expiringList = Lists.newArrayList(); |
|
505 |
|
|
506 |
for (int i = 0; i < expiring.size(); i++) { |
|
507 |
final DBObject cobj = (DBObject) expiring.get(i); |
|
508 |
if (cobj != null) { |
|
509 |
expiringList.add((DBObject) expiring.get(i)); |
|
510 |
} |
|
511 |
|
|
400 |
if (currentInfo.getTransactions().size() <= getMaxTransactions()) { |
|
401 |
return; |
|
512 | 402 |
} |
513 | 403 |
|
514 |
Collections.sort(expiringList, MDStoreUtils.getComparatorOnDate()); |
|
404 |
final TreeSet<MDStoreTransactionInfo> expiringSet = Sets.newTreeSet(MDStoreUtils.getComparatorOnDate()); |
|
405 |
expiringSet.addAll(currentInfo.getTransactions()); |
|
515 | 406 |
|
516 |
List<DBObject> toRemove = Lists.newArrayList(); |
|
517 |
int i = 0; |
|
407 |
int toRemove = 0; |
|
518 | 408 |
|
519 |
// We should remove the k item less recent |
|
520 |
// where k = numberOftotalTransaction - maxNumberOfTransaction |
|
521 |
// k = numberOfItemToRemove |
|
522 |
|
|
523 |
while (((expiringList.size() - toRemove.size()) > getMaxTransactions()) || (i < expiringList.size())) { |
|
524 |
DBObject currentObj = expiringList.get(i++); |
|
525 |
String objectId = (String) currentObj.get(MDStoreConstants.ID); |
|
526 |
if (!objectId.equals(currentId)) { |
|
409 |
for(MDStoreTransactionInfo i : expiringSet) { |
|
410 |
if (currentInfo.getTransactions().size() - toRemove >= getMaxTransactions()) { |
|
411 |
return; |
|
412 |
} |
|
413 |
if (!i.getId().equals(currentInfo.getCurrentId())) { |
|
527 | 414 |
if (log.isDebugEnabled()) { |
528 |
log.debug("delete collection: " + objectId + " from mongo");
|
|
415 |
log.debug("delete collection: " + i.getId() + " from mongo");
|
|
529 | 416 |
} |
530 |
db.getCollection(objectId).drop(); |
|
531 |
db.getCollection(MDStoreConstants.DISCARDED_PREFIX + objectId).drop(); |
|
532 |
if (log.isDebugEnabled()) { |
|
533 |
log.debug("delete collection: discarded-" + objectId + " from mongo"); |
|
534 |
} |
|
535 |
toRemove.add(currentObj); |
|
417 |
toRemove++; |
|
418 |
doDrop(i.getId()); |
|
536 | 419 |
} else { |
537 |
if (log.isDebugEnabled()) { |
|
538 |
log.debug("Cannot remove transaction " + objectId + " because is the currentId: " + currentId); |
|
539 |
} |
|
420 |
log.debug("Cannot remove transaction " + i.getId() + " because is the currentId: " + currentInfo.getCurrentId()); |
|
540 | 421 |
} |
541 | 422 |
} |
542 |
|
|
543 |
delete(expiring, toRemove); |
|
544 |
log.info("Deleted " + toRemove.size() + " transactions, mdStore Id:" + currentObject.get(MDStoreConstants.MDID)); |
|
423 |
log.info("Deleted " + toRemove + " transactions, mdStore Id:" + currentInfo.getMdId()); |
|
545 | 424 |
} |
546 | 425 |
|
547 | 426 |
/** |
548 | 427 |
* Garbage expiring. |
549 | 428 |
* |
550 |
* @param currentObject the current object |
|
551 |
* @param currentId the current id |
|
429 |
* @param currentInfo the current object |
|
552 | 430 |
*/ |
553 |
private void garbageExpiring(final DBObject currentObject, final String currentId) {
|
|
431 |
private void garbageExpiring(final MDStoreManagerInfo currentInfo) {
|
|
554 | 432 |
if (log.isDebugEnabled()) { |
555 |
log.debug("Start to search expiring mdstores for id: " + currentObject.get(MDStoreConstants.MDID));
|
|
433 |
log.debug("Start to search expiring mdstores for id: " + currentInfo.getMdId());
|
|
556 | 434 |
} |
557 |
final BasicDBList expiring = (BasicDBList) currentObject.get(MDStoreConstants.EXPIRING); |
|
558 |
final List<DBObject> toRemove = Lists.newArrayList(); |
|
559 |
if (log.isDebugEnabled()) { |
|
560 |
if (expiring == null) { |
|
561 |
log.debug("expiring list is null"); |
|
562 |
} else { |
|
563 |
log.debug("expiring list size is :" + expiring.size()); |
|
564 |
} |
|
565 |
} |
|
566 |
if ((expiring == null) || (expiring.size() == 0)) { |
|
567 |
log.debug("Deleted 0 expired collections, mdStore Id:" + currentObject.get(MDStoreConstants.MDID)); |
|
568 |
return; |
|
569 |
} |
|
570 |
for (int i = 0; i < expiring.size(); i++) { |
|
571 |
final DBObject currentExpiredStore = (DBObject) expiring.get(i); |
|
572 |
final String currentUsedId = (String) currentExpiredStore.get(MDStoreConstants.ID); |
|
573 |
final long d = getExpiringDays(currentExpiredStore, MDStoreConstants.LAST_READ); |
|
574 |
if (log.isDebugEnabled()) { |
|
575 |
log.debug("the store :" + currentId + " expired since " + d + "days "); |
|
576 |
} |
|
577 |
// DELETE the collection where the last time they was read |
|
578 |
// is more than 3 days ago |
|
435 |
final List<MDStoreExpiredInfo> expiring = currentInfo.getExpiring(); |
|
436 |
expiring.forEach(i -> { |
|
437 |
|
|
438 |
final LocalDate readDate = i.getLastRead().toInstant().atZone(ZoneId.systemDefault()).toLocalDate(); |
|
439 |
final long d = Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays(); |
|
579 | 440 |
if (d > getExpiredDays()) { |
580 |
if (currentUsedId.equals(currentId) == false) { |
|
581 |
db.getCollection(currentUsedId).drop(); |
|
582 |
db.getCollection(MDStoreConstants.DISCARDED_PREFIX + currentUsedId).drop(); |
|
583 |
log.debug("deleted collection " + currentUsedId); |
|
441 |
if (!i.getId().equals(currentInfo.getCurrentId())) { |
|
442 |
doDrop(i.getId()); |
|
584 | 443 |
} |
585 |
toRemove.add(currentExpiredStore); |
|
586 | 444 |
} |
587 |
} |
|
588 |
delete(expiring, toRemove); |
|
589 |
log.debug("Deleted expired " + toRemove.size() + "collections, mdStore Id:" + currentObject.get(MDStoreConstants.MDID)); |
|
590 |
} |
|
445 |
}); |
|
591 | 446 |
|
592 |
/** |
|
593 |
* Gets the expiring days. |
|
594 |
* |
|
595 |
* @param value the value |
|
596 |
* @param paramName the param name |
|
597 |
* @return the expiring days |
|
598 |
*/ |
|
599 |
private long getExpiringDays(final DBObject value, final String paramName) { |
|
600 |
|
|
601 |
final Date lastRead = (Date) value.get(paramName); |
|
602 |
|
|
603 |
final LocalDate readDate = lastRead.toInstant().atZone(ZoneId.systemDefault()).toLocalDate(); |
|
604 |
return Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays(); |
|
447 |
log.debug("Deleted expired collections, mdId: " + currentInfo.getMdId()); |
|
605 | 448 |
} |
606 | 449 |
|
607 | 450 |
/** |
608 |
* Gets the expired days. |
|
609 |
* |
|
610 |
* @return the expiredDays |
|
611 |
*/ |
|
612 |
public int getExpiredDays() { |
|
613 |
if (this.expiredDays == 0) return 3; |
|
614 |
return expiredDays; |
|
615 |
} |
|
616 |
|
|
617 |
/** |
|
618 |
* Sets the expired days. |
|
619 |
* |
|
620 |
* @param expiredDays the expiredDays to set |
|
621 |
*/ |
|
622 |
public void setExpiredDays(final int expiredDays) { |
|
623 |
this.expiredDays = expiredDays; |
|
624 |
} |
|
625 |
|
|
626 |
/** |
|
627 | 451 |
* Update incremental. |
628 | 452 |
* |
629 | 453 |
* @param transactionId the transaction id |
630 | 454 |
* @param currentId the current id |
631 | 455 |
*/ |
632 | 456 |
private void updateIncremental(final String transactionId, final String currentId) { |
633 |
final MongoCollection<DBObject> transaction = db.getCollection(transactionId, DBObject.class); |
|
634 |
final MongoCollection<DBObject> mdstore = db.getCollection(currentId, DBObject.class); |
|
635 |
final FindIterable<DBObject> it = transaction.find().noCursorTimeout(true); |
|
636 |
for (DBObject currentObj : it) { |
|
457 |
final MongoCollection<MDStoreRecord> transaction = db.getCollection(transactionId, MDStoreRecord.class); |
|
458 |
final MongoCollection<MDStoreRecord> mdstore = db.getCollection(currentId, MDStoreRecord.class); |
|
637 | 459 |
|
638 |
BasicDBObject newObj = new BasicDBObject();
|
|
460 |
for (MDStoreRecord record : transaction.find().noCursorTimeout(true)) {
|
|
639 | 461 |
|
640 |
final String id = (String) currentObj.get(MDStoreConstants.ID); |
|
641 |
final String body = (String) currentObj.get(MDStoreConstants.BODY); |
|
642 |
newObj.put(MDStoreConstants.ID, id); |
|
643 |
newObj.put(MDStoreConstants.BODY, body); |
|
644 |
if (StringUtils.isNotBlank(id)) { |
|
462 |
final MDStoreRecord r = record.clone(); |
|
463 |
if (StringUtils.isNotBlank(r.getId())) { |
|
645 | 464 |
//setting to journaled write concern to be sure that when the write returns everything has been flushed to disk (https://docs.mongodb.org/manual/faq/developers/#when-does-mongodb-write-updates-to-disk) |
646 | 465 |
//the explicit fsync command can't be run anymore: 'Command failed with error 13: 'fsync may only be run against the admin database.' |
647 |
final MongoCollection<DBObject> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
|
|
648 |
mdstoreWrite.replaceOne(new BasicDBObject(MDStoreConstants.ID, id), newObj, new UpdateOptions().upsert(true));
|
|
466 |
final MongoCollection<MDStoreRecord> mdstoreWrite = mdstore.withWriteConcern(WriteConcern.JOURNALED);
|
|
467 |
mdstoreWrite.replaceOne(Filters.eq(ID, r.getId()), r, new UpdateOptions().upsert(true));
|
|
649 | 468 |
} |
650 | 469 |
} |
651 | 470 |
} |
652 | 471 |
|
653 |
/* |
|
472 |
/**
|
|
654 | 473 |
* (non-Javadoc) |
655 | 474 |
* |
656 |
* @see eu.dnetlib.data.mdstore.modular.connector.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
|
|
475 |
* @see eu.dnetlib.msro.workers.aggregation.mdstore.MDStoreTransactionManager#dropTransaction(java.lang.String, java.lang.String)
|
|
657 | 476 |
*/ |
658 | 477 |
@Override |
659 | 478 |
public Boolean dropTransaction(final String mdId, final String idToDrop) throws MDStoreException { |
660 | 479 |
verifyConsistency(); |
661 |
final DBObject mdstoreInfo = getMDStoreAsDBObject(mdId); |
|
662 |
if (mdstoreInfo == null) throw new MDStoreException("Error, unable to find Mdstore with Id " + mdId); |
|
663 |
return dropStore(mdstoreInfo, idToDrop, MDStoreConstants.TRANSACTIONS); |
|
480 |
final MDStoreManagerInfo managerInfo = getInfoForCurrentMdStore(mdId); |
|
481 |
managerInfo.getTransactions().removeIf(info -> { |
|
482 |
if (idToDrop.equals(info.getId())) { |
|
483 |
doDrop(idToDrop); |
|
484 |
return true; |
|
485 |
} else { |
|
486 |
return false; |
|
487 |
} |
|
488 |
}); |
|
489 |
getManagerTable().findOneAndReplace(Filters.eq(MDID, managerInfo.getMdId()), managerInfo); |
|
490 |
return true; |
|
664 | 491 |
} |
665 | 492 |
|
666 | 493 |
public void garbageTransactionsOnStart() throws MDStoreException { |
667 | 494 |
verifyConsistency(); |
668 |
FindIterable<DBObject> it = this.managerTable.find(); |
|
669 |
for (DBObject currentObject : it) { |
|
670 |
final BasicDBList transactions = (BasicDBList) currentObject.get(MDStoreConstants.TRANSACTIONS); |
|
671 |
if ((transactions != null) && (transactions.size() > 0)) { |
|
672 |
for (int i = 0; i < transactions.size(); i++) { |
|
673 |
final DBObject currentTransactions = (DBObject) transactions.get(i); |
|
674 |
final String id = (String) currentTransactions.get(MDStoreConstants.ID); |
|
675 |
db.getCollection(id).drop(); |
|
676 |
db.getCollection(MDStoreConstants.DISCARDED_PREFIX + id).drop(); |
|
677 |
log.debug("deleted collection " + id); |
|
678 |
} |
|
679 |
currentObject.put(MDStoreConstants.TRANSACTIONS, new BasicDBList()); |
|
680 |
this.getManagerTable().findOneAndReplace(new BasicDBObject( |
|
681 |
MDStoreConstants.INTERNAL_MONGO_ID, currentObject.get(MDStoreConstants.INTERNAL_MONGO_ID)), currentObject); |
|
682 |
} |
|
495 |
for (MDStoreManagerInfo info : getManagerTable().find()) { |
|
496 |
info.getTransactions().forEach(tx -> doDrop(tx.getId())); |
|
497 |
info.setTransactions(Lists.newArrayList()); |
|
498 |
getManagerTable().findOneAndReplace(Filters.eq(MDID, info.getMdId()), info); |
|
683 | 499 |
} |
684 | 500 |
} |
685 | 501 |
|
... | ... | |
701 | 517 |
this.maxTransactions = maxTransactions; |
702 | 518 |
} |
703 | 519 |
|
520 |
/** |
|
521 |
* Gets the expired days. |
|
522 |
* |
|
523 |
* @return the expiredDays |
|
524 |
*/ |
|
525 |
public int getExpiredDays() { |
|
526 |
return expiredDays; |
|
527 |
} |
|
528 |
|
|
529 |
/** |
|
530 |
* Sets the expired days. |
|
531 |
* |
|
532 |
* @param expiredDays the expiredDays to set |
|
533 |
*/ |
|
534 |
public void setExpiredDays(final int expiredDays) { |
|
535 |
this.expiredDays = expiredDays; |
|
536 |
} |
|
704 | 537 |
} |
Also available in: Unified diff
migrated mdstore implementation, early tests