Project

General

Profile

1 53478 michele.ar
package eu.dnetlib.msro.openaireplus.workflows.nodes.consistency;
2
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.Date;
6 53508 michele.ar
import java.util.HashMap;
7 53478 michele.ar
import java.util.List;
8 53508 michele.ar
import java.util.Map;
9 53478 michele.ar
import java.util.Objects;
10
import java.util.Set;
11
import java.util.stream.Collectors;
12
13
import javax.annotation.Resource;
14
15
import org.antlr.stringtemplate.StringTemplate;
16
import org.apache.commons.io.IOUtils;
17 53485 michele.ar
import org.apache.commons.lang.math.NumberUtils;
18 53478 michele.ar
import org.apache.commons.lang3.StringUtils;
19 53498 michele.ar
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21 53478 michele.ar
import org.dom4j.Document;
22
import org.dom4j.DocumentException;
23
import org.dom4j.io.SAXReader;
24
import org.springframework.beans.factory.annotation.Autowired;
25
26
import com.googlecode.sarasvati.Arc;
27
import com.googlecode.sarasvati.NodeToken;
28
29
import eu.dnetlib.data.mdstore.MDStoreServiceException;
30
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
31
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
32
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
33
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
34
import eu.dnetlib.enabling.datasources.common.Api;
35
import eu.dnetlib.enabling.datasources.common.ApiParam;
36
import eu.dnetlib.enabling.datasources.common.DsmException;
37
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
38
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
39
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
40
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
41
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
42
import eu.dnetlib.miscutils.datetime.DateUtils;
43 53502 michele.ar
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
44 53478 michele.ar
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
45
import eu.dnetlib.msro.workflows.util.ProgressProvider;
46
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
47
48 53502 michele.ar
public class FixRepoMdstoreSizesJobNode extends SimpleJobNode implements ProgressJobNode {
49 53478 michele.ar
50
	@Autowired
51
	private LocalOpenaireDatasourceManager dsManager;
52
53
	@Autowired
54
	private UniqueServiceLocator serviceLocator;
55
56
	@Resource(name = "mongodbMDStoreDao")
57
	private MDStoreDao mdstoreDao;
58
59
	@Autowired
60
	private ObjectStoreDao objectStoreDao;
61
62
	private final DateUtils dateUtils = new DateUtils();
63
64
	private int current = 0;
65
	private int total = 0;
66
67
	private ISRegistryService registry;
68
	private ISLookUpService lookup;
69
70 53508 michele.ar
	private final Map<String, String> openaireIds = new HashMap<>();
71 53486 michele.ar
	private boolean alwaysUpdate = false;
72
73 53498 michele.ar
	private static final Log log = LogFactory.getLog(FixRepoMdstoreSizesJobNode.class);
74
75 53478 michele.ar
	public void init(final int total) {
76
		this.current = 0;
77
		this.total = total;
78
		this.lookup = serviceLocator.getService(ISLookUpService.class);
79
		this.registry = serviceLocator.getService(ISRegistryService.class);
80 53508 michele.ar
		try {
81
			openaireIds.putAll(lookup.quickSearchProfile(
82
					"for $x in collection('/db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType') return concat($x//DATASOURCE_ORIGINAL_ID, ' @@@ ', $x//RESOURCE_IDENTIFIER/@value)")
83
					.stream()
84
					.collect(Collectors.toMap(
85
							s -> StringUtils.substringBefore(s, "@@@").trim(),
86
							s -> StringUtils.substringAfter(s, "@@@").trim())));
87
		} catch (final ISLookUpException e) {
88
			// TODO Auto-generated catch block
89
			e.printStackTrace();
90
		}
91
92 53478 michele.ar
	}
93
94
	@Override
95
	protected String execute(final NodeToken token) throws Exception {
96
		final Set<String> list = dsManager.listManagedDatasourceIds();
97
98
		init(list.size());
99
100
		for (final String dsId : list) {
101 53498 michele.ar
			log.info("Processing ds: " + dsId);
102
103 53478 michele.ar
			current++;
104 53508 michele.ar
105 53478 michele.ar
			try {
106
				for (final Api<ApiParam> api : dsManager.getApis(dsId)) {
107
					verifyApi(dsId, api);
108
				}
109
			} catch (final Throwable e) {
110 53498 michele.ar
				log.error("Error processing ds: " + dsId, e);
111 53521 michele.ar
				token.getEnv().setAttribute(WorkflowsConstants.MAIN_LOG_PREFIX + dsId, e.getMessage());
112 53478 michele.ar
			}
113
		}
114
115
		return Arc.DEFAULT_ARC;
116
	}
117
118
	private void verifyApi(final String dsId, final Api<ApiParam> api)
119
			throws DsmException, ISRegistryException, IOException, ISLookUpException, MDStoreServiceException, ObjectStoreServiceException {
120
121
		for (final Document doc : listCollectionMdStores(dsId, api.getId())) {
122
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
123 53485 michele.ar
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
124 53478 michele.ar
			final int size = mdstoreDao.getMDStore(mdId).getSize();
125 53486 michele.ar
			if (alwaysUpdate || size != actualSize) {
126 53498 michele.ar
				log.info("  -- Updating size of api " + api.getId() + ", new value = " + size);
127 53485 michele.ar
				updateMdStoreProfile(mdId, doc, size);
128
				dsManager.setLastCollectionInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
129
			}
130 53478 michele.ar
		}
131
132
		for (final Document doc : listTransformationMdStores(dsId, api.getId())) {
133
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
134 53485 michele.ar
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
135 53478 michele.ar
			final int size = mdstoreDao.getMDStore(mdId).getSize();
136 53486 michele.ar
			if (alwaysUpdate || size != actualSize) {
137 53485 michele.ar
				updateMdStoreProfile(mdId, doc, size);
138
				dsManager.setLastAggregationInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
139
			}
140 53478 michele.ar
		}
141
142
		for (final Document doc : listDownloadObjectStores(dsId, api.getId())) {
143
			final String objId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
144 53485 michele.ar
			final int actualSize = NumberUtils.toInt(doc.valueOf("//STORE_SIZE"), 0);
145 53478 michele.ar
			final int size = objectStoreDao.getObjectStore(objId).getSize();
146 53486 michele.ar
			if (alwaysUpdate || size != actualSize) {
147 53485 michele.ar
				updateObjStoreProfile(objId, doc, size);
148
				dsManager.setLastDownloadInfo(dsId, api.getId(), objId, size, calculateLastDate(doc));
149
			}
150 53478 michele.ar
		}
151
	}
152
153
	private Date calculateLastDate(final Document doc) {
154
		final String dateS = doc.valueOf("//LAST_STORAGE_DATE");
155
		final Date date = StringUtils.isNoneBlank(dateS) ? dateUtils.parse(dateS) : new Date();
156
		return date;
157
	}
158
159
	private List<Document> listCollectionMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
160
		return executeXquery("listCollectionMdStores.xquery.st", dsId, apiId);
161
	}
162
163
	private List<Document> listTransformationMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
164
		return executeXquery("listTransformationMdStores.xquery.st", dsId, apiId);
165
	}
166
167
	private List<Document> listDownloadObjectStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
168
		return executeXquery("listDownloadObjectStores.xquery.st", dsId, apiId);
169
	}
170
171
	private List<Document> executeXquery(final String template, final String dsId, final String apiId) throws ISLookUpException, IOException {
172
		final StringTemplate st = new StringTemplate(IOUtils.toString(getClass().getResourceAsStream(template)));
173 53508 michele.ar
		st.setAttribute("dsId", openaireIds.get(dsId));
174 53478 michele.ar
		st.setAttribute("apiId", apiId);
175
176
		final SAXReader reader = new SAXReader();
177
178
		return lookup.quickSearchProfile(st.toString())
179
				.stream()
180
				.map(s -> {
181
					try {
182
						return reader.read(new StringReader(s));
183
					} catch (final DocumentException e) {
184
						return null;
185
					}
186
				})
187
				.filter(Objects::nonNull)
188
				.collect(Collectors.toList());
189
	}
190
191
	private void updateMdStoreProfile(final String mdId, final Document doc, final int size) throws ISRegistryException {
192
		doc.selectSingleNode("//NUMBER_OF_RECORDS").setText(Integer.toString(size));
193
		registry.updateProfile(mdId, doc.asXML(), "MDStoreDSResourceType");
194
	}
195
196
	private void updateObjStoreProfile(final String objId, final Document doc, final int size) throws ISRegistryException {
197
		doc.selectSingleNode("//COUNT_STORE").setText(Integer.toString(size));
198
		doc.selectSingleNode("//STORE_SIZE").setText(Integer.toString(size));
199
		registry.updateProfile(objId, doc.asXML(), "ObjectStoreDSResourceType");
200
	}
201
202 53502 michele.ar
	public boolean isAlwaysUpdate() {
203
		return alwaysUpdate;
204 53478 michele.ar
	}
205
206 53502 michele.ar
	public void setAlwaysUpdate(final boolean alwaysUpdate) {
207
		this.alwaysUpdate = alwaysUpdate;
208 53478 michele.ar
	}
209
210
	@Override
211 53502 michele.ar
	public ProgressProvider getProgressProvider() {
212
		return new ProgressProvider() {
213 53478 michele.ar
214 53502 michele.ar
			@Override
215
			public int getTotalValue() {
216
				return total;
217
			}
218 53486 michele.ar
219 53502 michele.ar
			@Override
220
			public int getCurrentValue() {
221
				return current;
222
			}
223
224
			@Override
225
			public boolean isInaccurate() {
226
				return false;
227
			}
228
		};
229 53486 michele.ar
	}
230
231 53478 michele.ar
}