Project

General

Profile

1
package eu.dnetlib.datasource.publisher.clients;
2

    
3
import java.util.List;
4
import java.util.function.Function;
5
import java.util.stream.Collectors;
6

    
7
import com.google.common.primitives.Ints;
8
import com.mongodb.BasicDBObject;
9
import com.mongodb.MongoClient;
10
import com.mongodb.client.MongoCollection;
11
import com.mongodb.client.model.Projections;
12
import eu.dnetlib.datasource.publisher.ApiException;
13
import eu.dnetlib.datasource.publisher.clients.utils.DatasourceFunctions;
14
import eu.dnetlib.datasource.publisher.model.*;
15
import eu.dnetlib.datasource.publisher.utils.Utils;
16
import eu.dnetlib.miscutils.datetime.DateUtils;
17
import org.apache.commons.lang.StringUtils;
18
import org.apache.commons.lang.time.DateFormatUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.bson.Document;
22
import org.bson.conversions.Bson;
23
import org.springframework.beans.factory.annotation.Autowired;
24
import org.springframework.beans.factory.annotation.Value;
25
import org.springframework.cache.annotation.Cacheable;
26

    
27
import static com.mongodb.client.model.Filters.and;
28
import static com.mongodb.client.model.Filters.eq;
29

    
30
/**
31
 * Created by claudio on 20/10/2016.
32
 */
33
public class MongoLoggerClient {
34

    
35
	private static final Log log = LogFactory.getLog(MongoLoggerClient.class);
36

    
37
	@Autowired
38
	private MongoClient datasourcePublisherMongoClient;
39
	@Value("${datasource.publisher.mongodb.collection}")
40
	private String collectionName;
41
	@Value("${datasource.publisher.mongodb.db}")
42
	private String dbName;
43
	@Value("${datasource.publisher.mongodb.query.limit}")
44
	private int limit;
45

    
46
	private static Bson fields = getFields();
47

    
48
	private static MongoCollection<Document> collection = null;
49

    
50
	private Bson getQuery(final String dsId) {
51
		return and(
52
				eq("parentDatasourceId", dsId),
53
				eq("system:profileFamily", "aggregator"),
54
				eq("system:isCompletedSuccessfully", "true"));
55
	}
56

    
57
	private synchronized MongoCollection<Document> getCollection() {
58
		if (collection == null) {
59
			log.info("inizializing mongodb collection ...");
60
			collection = datasourcePublisherMongoClient.getDatabase(dbName).getCollection(collectionName);
61
		}
62
		return collection;
63
	}
64

    
65
	@Cacheable("datasources-mongo-cache")
66
	public List<AggregationInfo> getAggregationHistory(final String dsId) throws ApiException {
67

    
68
		log.warn("getAggregationHistory(): not using cache");
69

    
70
		final Bson query = getQuery(dsId);
71

    
72
		return Utils.stream(getCollection().find(query).projection(fields).limit(limit).sort(dbo("system:startHumanDate", -1)).iterator())
73
				.map(getMapper())
74
				.filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
75
				.collect(Collectors.toList());
76
	}
77

    
78
	private Function<Document, AggregationInfo> getMapper() {
79
		return new java.util.function.Function<Document, AggregationInfo>() {
80

    
81
			@Override
82
			public AggregationInfo apply(final Document d) {
83

    
84
				AggregationInfo info = null;
85
				final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
86
				switch (stage) {
87

    
88
				case COLLECT:
89
					CollectionInfo cInfo = new CollectionInfo();
90
					cInfo.setAggregationStage(stage);
91
					final String collectionMode = d.getString("system:node:SELECT_MODE:selection");
92
					cInfo.setCollectionMode(StringUtils.isNotBlank(collectionMode) ? CollectionMode.valueOf(collectionMode) : null);
93
					cInfo.setNumberOfRecords(getNumberOfRecords(d));
94
					cInfo.setDate(getDate(d));
95
					info = cInfo;
96
					break;
97
				case TRANSFORM:
98
					TransformationInfo tInfo = new TransformationInfo();
99
					tInfo.setAggregationStage(stage);
100
					tInfo.setNumberOfRecords(getNumberOfRecords(d));
101
					tInfo.setDate(getDate(d));
102
					info = tInfo;
103
					break;
104
				}
105
				return info;
106
			}
107

    
108
			private String getDate(final Document d) {
109
				final String dateString = d.getString("system:startHumanDate");
110
				if (StringUtils.isBlank(dateString)) {
111
					return "";
112
				}
113
				return DateFormatUtils.format(new DateUtils().parse(dateString), DatasourceFunctions.DATE_FORMAT);
114
			}
115

    
116
			private Integer getNumberOfRecords(final Document d) {
117
				final String sinkSize = d.getString("mainlog:sinkSize");
118
				final String total = d.getString("mainlog:total");
119

    
120
				if(StringUtils.isNotBlank(sinkSize)) {
121
					return Ints.tryParse(sinkSize);
122
				} else if(StringUtils.isNotBlank(total)) {
123
					return Ints.tryParse(total);
124
				} else {
125
					return -1;
126
				}
127
			}
128
		};
129
	}
130

    
131
	private static Bson getFields() {
132
		return Projections.fields(
133
				eq("system:wfName", 1),
134
				eq("system:node:SELECT_MODE:selection", 1),
135
				eq("mainlog:sinkSize", 1),
136
				eq("mainlog:writeOps", 1),
137
				eq("mainlog:total", 1),
138
				eq("system:startHumanDate", 1),
139
				eq("system:profileName", 1));
140
	}
141

    
142
	private static BasicDBObject dbo(final String key, final Object value) {
143
		return new BasicDBObject(key, value);
144
	}
145

    
146

    
147
}
(6-6/7)