package eu.dnetlib.openaire.exporter.datasource.clients;

import java.util.List;
import java.util.function.Function;
import java.util.stream.Collectors;

import com.google.common.collect.Streams;
import com.google.common.primitives.Ints;
import com.mongodb.BasicDBObject;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import com.mongodb.client.model.Projections;
import eu.dnetlib.OpenaireExporterConfig;
import eu.dnetlib.OpenaireExporterConfig.Datasource;
import eu.dnetlib.Utils;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.openaire.exporter.datasource.ApiException;
import eu.dnetlib.openaire.exporter.datasource.clients.utils.DatasourceFunctions;
import eu.dnetlib.openaire.exporter.model.datasource.*;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.time.DateFormatUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.bson.Document;
import org.bson.conversions.Bson;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cache.annotation.CacheEvict;
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Component;

import static com.mongodb.client.model.Filters.*;

/**
 * Created by claudio on 20/10/2016.
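 * <p>
 * Reads the aggregation workflow logs (collect / transform runs) of a datasource from MongoDB
 * and maps them onto {@link AggregationInfo} beans.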
 */
@Component
public class MongoLoggerClient {

	private static final Log log = LogFactory.getLog(MongoLoggerClient.class);

	@Autowired
	private MongoClient datasourcePublisherMongoClient;

	@Autowired
	private OpenaireExporterConfig config;

	private static Bson fields = getFields();

	private static MongoCollection<Document> collection = null;

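	/**
	 * Filter selecting the successfully completed aggregator workflow runs
	 * (collect or transform, case-insensitive) of the given datasource id.
	 */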
	private Bson getQuery(final String dsId) {
		return and(
				eq("parentDatasourceId", dsId),
				eq("system:profileFamily", "aggregator"),
				eq("system:isCompletedSuccessfully", "true"),
				regex("system:wfName", "(collect|transform)", "i"));
	}

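	/**
	 * Lazily initializes the MongoDB collection configured for the datasource
	 * aggregation logs and reuses it on subsequent calls.
	 */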
	private synchronized MongoCollection<Document> getCollection() {
		if (collection == null) {
			log.info("initializing mongodb collection ...");
			final Datasource conf = config.getDatasource();
			collection = datasourcePublisherMongoClient.getDatabase(conf.getMongoDbName()).getCollection(conf.getMongoCollectionName());
		}
		return collection;
	}

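	/**
	 * Returns the aggregation history (collect and transform runs) of the given datasource,
	 * newest first, limited by the configured mongo query limit. Entries without a valid
	 * record count or date are filtered out. Results are cached in "datasources-mongo-cache".
	 */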
	@Cacheable("datasources-mongo-cache")
	public List<AggregationInfo> getAggregationHistory(final String dsId) throws ApiException {

		log.warn(String.format("getAggregationHistory(dsId = %s): not using cache", dsId));
		final Datasource conf = config.getDatasource();
		final Bson query = getQuery(dsId);
		final MongoCursor<Document> cursor = getCollection().find(query)
				.projection(fields)
				.limit(conf.getMongoQueryLimit())
				.sort(dbo("system:startHumanDate", -1))
				.iterator();
		return Streams.stream(cursor)
				.map(getMapper())
				.filter(ai -> ai.getNumberOfRecords() >= 0 && StringUtils.isNotBlank(ai.getDate()))
				.collect(Collectors.toList());
	}

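	/** Evicts all entries from the "datasources-mongo-cache" cache. */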
	@CacheEvict(cacheNames = "datasources-mongo-cache", allEntries = true)
	public void dropCache() {
		log.info("dropped dsManager aggregation history cache");
	}

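	/**
	 * Maps a raw workflow log {@link Document} onto a {@link CollectionInfo} or a
	 * {@link TransformationInfo}, depending on the workflow stage of the log entry.
	 */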
	private Function<Document, AggregationInfo> getMapper() {
		return new Function<Document, AggregationInfo>() {

			@Override
			public AggregationInfo apply(final Document d) {

				AggregationInfo info = null;
				final AggregationStage stage = AggregationStage.parse(d.getString("system:wfName"));
				switch (stage) {

				case COLLECT:
					CollectionInfo cInfo = new CollectionInfo();
					cInfo.setAggregationStage(stage);
					final String collectionMode = d.getString("system:node:SELECT_MODE:selection");
					cInfo.setCollectionMode(StringUtils.isNotBlank(collectionMode) ? CollectionMode.valueOf(collectionMode) : null);
					cInfo.setNumberOfRecords(getNumberOfRecords(d));
					cInfo.setDate(getDate(d));
					info = cInfo;
					break;
				case TRANSFORM:
					TransformationInfo tInfo = new TransformationInfo();
					tInfo.setAggregationStage(stage);
					tInfo.setNumberOfRecords(getNumberOfRecords(d));
					tInfo.setDate(getDate(d));
					info = tInfo;
					break;
				}
				return info;
			}

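			/**
			 * Formats "system:startHumanDate" using {@link DatasourceFunctions#DATE_FORMAT},
			 * or returns an empty string when the field is missing.
			 */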
			private String getDate(final Document d) {
				final String dateString = d.getString("system:startHumanDate");
				if (StringUtils.isBlank(dateString)) {
					return "";
				}
				return DateFormatUtils.format(new DateUtils().parse(dateString), DatasourceFunctions.DATE_FORMAT);
			}

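			/**
			 * Record count of the log entry: prefers "mainlog:sinkSize" over "mainlog:total";
			 * returns -1 when neither is present, so the entry is filtered out by the caller.
			 */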
			private Integer getNumberOfRecords(final Document d) {
				final String sinkSize = d.getString("mainlog:sinkSize");
				final String total = d.getString("mainlog:total");

				if (StringUtils.isNotBlank(sinkSize)) {
					return Ints.tryParse(sinkSize);
				} else if (StringUtils.isNotBlank(total)) {
					return Ints.tryParse(total);
				} else {
					return -1;
				}
			}
		};
	}

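	/** Projection limiting the returned log documents to the fields used by the mapper. */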
	private static Bson getFields() {
		return Projections.fields(
				eq("system:wfName", 1),
				eq("system:node:SELECT_MODE:selection", 1),
				eq("mainlog:sinkSize", 1),
				eq("mainlog:writeOps", 1),
				eq("mainlog:total", 1),
				eq("system:startHumanDate", 1),
				eq("system:profileName", 1));
	}

	private static BasicDBObject dbo(final String key, final Object value) {
		return new BasicDBObject(key, value);
	}

}