Project

General

Profile

« Previous | Next » 

Revision 46532

using long to store dates, cleanup

View differences:

modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/mongo/MongoMDStore.java
13 13
import com.mongodb.client.ListIndexesIterable;
14 14
import com.mongodb.client.MongoCollection;
15 15
import com.mongodb.client.MongoCursor;
16
import com.mongodb.client.model.IndexOptions;
16 17
import eu.dnetlib.data.mdstore.MDStore;
17 18
import eu.dnetlib.data.mdstore.model.MDStoreRecord;
18 19
import eu.dnetlib.data.mdstore.model.ReadLock;
......
59 60
		final BlockingQueue<MDStoreRecord> queue = new ArrayBlockingQueue<>(BULK_SIZE);
60 61
		final MDStoreRecord sentinel = new MDStoreRecord();
61 62
		int countStored = 0;
62
		final Callable<Integer> writer = () -> {
63
			final MongoBulkWritesManager bulkWritesManager =
63
		final Future<Integer> count = Executors.newSingleThreadExecutor().submit(() -> {
64
			final MongoBulkWritesManager writer =
64 65
					new MongoBulkWritesManager(recordCollection, discardedCollection, BULK_SIZE);
65
			int count = 0;
66
			int counter = 0;
66 67
			while (true) {
67 68
				try {
68
					final MDStoreRecord MDStoreRecord = queue.take();
69
					if (MDStoreRecord.equals(sentinel)) {
70
						bulkWritesManager.flushBulks();
69
					final MDStoreRecord record = queue.take();
70
					if (record.equals(sentinel)) {
71
						writer.flushBulks();
71 72
						break;
72 73
					}
73
					count++;
74
					bulkWritesManager.insert(MDStoreRecord);
74
					counter++;
75
					writer.insert(record);
75 76
				} catch (final InterruptedException e) {
76 77
					log.fatal("got exception in background thread", e);
77 78
					throw new IllegalStateException(e);
78 79
				}
79 80
			}
80
			log.debug(String.format("extracted %s records from feeder queue", count));
81
			return count;
82
		};
81
			log.debug(String.format("extracted %s records from feeder queue", counter));
82
			return counter;
83
		});
83 84

  
84
		final Future<Integer> count = Executors.newSingleThreadExecutor().submit(writer);
85

  
86 85
		try {
87 86
			log.info("feeding mdstore " + id);
88 87
			records.forEach(r -> {
......
107 106
	}
108 107

  
109 108
	public void ensureIndices() {
110
		indexes.forEach(key -> recordCollection.createIndex(new BasicDBObject(key, 1)));
109
		final IndexOptions op = new IndexOptions().background(true);
110
		indexes.forEach(key -> recordCollection.createIndex(new BasicDBObject(key, 1), op));
111 111
	}
112 112

  
113 113
	public boolean isIndexed() {
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/mongo/MongoMDStoreService.java
96 96
						mongoDatabase.getCollection(name).drop();
97 97
					});
98 98

  
99
		//TODO check repositories has proper indexes
100
		//createIndex(MDID, getTxManager());
101

  
102 99
		log.info("Bootstrap mdstore complete!");
103 100
	}
104 101

  
......
107 104
		log.debug("Creating new mdstore");
108 105

  
109 106
		final String mdId = getProfileManager().registerProfile(format, layout, interpretation);
110

  
111
		final Metadata m = Metadata.create()
107
		metadata.save(Metadata.create()
112 108
				.setMdId(mdId)
113 109
				.setCurrentId(mdId)
114 110
				.setFormat(format)
115 111
				.setLayout(layout)
116
				.setInterpretation(interpretation);
117
		metadata.save(m);
112
				.setInterpretation(interpretation));
113
		mongoDatabase.createCollection(mdId);
114
		((MongoMDStore) getMDStore(mdId)).ensureIndices();
118 115

  
119
		mongoDatabase.createCollection(m.getCurrentId());
120
		createIndex(ID, mongoDatabase.getCollection(m.getCurrentId()));
121

  
122 116
		log.info("Created new mdstore: " + mdId);
123

  
124 117
		return mdId;
125 118
	}
126 119

  
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/model/Metadata.java
10 10

  
11 11
import com.google.common.collect.Lists;
12 12
import com.google.common.collect.Maps;
13
import org.springframework.data.mongodb.core.index.CompoundIndex;
14
import org.springframework.data.mongodb.core.index.CompoundIndexes;
13 15
import org.springframework.data.mongodb.core.index.Indexed;
14 16
import org.springframework.data.mongodb.core.mapping.Document;
15 17

  
16 18
@Document
19
@CompoundIndexes({
20
		@CompoundIndex(name = "fli_idx", def = "{'format': 1, 'layout': 1, 'interpretation': 1}")
21
})
17 22
public class Metadata {
18 23

  
19 24
	@Indexed(unique = true)
......
64 69
	 * @return the expiring days
65 70
	 */
66 71
	private long getExpiringDays(final ReadLock info) {
67
		final Date lastRead = info.getLastUpdateDate();
68
		final LocalDate readDate = lastRead.toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
72
		final Long lastRead = info.getLastUpdate();
73
		final LocalDate readDate = new Date(lastRead).toInstant().atZone(ZoneId.systemDefault()).toLocalDate();
69 74
		return Duration.between(LocalDate.now().atTime(0, 0), readDate.atTime(0, 0)).toDays();
70 75
	}
71 76

  
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/model/ReadLock.java
9 9

  
10 10
	private String id;
11 11
	private int readCount;
12
	private Date lastUpdateDate;
12
	private Long lastUpdate;
13 13

  
14 14
	public static ReadLock create(final String id) {
15 15
		return new ReadLock()
16 16
				.setId(id)
17 17
				.setReadCount(0)
18
				.setLastUpdateDate(new Date());
18
				.setLastUpdate(new Date().getTime());
19 19
	}
20 20

  
21 21
	public ReadLock incrementReadCount() {
......
28 28

  
29 29
	private ReadLock modifyReadCount(int v) {
30 30
		setReadCount(getReadCount() + v);
31
		setLastUpdateDate(new Date());
31
		setLastUpdate(new Date().getTime());
32 32
		return this;
33 33
	}
34 34

  
35 35
	/**
36 36
	 * @return the lastRead
37 37
	 */
38
	public Date getLastUpdateDate() {
39
		return lastUpdateDate;
38
	public Long getLastUpdate() {
39
		return lastUpdate;
40 40
	}
41 41

  
42 42
	/**
......
51 51
		return this;
52 52
	}
53 53

  
54
	public ReadLock setLastUpdateDate(final Date lastUpdateDate) {
55
		this.lastUpdateDate = lastUpdateDate;
54
	public ReadLock setLastUpdate(final Long lastUpdate) {
55
		this.lastUpdate = lastUpdate;
56 56
		return this;
57 57
	}
58 58

  
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/model/Transaction.java
14 14
	@Field("id")
15 15
	private String id;
16 16
	private Boolean refresh;
17
	private Date date;
17
	private Long date;
18 18

  
19 19
	public static Transaction create() {
20 20
		return new Transaction()
21
				.setDate(new Date());
21
				.setDate(new Date().getTime());
22 22
	}
23 23

  
24 24
	/**
25 25
	 * @return the date
26 26
	 */
27
	public Date getDate() {
27
	public Long getDate() {
28 28
		return date;
29 29
	}
30 30

  
......
52 52
		return this;
53 53
	}
54 54

  
55
	public Transaction setDate(final Date date) {
55
	public Transaction setDate(final Long date) {
56 56
		this.date = date;
57 57
		return this;
58 58
	}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/utils/MDStoreUtils.java
1
package eu.dnetlib.data.mdstore.utils;
2

  
3
import java.util.Comparator;
4
import java.util.Date;
5
import java.util.function.Function;
6

  
7
import com.mongodb.DBObject;
8
import eu.dnetlib.data.mdstore.model.Transaction;
9

  
10
/**
11
 * Created by claudio on 13/03/2017.
12
 */
13
public class MDStoreUtils {
14

  
15
	public static Function<DBObject, String> mdId() {
16
		return arg -> (String) arg.get("mdId");
17
	}
18

  
19
	public static Comparator<Transaction> getComparatorOnDate() {
20
		return (o1, o2) -> {
21
			final Date d1 = o1.getDate();
22
			final Date d2 = o2.getDate();
23
			return d1.compareTo(d2);
24
		};
25
	}
26

  
27
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/data/mdstore/utils/RecordParser.java
45 45
					.setBody(s)
46 46
					.setTimestamp(timestamp != null ? timestamp : new Date().getTime());
47 47

  
48
			System.out.println(res.getTimestamp());
48
			//System.out.println(res.getTimestamp());
49 49

  
50 50
			while (parser.hasNext()) {
51 51
				int event = parser.next();

Also available in: Unified diff