Project

General

Profile

1
package eu.dnetlib.datasource.publisher.clients;
2

    
3
import java.util.List;
4

    
5
import com.google.common.base.Function;
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import com.google.common.primitives.Ints;
9
import com.mongodb.BasicDBObject;
10
import com.mongodb.MongoClient;
11
import com.mongodb.client.FindIterable;
12
import com.mongodb.client.MongoCollection;
13
import com.mongodb.client.model.Filters;
14
import com.mongodb.client.model.Projections;
15
import eu.dnetlib.datasource.publisher.DatasourceException;
16
import eu.dnetlib.datasource.publisher.model.AggregationInfo;
17
import eu.dnetlib.datasource.publisher.model.AggregationStage;
18
import eu.dnetlib.datasource.publisher.model.CollectionMode;
19
import eu.dnetlib.datasource.publisher.model.DatasourceFunctions;
20
import eu.dnetlib.miscutils.datetime.DateUtils;
21
import org.apache.commons.lang.StringUtils;
22
import org.apache.commons.lang.time.DateFormatUtils;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.bson.Document;
26
import org.bson.conversions.Bson;
27
import org.springframework.beans.factory.annotation.Autowired;
28
import org.springframework.beans.factory.annotation.Value;
29
import org.springframework.cache.annotation.CacheEvict;
30
import org.springframework.cache.annotation.Cacheable;
31

    
32
/**
33
 * Created by claudio on 20/10/2016.
34
 */
35
public class MongoLoggerClient {
36

    
37
	private static final Log log = LogFactory.getLog(MongoLoggerClient.class);
38

    
39
	@Autowired
40
	private MongoClient datasourcePublisherMongoClient;
41
	@Value("${datasource.publisher.mongodb.collection}")
42
	private String collectionName;
43
	@Value("${datasource.publisher.mongodb.db}")
44
	private String dbName;
45
	@Value("${datasource.publisher.mongodb.query.limit}")
46
	private int limit;
47

    
48
	private static Bson fields = getFields();
49

    
50
	private static MongoCollection<Document> collection = null;
51

    
52
	private Bson getQuery(final String dsId) {
53
		return Filters.and(dbo("parentDatasourceId", dsId), dbo("system:profileFamily", "aggregator"), dbo("system:isCompletedSuccessfully", "true"));
54
	}
55

    
56
	private synchronized MongoCollection<Document> getCollection() {
57
		if (collection == null) {
58
			collection = datasourcePublisherMongoClient.getDatabase(dbName).getCollection(collectionName);
59
		}
60
		return collection;
61
	}
62

    
63
	@Cacheable("datasources-mongo-cache")
64
	public List<AggregationInfo> getAggregationHistory(final String dsId) throws DatasourceException {
65

    
66
		log.warn("getAggregationHistory(): not using cache");
67

    
68
		final Bson query = getQuery(dsId);
69

    
70
		final FindIterable<Document> iterable = getCollection().find(query).projection(fields).limit(limit).sort(dbo("system:startHumanDate", -1));
71
		final Iterable<AggregationInfo> transform = Iterables.transform(iterable, new Function<Document, AggregationInfo>() {
72

    
73
			@Override
74
			public AggregationInfo apply(final Document d) {
75
				final AggregationInfo info = new AggregationInfo();
76
				info.setAggregationStage(AggregationStage.parse(d.getString("system:wfName")));
77
				if (AggregationStage.COLLECT.equals(info.getAggregationStage())) {
78
					info.setCollectionMode(CollectionMode.valueOf(d.getString("system:node:SELECT_MODE:selection")));
79
				}
80

    
81
				info.setNumberOfRecords(getNumberOfRecords(d));
82
				info.setDate(getDate(d));
83
				return info;
84
			}
85

    
86
			private String getDate(final Document d) {
87
				return DateFormatUtils.format(new DateUtils().parse(d.getString("system:startHumanDate")), DatasourceFunctions.DATE_FORMAT);
88
			}
89

    
90
			private Integer getNumberOfRecords(final Document d) {
91
				final String sinkSize = d.getString("mainlog:sinkSize");
92
				final String total = d.getString("mainlog:total");
93

    
94
				if(StringUtils.isNotBlank(sinkSize)) {
95
					return Ints.tryParse(sinkSize);
96
				} else if(StringUtils.isNotBlank(total)) {
97
					return Ints.tryParse(total);
98
				} else {
99
					return -1;
100
				}
101
			}
102
		});
103

    
104
		return Lists.newArrayList(transform);
105
	}
106

    
107
	private static Bson getFields() {
108
		return Projections.fields(dbo("system:wfName", 1),
109
				dbo("system:node:SELECT_MODE:selection", 1),
110
				dbo("mainlog:sinkSize", 1),
111
				dbo("mainlog:writeOps", 1),
112
				dbo("mainlog:total", 1),
113
				dbo("system:startHumanDate", 1),
114
				dbo("system:profileName", 1));
115
	}
116

    
117
	private static BasicDBObject dbo(final String key, final Object value) {
118
		return new BasicDBObject(key, value);
119
	}
120

    
121

    
122
}
(5-5/6)