Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.consistency;
2

    
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.Date;
6
import java.util.List;
7
import java.util.Objects;
8
import java.util.Set;
9
import java.util.stream.Collectors;
10

    
11
import javax.annotation.Resource;
12

    
13
import org.antlr.stringtemplate.StringTemplate;
14
import org.apache.commons.io.IOUtils;
15
import org.apache.commons.lang.math.NumberUtils;
16
import org.apache.commons.lang3.StringUtils;
17
import org.dom4j.Document;
18
import org.dom4j.DocumentException;
19
import org.dom4j.io.SAXReader;
20
import org.springframework.beans.factory.annotation.Autowired;
21

    
22
import com.googlecode.sarasvati.Arc;
23
import com.googlecode.sarasvati.NodeToken;
24

    
25
import eu.dnetlib.data.mdstore.MDStoreServiceException;
26
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
27
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
28
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
29
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
30
import eu.dnetlib.enabling.datasources.common.Api;
31
import eu.dnetlib.enabling.datasources.common.ApiParam;
32
import eu.dnetlib.enabling.datasources.common.DsmException;
33
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
34
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
35
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
36
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
37
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
38
import eu.dnetlib.miscutils.datetime.DateUtils;
39
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
40
import eu.dnetlib.msro.workflows.util.ProgressProvider;
41
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
42

    
43
public class FixRepoMdstoreSizesJobNode extends SimpleJobNode implements ProgressProvider {
44

    
45
	@Autowired
46
	private LocalOpenaireDatasourceManager dsManager;
47

    
48
	@Autowired
49
	private UniqueServiceLocator serviceLocator;
50

    
51
	@Resource(name = "mongodbMDStoreDao")
52
	private MDStoreDao mdstoreDao;
53

    
54
	@Autowired
55
	private ObjectStoreDao objectStoreDao;
56

    
57
	private final DateUtils dateUtils = new DateUtils();
58

    
59
	private int current = 0;
60
	private int total = 0;
61

    
62
	private ISRegistryService registry;
63
	private ISLookUpService lookup;
64

    
65
	public void init(final int total) {
66
		this.current = 0;
67
		this.total = total;
68
		this.lookup = serviceLocator.getService(ISLookUpService.class);
69
		this.registry = serviceLocator.getService(ISRegistryService.class);
70
	}
71

    
72
	@Override
73
	protected String execute(final NodeToken token) throws Exception {
74
		final Set<String> list = dsManager.listManagedDatasourceIds();
75

    
76
		init(list.size());
77

    
78
		for (final String dsId : list) {
79
			current++;
80
			try {
81
				for (final Api<ApiParam> api : dsManager.getApis(dsId)) {
82
					verifyApi(dsId, api);
83
				}
84
			} catch (final Throwable e) {
85
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR + "::" + dsId, e.getMessage());
86
			}
87
		}
88

    
89
		return Arc.DEFAULT_ARC;
90
	}
91

    
92
	private void verifyApi(final String dsId, final Api<ApiParam> api)
93
			throws DsmException, ISRegistryException, IOException, ISLookUpException, MDStoreServiceException, ObjectStoreServiceException {
94

    
95
		for (final Document doc : listCollectionMdStores(dsId, api.getId())) {
96
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
97
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
98
			final int size = mdstoreDao.getMDStore(mdId).getSize();
99
			if (size != actualSize) {
100
				updateMdStoreProfile(mdId, doc, size);
101
				dsManager.setLastCollectionInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
102
			}
103
		}
104

    
105
		for (final Document doc : listTransformationMdStores(dsId, api.getId())) {
106
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
107
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
108
			final int size = mdstoreDao.getMDStore(mdId).getSize();
109
			if (size != actualSize) {
110
				updateMdStoreProfile(mdId, doc, size);
111
				dsManager.setLastAggregationInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
112
			}
113
		}
114

    
115
		for (final Document doc : listDownloadObjectStores(dsId, api.getId())) {
116
			final String objId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
117
			final int actualSize = NumberUtils.toInt(doc.valueOf("//STORE_SIZE"), 0);
118
			final int size = objectStoreDao.getObjectStore(objId).getSize();
119
			if (size != actualSize) {
120
				updateObjStoreProfile(objId, doc, size);
121
				dsManager.setLastDownloadInfo(dsId, api.getId(), objId, size, calculateLastDate(doc));
122
			}
123
		}
124
	}
125

    
126
	private Date calculateLastDate(final Document doc) {
127
		final String dateS = doc.valueOf("//LAST_STORAGE_DATE");
128
		final Date date = StringUtils.isNoneBlank(dateS) ? dateUtils.parse(dateS) : new Date();
129
		return date;
130
	}
131

    
132
	private List<Document> listCollectionMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
133
		return executeXquery("listCollectionMdStores.xquery.st", dsId, apiId);
134
	}
135

    
136
	private List<Document> listTransformationMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
137
		return executeXquery("listTransformationMdStores.xquery.st", dsId, apiId);
138
	}
139

    
140
	private List<Document> listDownloadObjectStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
141
		return executeXquery("listDownloadObjectStores.xquery.st", dsId, apiId);
142
	}
143

    
144
	private List<Document> executeXquery(final String template, final String dsId, final String apiId) throws ISLookUpException, IOException {
145
		final StringTemplate st = new StringTemplate(IOUtils.toString(getClass().getResourceAsStream(template)));
146
		st.setAttribute("dsId", dsId);
147
		st.setAttribute("apiId", apiId);
148

    
149
		final SAXReader reader = new SAXReader();
150

    
151
		return lookup.quickSearchProfile(st.toString())
152
				.stream()
153
				.map(s -> {
154
					try {
155
						return reader.read(new StringReader(s));
156
					} catch (final DocumentException e) {
157
						return null;
158
					}
159
				})
160
				.filter(Objects::nonNull)
161
				.collect(Collectors.toList());
162
	}
163

    
164
	private void updateMdStoreProfile(final String mdId, final Document doc, final int size) throws ISRegistryException {
165
		doc.selectSingleNode("//NUMBER_OF_RECORDS").setText(Integer.toString(size));
166
		registry.updateProfile(mdId, doc.asXML(), "MDStoreDSResourceType");
167
	}
168

    
169
	private void updateObjStoreProfile(final String objId, final Document doc, final int size) throws ISRegistryException {
170
		doc.selectSingleNode("//COUNT_STORE").setText(Integer.toString(size));
171
		doc.selectSingleNode("//STORE_SIZE").setText(Integer.toString(size));
172
		registry.updateProfile(objId, doc.asXML(), "ObjectStoreDSResourceType");
173
	}
174

    
175
	@Override
176
	public int getTotalValue() {
177
		return total;
178
	}
179

    
180
	@Override
181
	public int getCurrentValue() {
182
		return current;
183
	}
184

    
185
	@Override
186
	public boolean isInaccurate() {
187
		return false;
188
	}
189

    
190
}
(1-1/2)