Project

General

Profile

1 53478 michele.ar
package eu.dnetlib.msro.openaireplus.workflows.nodes.consistency;
2
3
import java.io.IOException;
import java.io.InputStream;
import java.io.StringReader;
import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.stream.Collectors;

import javax.annotation.Resource;

import org.antlr.stringtemplate.StringTemplate;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.dom4j.Document;
import org.dom4j.DocumentException;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Autowired;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;

import eu.dnetlib.data.mdstore.MDStoreServiceException;
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
import eu.dnetlib.enabling.datasources.common.Api;
import eu.dnetlib.enabling.datasources.common.ApiParam;
import eu.dnetlib.enabling.datasources.common.DsmException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
45
46 53502 michele.ar
public class FixRepoMdstoreSizesJobNode extends SimpleJobNode implements ProgressJobNode {
47 53478 michele.ar
48
	@Autowired
49
	private LocalOpenaireDatasourceManager dsManager;
50
51
	@Autowired
52
	private UniqueServiceLocator serviceLocator;
53
54
	@Resource(name = "mongodbMDStoreDao")
55
	private MDStoreDao mdstoreDao;
56
57
	@Autowired
58
	private ObjectStoreDao objectStoreDao;
59
60
	private final DateUtils dateUtils = new DateUtils();
61
62
	private int current = 0;
63
	private int total = 0;
64
65
	private ISRegistryService registry;
66
	private ISLookUpService lookup;
67
68 53486 michele.ar
	private boolean alwaysUpdate = false;
69
70 53498 michele.ar
	private static final Log log = LogFactory.getLog(FixRepoMdstoreSizesJobNode.class);
71
72 53478 michele.ar
	public void init(final int total) {
73
		this.current = 0;
74
		this.total = total;
75
		this.lookup = serviceLocator.getService(ISLookUpService.class);
76
		this.registry = serviceLocator.getService(ISRegistryService.class);
77
	}
78
79
	@Override
80
	protected String execute(final NodeToken token) throws Exception {
81
		final Set<String> list = dsManager.listManagedDatasourceIds();
82
83
		init(list.size());
84
85
		for (final String dsId : list) {
86 53498 michele.ar
			log.info("Processing ds: " + dsId);
87
88 53478 michele.ar
			current++;
89
			try {
90
				for (final Api<ApiParam> api : dsManager.getApis(dsId)) {
91
					verifyApi(dsId, api);
92
				}
93
			} catch (final Throwable e) {
94 53498 michele.ar
				log.error("Error processing ds: " + dsId, e);
95 53478 michele.ar
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR + "::" + dsId, e.getMessage());
96
			}
97
		}
98
99
		return Arc.DEFAULT_ARC;
100
	}
101
102
	private void verifyApi(final String dsId, final Api<ApiParam> api)
103
			throws DsmException, ISRegistryException, IOException, ISLookUpException, MDStoreServiceException, ObjectStoreServiceException {
104
105
		for (final Document doc : listCollectionMdStores(dsId, api.getId())) {
106
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
107 53485 michele.ar
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
108 53478 michele.ar
			final int size = mdstoreDao.getMDStore(mdId).getSize();
109 53486 michele.ar
			if (alwaysUpdate || size != actualSize) {
110 53498 michele.ar
				log.info("  -- Updating size of api " + api.getId() + ", new value = " + size);
111 53485 michele.ar
				updateMdStoreProfile(mdId, doc, size);
112
				dsManager.setLastCollectionInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
113
			}
114 53478 michele.ar
		}
115
116
		for (final Document doc : listTransformationMdStores(dsId, api.getId())) {
117
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
118 53485 michele.ar
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
119 53478 michele.ar
			final int size = mdstoreDao.getMDStore(mdId).getSize();
120 53486 michele.ar
			if (alwaysUpdate || size != actualSize) {
121 53485 michele.ar
				updateMdStoreProfile(mdId, doc, size);
122
				dsManager.setLastAggregationInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
123
			}
124 53478 michele.ar
		}
125
126
		for (final Document doc : listDownloadObjectStores(dsId, api.getId())) {
127
			final String objId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
128 53485 michele.ar
			final int actualSize = NumberUtils.toInt(doc.valueOf("//STORE_SIZE"), 0);
129 53478 michele.ar
			final int size = objectStoreDao.getObjectStore(objId).getSize();
130 53486 michele.ar
			if (alwaysUpdate || size != actualSize) {
131 53485 michele.ar
				updateObjStoreProfile(objId, doc, size);
132
				dsManager.setLastDownloadInfo(dsId, api.getId(), objId, size, calculateLastDate(doc));
133
			}
134 53478 michele.ar
		}
135
	}
136
137
	private Date calculateLastDate(final Document doc) {
138
		final String dateS = doc.valueOf("//LAST_STORAGE_DATE");
139
		final Date date = StringUtils.isNoneBlank(dateS) ? dateUtils.parse(dateS) : new Date();
140
		return date;
141
	}
142
143
	private List<Document> listCollectionMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
144
		return executeXquery("listCollectionMdStores.xquery.st", dsId, apiId);
145
	}
146
147
	private List<Document> listTransformationMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
148
		return executeXquery("listTransformationMdStores.xquery.st", dsId, apiId);
149
	}
150
151
	private List<Document> listDownloadObjectStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
152
		return executeXquery("listDownloadObjectStores.xquery.st", dsId, apiId);
153
	}
154
155
	private List<Document> executeXquery(final String template, final String dsId, final String apiId) throws ISLookUpException, IOException {
156
		final StringTemplate st = new StringTemplate(IOUtils.toString(getClass().getResourceAsStream(template)));
157
		st.setAttribute("dsId", dsId);
158
		st.setAttribute("apiId", apiId);
159
160
		final SAXReader reader = new SAXReader();
161
162
		return lookup.quickSearchProfile(st.toString())
163
				.stream()
164
				.map(s -> {
165
					try {
166
						return reader.read(new StringReader(s));
167
					} catch (final DocumentException e) {
168
						return null;
169
					}
170
				})
171
				.filter(Objects::nonNull)
172
				.collect(Collectors.toList());
173
	}
174
175
	private void updateMdStoreProfile(final String mdId, final Document doc, final int size) throws ISRegistryException {
176
		doc.selectSingleNode("//NUMBER_OF_RECORDS").setText(Integer.toString(size));
177
		registry.updateProfile(mdId, doc.asXML(), "MDStoreDSResourceType");
178
	}
179
180
	private void updateObjStoreProfile(final String objId, final Document doc, final int size) throws ISRegistryException {
181
		doc.selectSingleNode("//COUNT_STORE").setText(Integer.toString(size));
182
		doc.selectSingleNode("//STORE_SIZE").setText(Integer.toString(size));
183
		registry.updateProfile(objId, doc.asXML(), "ObjectStoreDSResourceType");
184
	}
185
186 53502 michele.ar
	public boolean isAlwaysUpdate() {
187
		return alwaysUpdate;
188 53478 michele.ar
	}
189
190 53502 michele.ar
	public void setAlwaysUpdate(final boolean alwaysUpdate) {
191
		this.alwaysUpdate = alwaysUpdate;
192 53478 michele.ar
	}
193
194
	@Override
195 53502 michele.ar
	public ProgressProvider getProgressProvider() {
196
		return new ProgressProvider() {
197 53478 michele.ar
198 53502 michele.ar
			@Override
199
			public int getTotalValue() {
200
				return total;
201
			}
202 53486 michele.ar
203 53502 michele.ar
			@Override
204
			public int getCurrentValue() {
205
				return current;
206
			}
207
208
			@Override
209
			public boolean isInaccurate() {
210
				return false;
211
			}
212
		};
213 53486 michele.ar
	}
214
215 53478 michele.ar
}