Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource.clients;
2

    
3
import static com.mongodb.client.model.Filters.and;
4
import static com.mongodb.client.model.Filters.eq;
5
import static com.mongodb.client.model.Filters.regex;
6

    
7
import java.util.List;
8
import java.util.function.Function;
9
import java.util.stream.Collectors;
10

    
11
import org.apache.commons.lang.StringUtils;
12
import org.apache.commons.lang.time.DateFormatUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.bson.Document;
16
import org.bson.conversions.Bson;
17
import org.springframework.beans.factory.annotation.Autowired;
18
import org.springframework.cache.annotation.CacheEvict;
19
import org.springframework.cache.annotation.Cacheable;
20
import org.springframework.stereotype.Component;
21

    
22
import com.google.common.primitives.Ints;
23
import com.mongodb.BasicDBObject;
24
import com.mongodb.MongoClient;
25
import com.mongodb.client.MongoCollection;
26
import com.mongodb.client.model.Projections;
27

    
28
import eu.dnetlib.OpenaireExporterConfig;
29
import eu.dnetlib.OpenaireExporterConfig.Datasource;
30
import eu.dnetlib.Utils;
31
import eu.dnetlib.enabling.datasources.common.AggregationInfo;
32
import eu.dnetlib.enabling.datasources.common.AggregationStage;
33
import eu.dnetlib.enabling.datasources.common.DatasourceManagerException;
34
import eu.dnetlib.miscutils.datetime.DateUtils;
35
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
36
import eu.dnetlib.openaire.exporter.model.datasource.CollectionInfo;
37
import eu.dnetlib.openaire.exporter.model.datasource.CollectionMode;
38
import eu.dnetlib.openaire.exporter.model.datasource.TransformationInfo;
39

    
40
/**
41
 * Created by claudio on 20/10/2016.
42
 */
43
@Component
44
public class MongoLoggerClient {
45

    
46
	private static final Log log = LogFactory.getLog(MongoLoggerClient.class);
47

    
48
	@Autowired
49
	private MongoClient datasourcePublisherMongoClient;
50

    
51
	@Autowired
52
	private OpenaireExporterConfig config;
53

    
54
	private static Bson fields = getFields();
55

    
56
	private static MongoCollection<Document> collection = null;
57

    
58
	private Bson getQuery(final String dsId) {
59
		return and(
60
				eq("parentDatasourceId", dsId),
61
				eq("system:profileFamily", "aggregator"),
62
				eq("system:isCompletedSuccessfully", "true"),
63
				regex("system:wfName", "(collect|transform)", "i"));
64
	}
65

    
66
	private synchronized MongoCollection<Document> getCollection() {
67
		if (collection == null) {
68
			log.info("inizializing mongodb collection ...");
69
			final Datasource conf = config.getDatasource();
70
			collection = datasourcePublisherMongoClient.getDatabase(conf.getMongoDbName()).getCollection(conf.getMongoCollectionName());
71
		}
72
		return collection;
73
	}
74

    
75
	@Cacheable("datasources-mongo-cache")
76
	public List<AggregationInfo> getAggregationHistory(final String dsId) throws DatasourceManagerException {
77

    
78
		log.warn(String.format("getAggregationHistory(dsId = %s): not using cache", dsId));
79
		final Datasource conf = config.getDatasource();
80
		final Bson query = getQuery(dsId);
81
		return Utils.stream(getCollection().find(query).projection(fields).limit(conf.getMongoQueryLimit()).sort(dbo("system:startHumanDate", -1)).iterator())
82
				.map(getMapper())
83
				.filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
84
				.collect(Collectors.toList());
85
	}
86

    
87
	@CacheEvict(cacheNames = "datasources-mongo-cache", allEntries = true)
88
	public void dropCache() {
89
		log.info("dropped dsManager aggregation history cache");
90
	}
91

    
92
	private Function<Document, AggregationInfo> getMapper() {
93
		return new Function<Document, AggregationInfo>() {
94

    
95
			@Override
96
			public AggregationInfo apply(final Document d) {
97

    
98
				AggregationInfo info = null;
99
				final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
100
				switch (stage) {
101

    
102
				case COLLECT:
103
					CollectionInfo cInfo = new CollectionInfo();
104
					cInfo.setAggregationStage(stage);
105
					final String collectionMode = d.getString("system:node:SELECT_MODE:selection");
106
					cInfo.setCollectionMode(StringUtils.isNotBlank(collectionMode) ? CollectionMode.valueOf(collectionMode) : null);
107
					cInfo.setNumberOfRecords(getNumberOfRecords(d));
108
					cInfo.setDate(getDate(d));
109
					info = cInfo;
110
					break;
111
				case TRANSFORM:
112
					TransformationInfo tInfo = new TransformationInfo();
113
					tInfo.setAggregationStage(stage);
114
					tInfo.setNumberOfRecords(getNumberOfRecords(d));
115
					tInfo.setDate(getDate(d));
116
					info = tInfo;
117
					break;
118
				}
119
				return info;
120
			}
121

    
122
			private String getDate(final Document d) {
123
				final String dateString = d.getString("system:startHumanDate");
124
				if (StringUtils.isBlank(dateString)) { return ""; }
125
				return DateFormatUtils.format(new DateUtils().parse(dateString), DatasourceFunctions.DATE_FORMAT);
126
			}
127

    
128
			private Integer getNumberOfRecords(final Document d) {
129
				final String sinkSize = d.getString("mainlog:sinkSize");
130
				final String total = d.getString("mainlog:total");
131

    
132
				if (StringUtils.isNotBlank(sinkSize)) {
133
					return Ints.tryParse(sinkSize);
134
				} else if (StringUtils.isNotBlank(total)) {
135
					return Ints.tryParse(total);
136
				} else {
137
					return -1;
138
				}
139
			}
140
		};
141
	}
142

    
143
	private static Bson getFields() {
144
		return Projections.fields(
145
				eq("system:wfName", 1),
146
				eq("system:node:SELECT_MODE:selection", 1),
147
				eq("mainlog:sinkSize", 1),
148
				eq("mainlog:writeOps", 1),
149
				eq("mainlog:total", 1),
150
				eq("system:startHumanDate", 1),
151
				eq("system:profileName", 1));
152
	}
153

    
154
	private static BasicDBObject dbo(final String key, final Object value) {
155
		return new BasicDBObject(key, value);
156
	}
157

    
158
}
(6-6/10)