Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource.clients;
2

    
3
import java.util.List;
4
import java.util.function.Function;
5
import java.util.stream.Collectors;
6

    
7
import com.google.common.primitives.Ints;
8
import com.mongodb.BasicDBObject;
9
import com.mongodb.MongoClient;
10
import com.mongodb.client.MongoCollection;
11
import com.mongodb.client.model.Projections;
12
import eu.dnetlib.OpenaireExporterConfig;
13
import eu.dnetlib.OpenaireExporterConfig.Datasource;
14
import eu.dnetlib.Utils;
15
import eu.dnetlib.miscutils.datetime.DateUtils;
16
import eu.dnetlib.openaire.exporter.datasource.ApiException;
17
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
18
import eu.dnetlib.openaire.exporter.model.datasource.*;
19
import org.apache.commons.lang.StringUtils;
20
import org.apache.commons.lang.time.DateFormatUtils;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23
import org.bson.Document;
24
import org.bson.conversions.Bson;
25
import org.springframework.beans.factory.annotation.Autowired;
26
import org.springframework.cache.annotation.CacheEvict;
27
import org.springframework.cache.annotation.Cacheable;
28
import org.springframework.stereotype.Component;
29

    
30
import static com.mongodb.client.model.Filters.*;
31

    
32
/**
33
 * Created by claudio on 20/10/2016.
34
 */
35
@Component
36
public class MongoLoggerClient {
37

    
38
	private static final Log log = LogFactory.getLog(MongoLoggerClient.class);
39

    
40
	@Autowired
41
	private MongoClient datasourcePublisherMongoClient;
42

    
43
	@Autowired
44
	private OpenaireExporterConfig config;
45

    
46
	private static Bson fields = getFields();
47

    
48
	private static MongoCollection<Document> collection = null;
49

    
50
	private Bson getQuery(final String dsId) {
51
		return and(
52
				eq("parentDatasourceId", dsId),
53
				eq("system:profileFamily", "aggregator"),
54
				eq("system:isCompletedSuccessfully", "true"),
55
				regex("system:wfName", "(collect|transform)", "i"));
56
	}
57

    
58
	private synchronized MongoCollection<Document> getCollection() {
59
		if (collection == null) {
60
			log.info("inizializing mongodb collection ...");
61
			final Datasource conf = config.getDatasource();
62
			collection = datasourcePublisherMongoClient.getDatabase(conf.getMongoDbName()).getCollection(conf.getMongoCollectionName());
63
		}
64
		return collection;
65
	}
66

    
67
	@Cacheable("datasources-mongo-cache")
68
	public List<AggregationInfo> getAggregationHistory(final String dsId) throws ApiException {
69

    
70
		log.warn(String.format("getAggregationHistory(dsId = %s): not using cache", dsId));
71
		final Datasource conf = config.getDatasource();
72
		final Bson query = getQuery(dsId);
73
		return Utils.stream(getCollection().find(query).projection(fields).limit(conf.getMongoQueryLimit()).sort(dbo("system:startHumanDate", -1)).iterator())
74
				.map(getMapper())
75
				.filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
76
				.collect(Collectors.toList());
77
	}
78

    
79
	@CacheEvict(cacheNames = "datasources-mongo-cache", allEntries = true)
80
	public void dropCache() {
81
		log.info("dropped dsManager aggregation history cache");
82
	}
83

    
84
	private Function<Document, AggregationInfo> getMapper() {
85
		return new Function<Document, AggregationInfo>() {
86

    
87
			@Override
88
			public AggregationInfo apply(final Document d) {
89

    
90
				AggregationInfo info = null;
91
				final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
92
				switch (stage) {
93

    
94
				case COLLECT:
95
					CollectionInfo cInfo = new CollectionInfo();
96
					cInfo.setAggregationStage(stage);
97
					final String collectionMode = d.getString("system:node:SELECT_MODE:selection");
98
					cInfo.setCollectionMode(StringUtils.isNotBlank(collectionMode) ? CollectionMode.valueOf(collectionMode) : null);
99
					cInfo.setNumberOfRecords(getNumberOfRecords(d));
100
					cInfo.setDate(getDate(d));
101
					info = cInfo;
102
					break;
103
				case TRANSFORM:
104
					TransformationInfo tInfo = new TransformationInfo();
105
					tInfo.setAggregationStage(stage);
106
					tInfo.setNumberOfRecords(getNumberOfRecords(d));
107
					tInfo.setDate(getDate(d));
108
					info = tInfo;
109
					break;
110
				}
111
				return info;
112
			}
113

    
114
			private String getDate(final Document d) {
115
				final String dateString = d.getString("system:startHumanDate");
116
				if (StringUtils.isBlank(dateString)) {
117
					return "";
118
				}
119
				return DateFormatUtils.format(new DateUtils().parse(dateString), DatasourceFunctions.DATE_FORMAT);
120
			}
121

    
122
			private Integer getNumberOfRecords(final Document d) {
123
				final String sinkSize = d.getString("mainlog:sinkSize");
124
				final String total = d.getString("mainlog:total");
125

    
126
				if(StringUtils.isNotBlank(sinkSize)) {
127
					return Ints.tryParse(sinkSize);
128
				} else if(StringUtils.isNotBlank(total)) {
129
					return Ints.tryParse(total);
130
				} else {
131
					return -1;
132
				}
133
			}
134
		};
135
	}
136

    
137
	private static Bson getFields() {
138
		return Projections.fields(
139
				eq("system:wfName", 1),
140
				eq("system:node:SELECT_MODE:selection", 1),
141
				eq("mainlog:sinkSize", 1),
142
				eq("mainlog:writeOps", 1),
143
				eq("mainlog:total", 1),
144
				eq("system:startHumanDate", 1),
145
				eq("system:profileName", 1));
146
	}
147

    
148
	private static BasicDBObject dbo(final String key, final Object value) {
149
		return new BasicDBObject(key, value);
150
	}
151

    
152

    
153
}
(5-5/6)