Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.consistency;
2

    
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.Date;
6
import java.util.List;
7
import java.util.Objects;
8
import java.util.Set;
9
import java.util.stream.Collectors;
10

    
11
import javax.annotation.Resource;
12

    
13
import org.antlr.stringtemplate.StringTemplate;
14
import org.apache.commons.io.IOUtils;
15
import org.apache.commons.lang.math.NumberUtils;
16
import org.apache.commons.lang3.StringUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.dom4j.Document;
20
import org.dom4j.DocumentException;
21
import org.dom4j.io.SAXReader;
22
import org.springframework.beans.factory.annotation.Autowired;
23

    
24
import com.googlecode.sarasvati.Arc;
25
import com.googlecode.sarasvati.NodeToken;
26

    
27
import eu.dnetlib.data.mdstore.MDStoreServiceException;
28
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
29
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
30
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
31
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
32
import eu.dnetlib.enabling.datasources.common.Api;
33
import eu.dnetlib.enabling.datasources.common.ApiParam;
34
import eu.dnetlib.enabling.datasources.common.DsmException;
35
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
36
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
37
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
38
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
39
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
40
import eu.dnetlib.miscutils.datetime.DateUtils;
41
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
42
import eu.dnetlib.msro.workflows.util.ProgressProvider;
43
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
44

    
45
public class FixRepoMdstoreSizesJobNode extends SimpleJobNode implements ProgressProvider {
46

    
47
	@Autowired
48
	private LocalOpenaireDatasourceManager dsManager;
49

    
50
	@Autowired
51
	private UniqueServiceLocator serviceLocator;
52

    
53
	@Resource(name = "mongodbMDStoreDao")
54
	private MDStoreDao mdstoreDao;
55

    
56
	@Autowired
57
	private ObjectStoreDao objectStoreDao;
58

    
59
	private final DateUtils dateUtils = new DateUtils();
60

    
61
	private int current = 0;
62
	private int total = 0;
63

    
64
	private ISRegistryService registry;
65
	private ISLookUpService lookup;
66

    
67
	private boolean alwaysUpdate = false;
68

    
69
	private static final Log log = LogFactory.getLog(FixRepoMdstoreSizesJobNode.class);
70

    
71
	public void init(final int total) {
72
		this.current = 0;
73
		this.total = total;
74
		this.lookup = serviceLocator.getService(ISLookUpService.class);
75
		this.registry = serviceLocator.getService(ISRegistryService.class);
76
	}
77

    
78
	@Override
79
	protected String execute(final NodeToken token) throws Exception {
80
		final Set<String> list = dsManager.listManagedDatasourceIds();
81

    
82
		init(list.size());
83

    
84
		for (final String dsId : list) {
85
			log.info("Processing ds: " + dsId);
86

    
87
			current++;
88
			try {
89
				for (final Api<ApiParam> api : dsManager.getApis(dsId)) {
90
					verifyApi(dsId, api);
91
				}
92
			} catch (final Throwable e) {
93
				log.error("Error processing ds: " + dsId, e);
94
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR + "::" + dsId, e.getMessage());
95
			}
96
		}
97

    
98
		return Arc.DEFAULT_ARC;
99
	}
100

    
101
	private void verifyApi(final String dsId, final Api<ApiParam> api)
102
			throws DsmException, ISRegistryException, IOException, ISLookUpException, MDStoreServiceException, ObjectStoreServiceException {
103

    
104
		for (final Document doc : listCollectionMdStores(dsId, api.getId())) {
105
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
106
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
107
			final int size = mdstoreDao.getMDStore(mdId).getSize();
108
			if (alwaysUpdate || size != actualSize) {
109
				log.info("  -- Updating size of api " + api.getId() + ", new value = " + size);
110
				updateMdStoreProfile(mdId, doc, size);
111
				dsManager.setLastCollectionInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
112
			}
113
		}
114

    
115
		for (final Document doc : listTransformationMdStores(dsId, api.getId())) {
116
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
117
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
118
			final int size = mdstoreDao.getMDStore(mdId).getSize();
119
			if (alwaysUpdate || size != actualSize) {
120
				updateMdStoreProfile(mdId, doc, size);
121
				dsManager.setLastAggregationInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
122
			}
123
		}
124

    
125
		for (final Document doc : listDownloadObjectStores(dsId, api.getId())) {
126
			final String objId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
127
			final int actualSize = NumberUtils.toInt(doc.valueOf("//STORE_SIZE"), 0);
128
			final int size = objectStoreDao.getObjectStore(objId).getSize();
129
			if (alwaysUpdate || size != actualSize) {
130
				updateObjStoreProfile(objId, doc, size);
131
				dsManager.setLastDownloadInfo(dsId, api.getId(), objId, size, calculateLastDate(doc));
132
			}
133
		}
134
	}
135

    
136
	private Date calculateLastDate(final Document doc) {
137
		final String dateS = doc.valueOf("//LAST_STORAGE_DATE");
138
		final Date date = StringUtils.isNoneBlank(dateS) ? dateUtils.parse(dateS) : new Date();
139
		return date;
140
	}
141

    
142
	private List<Document> listCollectionMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
143
		return executeXquery("listCollectionMdStores.xquery.st", dsId, apiId);
144
	}
145

    
146
	private List<Document> listTransformationMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
147
		return executeXquery("listTransformationMdStores.xquery.st", dsId, apiId);
148
	}
149

    
150
	private List<Document> listDownloadObjectStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
151
		return executeXquery("listDownloadObjectStores.xquery.st", dsId, apiId);
152
	}
153

    
154
	private List<Document> executeXquery(final String template, final String dsId, final String apiId) throws ISLookUpException, IOException {
155
		final StringTemplate st = new StringTemplate(IOUtils.toString(getClass().getResourceAsStream(template)));
156
		st.setAttribute("dsId", dsId);
157
		st.setAttribute("apiId", apiId);
158

    
159
		final SAXReader reader = new SAXReader();
160

    
161
		return lookup.quickSearchProfile(st.toString())
162
				.stream()
163
				.map(s -> {
164
					try {
165
						return reader.read(new StringReader(s));
166
					} catch (final DocumentException e) {
167
						return null;
168
					}
169
				})
170
				.filter(Objects::nonNull)
171
				.collect(Collectors.toList());
172
	}
173

    
174
	private void updateMdStoreProfile(final String mdId, final Document doc, final int size) throws ISRegistryException {
175
		doc.selectSingleNode("//NUMBER_OF_RECORDS").setText(Integer.toString(size));
176
		registry.updateProfile(mdId, doc.asXML(), "MDStoreDSResourceType");
177
	}
178

    
179
	private void updateObjStoreProfile(final String objId, final Document doc, final int size) throws ISRegistryException {
180
		doc.selectSingleNode("//COUNT_STORE").setText(Integer.toString(size));
181
		doc.selectSingleNode("//STORE_SIZE").setText(Integer.toString(size));
182
		registry.updateProfile(objId, doc.asXML(), "ObjectStoreDSResourceType");
183
	}
184

    
185
	@Override
186
	public int getTotalValue() {
187
		return total;
188
	}
189

    
190
	@Override
191
	public int getCurrentValue() {
192
		return current;
193
	}
194

    
195
	@Override
196
	public boolean isInaccurate() {
197
		return false;
198
	}
199

    
200
	public boolean isAlwaysUpdate() {
201
		return alwaysUpdate;
202
	}
203

    
204
	public void setAlwaysUpdate(final boolean alwaysUpdate) {
205
		this.alwaysUpdate = alwaysUpdate;
206
	}
207

    
208
}
(1-1/2)