Revision 46532
Added by Claudio Atzori about 7 years ago
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/mongo/MongoMDStore.java | ||
---|---|---|
13 | 13 |
import com.mongodb.client.ListIndexesIterable; |
14 | 14 |
import com.mongodb.client.MongoCollection; |
15 | 15 |
import com.mongodb.client.MongoCursor; |
16 |
import com.mongodb.client.model.IndexOptions; |
|
16 | 17 |
import eu.dnetlib.data.mdstore.MDStore; |
17 | 18 |
import eu.dnetlib.data.mdstore.model.MDStoreRecord; |
18 | 19 |
import eu.dnetlib.data.mdstore.model.ReadLock; |
... | ... | |
59 | 60 |
final BlockingQueue<MDStoreRecord> queue = new ArrayBlockingQueue<>(BULK_SIZE); |
60 | 61 |
final MDStoreRecord sentinel = new MDStoreRecord(); |
61 | 62 |
int countStored = 0; |
62 |
final Callable<Integer> writer = () -> {
|
|
63 |
final MongoBulkWritesManager bulkWritesManager =
|
|
63 |
final Future<Integer> count = Executors.newSingleThreadExecutor().submit(() -> {
|
|
64 |
final MongoBulkWritesManager writer =
|
|
64 | 65 |
new MongoBulkWritesManager(recordCollection, discardedCollection, BULK_SIZE); |
65 |
int count = 0; |
|
66 |
int counter = 0;
|
|
66 | 67 |
while (true) { |
67 | 68 |
try { |
68 |
final MDStoreRecord MDStoreRecord = queue.take();
|
|
69 |
if (MDStoreRecord.equals(sentinel)) {
|
|
70 |
bulkWritesManager.flushBulks();
|
|
69 |
final MDStoreRecord record = queue.take();
|
|
70 |
if (record.equals(sentinel)) {
|
|
71 |
writer.flushBulks();
|
|
71 | 72 |
break; |
72 | 73 |
} |
73 |
count++; |
|
74 |
bulkWritesManager.insert(MDStoreRecord);
|
|
74 |
counter++;
|
|
75 |
writer.insert(record);
|
|
75 | 76 |
} catch (final InterruptedException e) { |
76 | 77 |
log.fatal("got exception in background thread", e); |
77 | 78 |
throw new IllegalStateException(e); |
78 | 79 |
} |
79 | 80 |
} |
80 |
log.debug(String.format("extracted %s records from feeder queue", count)); |
|
81 |
return count; |
|
82 |
}; |
|
81 |
log.debug(String.format("extracted %s records from feeder queue", counter));
|
|
82 |
return counter;
|
|
83 |
});
|
|
83 | 84 |
|
84 |
final Future<Integer> count = Executors.newSingleThreadExecutor().submit(writer); |
|
85 |
|
|
86 | 85 |
try { |
87 | 86 |
log.info("feeding mdstore " + id); |
88 | 87 |
records.forEach(r -> { |
... | ... | |
107 | 106 |
} |
108 | 107 |
|
109 | 108 |
public void ensureIndices() { |
110 |
indexes.forEach(key -> recordCollection.createIndex(new BasicDBObject(key, 1))); |
|
109 |
final IndexOptions op = new IndexOptions().background(true); |
|
110 |
indexes.forEach(key -> recordCollection.createIndex(new BasicDBObject(key, 1), op)); |
|
111 | 111 |
} |
112 | 112 |
|
113 | 113 |
public boolean isIndexed() { |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/mongo/MongoMDStoreService.java | ||
---|---|---|
96 | 96 |
mongoDatabase.getCollection(name).drop(); |
97 | 97 |
}); |
98 | 98 |
|
99 |
//TODO check repositories has proper indexes |
|
100 |
//createIndex(MDID, getTxManager()); |
|
101 |
|
|
102 | 99 |
log.info("Bootstrap mdstore complete!"); |
103 | 100 |
} |
104 | 101 |
|
... | ... | |
107 | 104 |
log.debug("Creating new mdstore"); |
108 | 105 |
|
109 | 106 |
final String mdId = getProfileManager().registerProfile(format, layout, interpretation); |
110 |
|
|
111 |
final Metadata m = Metadata.create() |
|
107 |
metadata.save(Metadata.create() |
|
112 | 108 |
.setMdId(mdId) |
113 | 109 |
.setCurrentId(mdId) |
114 | 110 |
.setFormat(format) |
115 | 111 |
.setLayout(layout) |
116 |
.setInterpretation(interpretation); |
|
117 |
metadata.save(m); |
|
112 |
.setInterpretation(interpretation)); |
|
113 |
mongoDatabase.createCollection(mdId); |
|
114 |
((MongoMDStore) getMDStore(mdId)).ensureIndices(); |
|
118 | 115 |
|
119 |
mongoDatabase.createCollection(m.getCurrentId()); |
|
120 |
createIndex(ID, mongoDatabase.getCollection(m.getCurrentId())); |
|
121 |
|
|
122 | 116 |
log.info("Created new mdstore: " + mdId); |
123 |
|
|
124 | 117 |
return mdId; |
125 | 118 |
} |
126 | 119 |
|
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/model/Metadata.java | ||
---|---|---|
10 | 10 |
|
11 | 11 |
import com.google.common.collect.Lists; |
12 | 12 |
import com.google.common.collect.Maps; |
13 |
import org.springframework.data.mongodb.core.index.CompoundIndex; |
|
14 |
import org.springframework.data.mongodb.core.index.CompoundIndexes; |
|
13 | 15 |
import org.springframework.data.mongodb.core.index.Indexed; |
14 | 16 |
import org.springframework.data.mongodb.core.mapping.Document; |
15 | 17 |
|
16 | 18 |
@Document |
19 |
@CompoundIndexes({ |
|
20 |
@CompoundIndex(name = "fli_idx", def = "{'format': 1, 'layout': 1, 'interpretation': 1}") |
|
21 |
}) |
|
17 | 22 |
public class Metadata { |
18 | 23 |
|
19 | 24 |
@Indexed(unique = true) |
... | ... | |
64 | 69 |
* @return the expiring days |
65 | 70 |
*/ |
66 | 71 |
private long getExpiringDays(final ReadLock info) { |
67 |
final Date lastRead = info.getLastUpdateDate();
|
|
68 |
final LocalDate readDate = lastRead.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
|
|
72 |
final Long lastRead = info.getLastUpdate();
|
|
73 |
final LocalDate readDate = new Date(lastRead).toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
|
|
69 | 74 |
return Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays(); |
70 | 75 |
} |
71 | 76 |
|
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/model/ReadLock.java | ||
---|---|---|
9 | 9 |
|
10 | 10 |
private String id; |
11 | 11 |
private int readCount; |
12 |
private Date lastUpdateDate;
|
|
12 |
private Long lastUpdate;
|
|
13 | 13 |
|
14 | 14 |
public static ReadLock create(final String id) { |
15 | 15 |
return new ReadLock() |
16 | 16 |
.setId(id) |
17 | 17 |
.setReadCount(0) |
18 |
.setLastUpdateDate(new Date());
|
|
18 |
.setLastUpdate(new Date().getTime());
|
|
19 | 19 |
} |
20 | 20 |
|
21 | 21 |
public ReadLock incrementReadCount() { |
... | ... | |
28 | 28 |
|
29 | 29 |
private ReadLock modifyReadCount(int v) { |
30 | 30 |
setReadCount(getReadCount() + v); |
31 |
setLastUpdateDate(new Date());
|
|
31 |
setLastUpdate(new Date().getTime());
|
|
32 | 32 |
return this; |
33 | 33 |
} |
34 | 34 |
|
35 | 35 |
/** |
36 | 36 |
* @return the lastRead |
37 | 37 |
*/ |
38 |
public Date getLastUpdateDate() {
|
|
39 |
return lastUpdateDate;
|
|
38 |
public Long getLastUpdate() {
|
|
39 |
return lastUpdate; |
|
40 | 40 |
} |
41 | 41 |
|
42 | 42 |
/** |
... | ... | |
51 | 51 |
return this; |
52 | 52 |
} |
53 | 53 |
|
54 |
public ReadLock setLastUpdateDate(final Date lastUpdateDate) {
|
|
55 |
this.lastUpdateDate = lastUpdateDate;
|
|
54 |
public ReadLock setLastUpdate(final Long lastUpdate) {
|
|
55 |
this.lastUpdate = lastUpdate;
|
|
56 | 56 |
return this; |
57 | 57 |
} |
58 | 58 |
|
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/model/Transaction.java | ||
---|---|---|
14 | 14 |
@Field("id") |
15 | 15 |
private String id; |
16 | 16 |
private Boolean refresh; |
17 |
private Date date;
|
|
17 |
private Long date;
|
|
18 | 18 |
|
19 | 19 |
public static Transaction create() { |
20 | 20 |
return new Transaction() |
21 |
.setDate(new Date()); |
|
21 |
.setDate(new Date().getTime());
|
|
22 | 22 |
} |
23 | 23 |
|
24 | 24 |
/** |
25 | 25 |
* @return the date |
26 | 26 |
*/ |
27 |
public Date getDate() {
|
|
27 |
public Long getDate() {
|
|
28 | 28 |
return date; |
29 | 29 |
} |
30 | 30 |
|
... | ... | |
52 | 52 |
return this; |
53 | 53 |
} |
54 | 54 |
|
55 |
public Transaction setDate(final Date date) {
|
|
55 |
public Transaction setDate(final Long date) {
|
|
56 | 56 |
this.date = date; |
57 | 57 |
return this; |
58 | 58 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/utils/MDStoreUtils.java | ||
---|---|---|
1 |
package eu.dnetlib.data.mdstore.utils; |
|
2 |
|
|
3 |
import java.util.Comparator; |
|
4 |
import java.util.Date; |
|
5 |
import java.util.function.Function; |
|
6 |
|
|
7 |
import com.mongodb.DBObject; |
|
8 |
import eu.dnetlib.data.mdstore.model.Transaction; |
|
9 |
|
|
10 |
/** |
|
11 |
* Created by claudio on 13/03/2017. |
|
12 |
*/ |
|
13 |
public class MDStoreUtils { |
|
14 |
|
|
15 |
public static Function<DBObject, String> mdId() { |
|
16 |
return arg -> (String) arg.get("mdId"); |
|
17 |
} |
|
18 |
|
|
19 |
public static Comparator<Transaction> getComparatorOnDate() { |
|
20 |
return (o1, o2) -> { |
|
21 |
final Date d1 = o1.getDate(); |
|
22 |
final Date d2 = o2.getDate(); |
|
23 |
return d1.compareTo(d2); |
|
24 |
}; |
|
25 |
} |
|
26 |
|
|
27 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/utils/RecordParser.java | ||
---|---|---|
45 | 45 |
.setBody(s) |
46 | 46 |
.setTimestamp(timestamp != null ? timestamp : new Date().getTime()); |
47 | 47 |
|
48 |
System.out.println(res.getTimestamp()); |
|
48 |
//System.out.println(res.getTimestamp());
|
|
49 | 49 |
|
50 | 50 |
while (parser.hasNext()) { |
51 | 51 |
int event = parser.next(); |
Also available in: Unified diff
using long to store dates, cleanup