Project

General

Profile

1
package eu.dnetlib.datasource.publisher.clients;
2

    
3
import java.util.List;
4

    
5
import com.google.common.base.Function;
6
import com.google.common.base.Predicate;
7
import com.google.common.collect.Iterables;
8
import com.google.common.collect.Lists;
9
import com.google.common.primitives.Ints;
10
import com.mongodb.BasicDBObject;
11
import com.mongodb.MongoClient;
12
import com.mongodb.client.FindIterable;
13
import com.mongodb.client.MongoCollection;
14
import com.mongodb.client.model.Projections;
15
import eu.dnetlib.datasource.publisher.ApiException;
16
import eu.dnetlib.datasource.publisher.clients.utils.DatasourceFunctions;
17
import eu.dnetlib.datasource.publisher.model.*;
18
import eu.dnetlib.miscutils.datetime.DateUtils;
19
import org.apache.commons.lang.StringUtils;
20
import org.apache.commons.lang.time.DateFormatUtils;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23
import org.bson.Document;
24
import org.bson.conversions.Bson;
25
import org.springframework.beans.factory.annotation.Autowired;
26
import org.springframework.beans.factory.annotation.Value;
27
import org.springframework.cache.annotation.Cacheable;
28

    
29
import static com.mongodb.client.model.Filters.*;
30

    
31

    
32

    
33
/**
34
 * Created by claudio on 20/10/2016.
35
 */
36
public class MongoLoggerClient {
37

    
38
	private static final Log log = LogFactory.getLog(MongoLoggerClient.class);
39

    
40
	@Autowired
41
	private MongoClient datasourcePublisherMongoClient;
42
	@Value("${datasource.publisher.mongodb.collection}")
43
	private String collectionName;
44
	@Value("${datasource.publisher.mongodb.db}")
45
	private String dbName;
46
	@Value("${datasource.publisher.mongodb.query.limit}")
47
	private int limit;
48

    
49
	private static Bson fields = getFields();
50

    
51
	private static MongoCollection<Document> collection = null;
52

    
53
	private Bson getQuery(final String dsId) {
54
		return and(
55
				eq("parentDatasourceId", dsId),
56
				eq("system:profileFamily", "aggregator"),
57
				eq("system:isCompletedSuccessfully", "true"));
58
	}
59

    
60
	private synchronized MongoCollection<Document> getCollection() {
61
		if (collection == null) {
62
			log.info("inizializing mongodb collection ...");
63
			collection = datasourcePublisherMongoClient.getDatabase(dbName).getCollection(collectionName);
64
		}
65
		return collection;
66
	}
67

    
68
	@Cacheable("datasources-mongo-cache")
69
	public List<AggregationInfo> getAggregationHistory(final String dsId) throws ApiException {
70

    
71
		log.warn("getAggregationHistory(): not using cache");
72

    
73
		final Bson query = getQuery(dsId);
74

    
75
		final FindIterable<Document> iterable = getCollection().find(query).projection(fields).limit(limit).sort(dbo("system:startHumanDate", -1));
76
		final Iterable<AggregationInfo> transform = Iterables.transform(iterable, new Function<Document, AggregationInfo>() {
77

    
78
			@Override
79
			public AggregationInfo apply(final Document d) {
80

    
81
				AggregationInfo info = null;
82
				final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
83
				switch (stage) {
84

    
85
				case COLLECT:
86
					CollectionInfo cInfo = new CollectionInfo();
87
					cInfo.setAggregationStage(stage);
88
					final String collectionMode = d.getString("system:node:SELECT_MODE:selection");
89
					cInfo.setCollectionMode(StringUtils.isNotBlank(collectionMode) ? CollectionMode.valueOf(collectionMode) : null);
90
					cInfo.setNumberOfRecords(getNumberOfRecords(d));
91
					cInfo.setDate(getDate(d));
92
					info = cInfo;
93
					break;
94
				case TRANSFORM:
95
					TransformationInfo tInfo = new TransformationInfo();
96
					tInfo.setAggregationStage(stage);
97
					tInfo.setNumberOfRecords(getNumberOfRecords(d));
98
					tInfo.setDate(getDate(d));
99
					info = tInfo;
100
					break;
101
				}
102
				return info;
103
			}
104

    
105
			private String getDate(final Document d) {
106
				final String dateString = d.getString("system:startHumanDate");
107
				if (StringUtils.isBlank(dateString)) {
108
					return "";
109
				}
110
				return DateFormatUtils.format(new DateUtils().parse(dateString), DatasourceFunctions.DATE_FORMAT);
111
			}
112

    
113
			private Integer getNumberOfRecords(final Document d) {
114
				final String sinkSize = d.getString("mainlog:sinkSize");
115
				final String total = d.getString("mainlog:total");
116

    
117
				if(StringUtils.isNotBlank(sinkSize)) {
118
					return Ints.tryParse(sinkSize);
119
				} else if(StringUtils.isNotBlank(total)) {
120
					return Ints.tryParse(total);
121
				} else {
122
					return -1;
123
				}
124
			}
125
		});
126

    
127
		final Iterable<AggregationInfo> filter = Iterables.filter(transform, new Predicate<AggregationInfo>() {
128
			@Override
129
			public boolean apply(final AggregationInfo ai) {
130
				return ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate());
131
			}
132
		});
133

    
134
		return Lists.newArrayList(filter);
135
	}
136

    
137
	private static Bson getFields() {
138
		return Projections.fields(
139
				eq("system:wfName", 1),
140
				eq("system:node:SELECT_MODE:selection", 1),
141
				eq("mainlog:sinkSize", 1),
142
				eq("mainlog:writeOps", 1),
143
				eq("mainlog:total", 1),
144
				eq("system:startHumanDate", 1),
145
				eq("system:profileName", 1));
146
	}
147

    
148
	private static BasicDBObject dbo(final String key, final Object value) {
149
		return new BasicDBObject(key, value);
150
	}
151

    
152

    
153
}
(6-6/7)