Project

General

Profile

1
package eu.dnetlib.data.mdstore.mongo;
2

    
3
import java.util.Arrays;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Set;
7
import java.util.stream.Collectors;
8
import java.util.stream.Stream;
9
import javax.annotation.PostConstruct;
10

    
11
import com.google.common.collect.Lists;
12
import com.google.common.collect.Sets;
13
import com.mongodb.BasicDBObject;
14
import com.mongodb.WriteConcern;
15
import com.mongodb.client.MongoCollection;
16
import com.mongodb.client.MongoDatabase;
17
import com.mongodb.client.model.IndexOptions;
18
import com.mongodb.client.model.UpdateOptions;
19
import eu.dnetlib.data.mdstore.MDStore;
20
import eu.dnetlib.data.mdstore.MDStoreService;
21
import eu.dnetlib.data.mdstore.model.MDStoreRecord;
22
import eu.dnetlib.data.mdstore.model.Metadata;
23
import eu.dnetlib.data.mdstore.model.Transaction;
24
import eu.dnetlib.data.mdstore.repository.MetadataRepository;
25
import eu.dnetlib.data.mdstore.utils.MDStoreException;
26
import eu.dnetlib.data.mdstore.utils.MDStoreProfileManager;
27
import eu.dnetlib.data.mdstore.utils.MDStoreRuntimeException;
28
import eu.dnetlib.data.mdstore.utils.RecordParser;
29
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
30
import org.apache.commons.lang3.StringUtils;
31
import org.apache.commons.logging.Log;
32
import org.apache.commons.logging.LogFactory;
33
import org.springframework.beans.factory.annotation.Autowired;
34
import org.springframework.beans.factory.annotation.Value;
35
import org.springframework.stereotype.Component;
36

    
37
import static com.mongodb.client.model.Filters.eq;
38
import static eu.dnetlib.data.mdstore.utils.MDStoreConstants.DISCARDED_PREFIX;
39
import static eu.dnetlib.data.mdstore.utils.MDStoreConstants.ID;
40

    
41
/**
42
 * Created by claudio on 24/03/2017.
43
 */
44
@Component
45
public class MongoMDStoreService implements MDStoreService {
46

    
47
	private static final Log log = LogFactory.getLog(MongoMDStoreService.class);
48

    
49
	/**
50
	 * The max number of concurrent transactions per mdstore.
51
	 */
52
	private int maxTransactions = 5;
53

    
54
	/**
55
	 * The expired days.
56
	 */
57
	private int expiredDays = 3;
58

    
59
	@Autowired
60
	private MDStoreProfileManager profileManager;
61

    
62
	@Autowired
63
	private MetadataRepository metadata;
64

    
65
	@Autowired
66
	private MongoDatabase mongoDatabase;
67

    
68
	@Value("#{'${msro.worker.mdstore.skip.prefixes}'.split(',')}")
69
	private List<String> skipPrefixes = Lists.newArrayList("resolved", "metadata");
70

    
71
	/**
72
	 * Bootstrap manager.
73
	 */
74
	@PostConstruct
75
	private void init() {
76
		log.info("Bootstrap mdstore ...");
77

    
78
		//drop all the stale expiring and the transactions mdstores
79
		metadata.findAll().forEach(d -> {
80
			metadata.bootstrap(d.getMdId());
81
			log.debug(String.format("Added %s to Metadata Manager data structure", d.getMdId()));
82
		});
83

    
84
		final Set<String> metadataMdIds = Sets.newHashSet(metadata.listMdIds());
85

    
86
		//delete from metadataManager all the mdId(s) that are not in metadata
87
		Sets.difference(
88
				Sets.newHashSet(metadata.listMdIds()),
89
				metadataMdIds).forEach(mdId -> metadata.delete(mdId));
90

    
91
		//delete all the collections that are not referred by metadata.currentId(s)
92
		Sets.difference(
93
				Sets.newHashSet(metadata.listCurrentIds()),
94
				metadataMdIds).forEach(name -> {
95
						log.info(String.format("dropping collection %s", name));
96
						mongoDatabase.getCollection(name).drop();
97
					});
98

    
99
		log.info("Bootstrap mdstore complete!");
100
	}
101

    
102
	@Override
103
	public String create(final String format, final String layout, final String interpretation) throws MDStoreException {
104
		log.debug("Creating new mdstore");
105

    
106
		final String mdId = getProfileManager().registerProfile(format, layout, interpretation);
107
		metadata.save(Metadata.create()
108
				.setMdId(mdId)
109
				.setCurrentId(mdId)
110
				.setFormat(format)
111
				.setLayout(layout)
112
				.setInterpretation(interpretation));
113
		mongoDatabase.createCollection(mdId);
114
		((MongoMDStore) getMDStore(mdId)).ensureIndices();
115

    
116
		log.info("Created new mdstore: " + mdId);
117
		return mdId;
118
	}
119

    
120
	@Override
121
	public void delete(final String mdId) throws MDStoreException {
122
		if (!metadata.exists(mdId)) {
123
			throw new MDStoreException(String.format("cannot find %s in transaction manager table", mdId));
124
		}
125
		metadata.delete(mdId);
126
		getMDStore(mdId).truncate();
127
		getProfileManager().deleteProfile(mdId);
128
		log.info("deleted mdId: " + mdId);
129
	}
130

    
131
	@Override
132
	public Stream<MDStoreRecord> stream(final String mdId) throws MDStoreException {
133
		return stream(mdId, null, null);
134
	}
135

    
136
	@Override
137
	public Stream<MDStoreRecord> stream(final String mdId, final String from, final String until) throws MDStoreException {
138
		return stream(mdId, from, until, null);
139
	}
140

    
141
	@Override
142
	public Stream<MDStoreRecord> stream(final String mdId, final String from, final String until, final String recordFilter) throws MDStoreException {
143
		return getMDStore(mdId).deliver(from, until, recordFilter);
144
	}
145

    
146
	@Override
147
	public Stream<MDStoreRecord> bulkStream(final String format, final String layout, final String interpretation) throws MDStoreException {
148
		return list(format, layout, interpretation).stream()
149
				.map(mdId -> {
150
					log.debug("bulk deliver of mdId: " + mdId);
151
					try {
152
						return getMDStore(mdId).deliver();
153
					} catch (MDStoreException e) {
154
						throw new MDStoreRuntimeException(e);
155
					}
156
				}).reduce(Stream::concat).orElseGet(Stream::empty);
157
	}
158

    
159
	@Override
160
	public MDStoreRecord findOne(final String mdId, final String recordId) throws MDStoreException {
161
		return getMDStore(mdId).getRecord(recordId);
162
	}
163

    
164
	@Override
165
	public List<String> list() throws MDStoreException {
166
		return DnetStreamSupport.stream(metadata.findAll().iterator())
167
			.map(m -> m.getMdId())
168
			.collect(Collectors.toList());
169
	}
170

    
171
	@Override
172
	public List<String> list(final String format, final String layout, final String interpretation) throws MDStoreException {
173
		return metadata.findByFormatAndLayoutAndInterpretation(format, layout, interpretation).stream()
174
				.map(m -> m.getMdId())
175
				.collect(Collectors.toList());
176
	}
177

    
178
	@Override
179
	public long size(final String mdId) throws MDStoreException {
180
		return getMDStore(mdId).getSize();
181
	}
182

    
183
	@Override
184
	public long size(final String format, final String layout, final String interpretation) {
185
		return metadata.sumOfSize(format, layout, interpretation);
186
	}
187

    
188
	@Override
189
	public void store(final String mdId, final Stream<String> records, final FeedMode feedMode) throws MDStoreException {
190
		final boolean refresh = FeedMode.REFRESH.equals(feedMode);
191
		final String txId = startTransaction(mdId, refresh);
192
		final MDStore mdstore = getMDStore(mdId, txId);
193
		try {
194
			if (refresh) {
195
				mdstore.truncate();
196
			}
197
			mdstore.feed(records.map(new RecordParser()));
198

    
199
			final Metadata m = metadata.findByMdId(mdId);
200

    
201
			final Map<String, Transaction> txs = m.getTransactionMap();
202
			if (!txs.containsKey(txId)) {
203
				throw new MDStoreException("Error, unable to find transaction with Id " + txId);
204
			}
205

    
206
			if (txs.get(txId).getRefresh()) {
207
				if (!m.isCurrentIdExpired(expiredDays)) {
208
					doDrop(m.getCurrentId());
209
				}
210
				m.setCurrentId(txId);
211

    
212
				log.debug(String.format("commit refresh: replaced collection %s with %s", mdId, txId));
213
			} else {
214
				updateIncremental(txId, m.getCurrentId());
215
				doDrop(txId);
216
				log.debug(String.format("commit incremental: "));
217
			}
218
			final long size = getRecordCollection(m.getCurrentId()).count();
219
			metadata.commit(m.setSize(size), txId);
220
			getProfileManager().updateSize(mdId, size);
221
			log.info("Finished feeding mdstore " + mdId + " - new size: " + size);
222
		} catch (Throwable e) {
223
			if (txId != null) {
224
				log.error("Dropping transaction : " + txId);
225
				metadata.dropTransaction(mdId, txId);
226
			}
227
			throw new MDStoreException("Error feeding mdstore: " + mdId, e);
228
		}
229
	}
230

    
231
	/// helpers
232
	private String startTransaction(final String mdId, final boolean refresh) throws MDStoreException {
233
		log.info("Start transaction for metadata store " + mdId);
234

    
235
		final Metadata m = metadata.findByMdId(mdId);
236

    
237
		if (m.getTransactions().size() > getMaxTransactions()) {
238
			throw new MDStoreException(
239
					String.format("Cannot create more than %s transactions, found: %s, mdId: %s", getMaxTransactions(), m.getTransactions().size(), mdId));
240
		}
241

    
242
		final String txId = mdId + "::" + System.currentTimeMillis();
243
		metadata.addTransaction(mdId,
244
				Transaction.create()
245
					.setId(txId)
246
					.setRefresh(refresh));
247

    
248
		return txId;
249
	}
250

    
251
	/**
252
	 * Update incremental.
253
	 *
254
	 * @param transactionId the transaction id
255
	 * @param currentId     the current id
256
	 */
257
	private void updateIncremental(final String transactionId, final String currentId) {
258
		final MongoCollection<MDStoreRecord> source = mongoDatabase.getCollection(transactionId, MDStoreRecord.class);
259
		final MongoCollection<MDStoreRecord> destination = mongoDatabase.getCollection(currentId, MDStoreRecord.class)
260
				.withWriteConcern(WriteConcern.JOURNALED);
261

    
262
		//TODO reimplement using bulk writes on the destination collection
263
		for (MDStoreRecord record : source.find().batchSize(100).noCursorTimeout(true)) {
264

    
265
			final MDStoreRecord r = record.clone();
266
			if (StringUtils.isNotBlank(r.getId())) {
267
				destination.replaceOne(eq(ID, r.getId()), r, new UpdateOptions().upsert(true));
268
			}
269
		}
270
	}
271

    
272
	private MDStore getMDStore(final String mdId, final String currentId) {
273
		return new MongoMDStore(
274
				mdId,
275
				getRecordCollection(currentId),
276
				getRecordCollection(DISCARDED_PREFIX + currentId),
277
				metadata);
278
	}
279

    
280
	private MDStore getMDStore(final String id) {
281
		final Metadata m = metadata.findByMdId(id);
282
		return getMDStore(id, m.getCurrentId());
283
	}
284

    
285
	private void doDrop(final String mdId) {
286
		mongoDatabase.getCollection(mdId).drop();
287
		mongoDatabase.getCollection(DISCARDED_PREFIX + mdId).drop();
288
		log.debug("deleted collection " + mdId);
289
	}
290

    
291
	private MongoCollection<MDStoreRecord> getRecordCollection(final String collectionId) {
292
		return mongoDatabase.getCollection(collectionId, MDStoreRecord.class);
293
	}
294

    
295
	private void createIndex(final String fieldName, final MongoCollection<?> collection) {
296
		Arrays.asList(collection).forEach(c -> {
297
			log.info(String.format("Create index in %s", c.getNamespace().getCollectionName()));
298
			c.createIndex(
299
					new BasicDBObject(fieldName, 1),
300
					new IndexOptions().background(true));
301
		});
302
	}
303

    
304
	public int getMaxTransactions() {
305
		return maxTransactions;
306
	}
307

    
308
	public void setMaxTransactions(final int maxTransactions) {
309
		this.maxTransactions = maxTransactions;
310
	}
311

    
312
	public int getExpiredDays() {
313
		return expiredDays;
314
	}
315

    
316
	public void setExpiredDays(final int expiredDays) {
317
		this.expiredDays = expiredDays;
318
	}
319

    
320
	public MDStoreProfileManager getProfileManager() {
321
		return profileManager;
322
	}
323

    
324
	public void setProfileManager(final MDStoreProfileManager profileManager) {
325
		this.profileManager = profileManager;
326
	}
327
}
(5-5/5)