Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.consistency;
2

    
3
import java.io.IOException;
4
import java.io.StringReader;
5
import java.util.Date;
6
import java.util.HashMap;
7
import java.util.List;
8
import java.util.Map;
9
import java.util.Objects;
10
import java.util.Set;
11
import java.util.stream.Collectors;
12

    
13
import javax.annotation.Resource;
14

    
15
import org.antlr.stringtemplate.StringTemplate;
16
import org.apache.commons.io.IOUtils;
17
import org.apache.commons.lang.math.NumberUtils;
18
import org.apache.commons.lang3.StringUtils;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.dom4j.Document;
22
import org.dom4j.DocumentException;
23
import org.dom4j.io.SAXReader;
24
import org.springframework.beans.factory.annotation.Autowired;
25

    
26
import com.googlecode.sarasvati.Arc;
27
import com.googlecode.sarasvati.NodeToken;
28

    
29
import eu.dnetlib.data.mdstore.MDStoreServiceException;
30
import eu.dnetlib.data.mdstore.modular.connector.MDStoreDao;
31
import eu.dnetlib.data.objectstore.modular.connector.ObjectStoreDao;
32
import eu.dnetlib.data.objectstore.rmi.ObjectStoreServiceException;
33
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
34
import eu.dnetlib.enabling.datasources.common.Api;
35
import eu.dnetlib.enabling.datasources.common.ApiParam;
36
import eu.dnetlib.enabling.datasources.common.DsmException;
37
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
38
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
39
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
40
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
41
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
42
import eu.dnetlib.miscutils.datetime.DateUtils;
43
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
44
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
45
import eu.dnetlib.msro.workflows.util.ProgressProvider;
46
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
47

    
48
public class FixRepoMdstoreSizesJobNode extends SimpleJobNode implements ProgressJobNode {
49

    
50
	@Autowired
51
	private LocalOpenaireDatasourceManager dsManager;
52

    
53
	@Autowired
54
	private UniqueServiceLocator serviceLocator;
55

    
56
	@Resource(name = "mongodbMDStoreDao")
57
	private MDStoreDao mdstoreDao;
58

    
59
	@Autowired
60
	private ObjectStoreDao objectStoreDao;
61

    
62
	private final DateUtils dateUtils = new DateUtils();
63

    
64
	private int current = 0;
65
	private int total = 0;
66

    
67
	private ISRegistryService registry;
68
	private ISLookUpService lookup;
69

    
70
	private final Map<String, String> openaireIds = new HashMap<>();
71
	private boolean alwaysUpdate = false;
72

    
73
	private static final Log log = LogFactory.getLog(FixRepoMdstoreSizesJobNode.class);
74

    
75
	public void init(final int total) {
76
		this.current = 0;
77
		this.total = total;
78
		this.lookup = serviceLocator.getService(ISLookUpService.class);
79
		this.registry = serviceLocator.getService(ISRegistryService.class);
80
		try {
81
			openaireIds.putAll(lookup.quickSearchProfile(
82
					"for $x in collection('/db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType') return concat($x//DATASOURCE_ORIGINAL_ID, ' @@@ ', $x//RESOURCE_IDENTIFIER/@value)")
83
					.stream()
84
					.collect(Collectors.toMap(
85
							s -> StringUtils.substringBefore(s, "@@@").trim(),
86
							s -> StringUtils.substringAfter(s, "@@@").trim())));
87
		} catch (final ISLookUpException e) {
88
			// TODO Auto-generated catch block
89
			e.printStackTrace();
90
		}
91

    
92
	}
93

    
94
	@Override
95
	protected String execute(final NodeToken token) throws Exception {
96
		final Set<String> list = dsManager.listManagedDatasourceIds();
97

    
98
		init(list.size());
99

    
100
		for (final String dsId : list) {
101
			log.info("Processing ds: " + dsId);
102

    
103
			current++;
104

    
105
			try {
106
				for (final Api<ApiParam> api : dsManager.getApis(dsId)) {
107
					verifyApi(dsId, api);
108
				}
109
			} catch (final Throwable e) {
110
				log.error("Error processing ds: " + dsId, e);
111
				token.getEnv().setAttribute(WorkflowsConstants.SYSTEM_ERROR + "::" + dsId, e.getMessage());
112
			}
113
		}
114

    
115
		return Arc.DEFAULT_ARC;
116
	}
117

    
118
	private void verifyApi(final String dsId, final Api<ApiParam> api)
119
			throws DsmException, ISRegistryException, IOException, ISLookUpException, MDStoreServiceException, ObjectStoreServiceException {
120

    
121
		for (final Document doc : listCollectionMdStores(dsId, api.getId())) {
122
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
123
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
124
			final int size = mdstoreDao.getMDStore(mdId).getSize();
125
			if (alwaysUpdate || size != actualSize) {
126
				log.info("  -- Updating size of api " + api.getId() + ", new value = " + size);
127
				updateMdStoreProfile(mdId, doc, size);
128
				dsManager.setLastCollectionInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
129
			}
130
		}
131

    
132
		for (final Document doc : listTransformationMdStores(dsId, api.getId())) {
133
			final String mdId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
134
			final int actualSize = NumberUtils.toInt(doc.valueOf("//NUMBER_OF_RECORDS"), 0);
135
			final int size = mdstoreDao.getMDStore(mdId).getSize();
136
			if (alwaysUpdate || size != actualSize) {
137
				updateMdStoreProfile(mdId, doc, size);
138
				dsManager.setLastAggregationInfo(dsId, api.getId(), mdId, size, calculateLastDate(doc));
139
			}
140
		}
141

    
142
		for (final Document doc : listDownloadObjectStores(dsId, api.getId())) {
143
			final String objId = doc.valueOf("//RESOURCE_IDENTIFIER/@value");
144
			final int actualSize = NumberUtils.toInt(doc.valueOf("//STORE_SIZE"), 0);
145
			final int size = objectStoreDao.getObjectStore(objId).getSize();
146
			if (alwaysUpdate || size != actualSize) {
147
				updateObjStoreProfile(objId, doc, size);
148
				dsManager.setLastDownloadInfo(dsId, api.getId(), objId, size, calculateLastDate(doc));
149
			}
150
		}
151
	}
152

    
153
	private Date calculateLastDate(final Document doc) {
154
		final String dateS = doc.valueOf("//LAST_STORAGE_DATE");
155
		final Date date = StringUtils.isNoneBlank(dateS) ? dateUtils.parse(dateS) : new Date();
156
		return date;
157
	}
158

    
159
	private List<Document> listCollectionMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
160
		return executeXquery("listCollectionMdStores.xquery.st", dsId, apiId);
161
	}
162

    
163
	private List<Document> listTransformationMdStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
164
		return executeXquery("listTransformationMdStores.xquery.st", dsId, apiId);
165
	}
166

    
167
	private List<Document> listDownloadObjectStores(final String dsId, final String apiId) throws ISLookUpException, IOException {
168
		return executeXquery("listDownloadObjectStores.xquery.st", dsId, apiId);
169
	}
170

    
171
	private List<Document> executeXquery(final String template, final String dsId, final String apiId) throws ISLookUpException, IOException {
172
		final StringTemplate st = new StringTemplate(IOUtils.toString(getClass().getResourceAsStream(template)));
173
		st.setAttribute("dsId", openaireIds.get(dsId));
174
		st.setAttribute("apiId", apiId);
175

    
176
		final SAXReader reader = new SAXReader();
177

    
178
		return lookup.quickSearchProfile(st.toString())
179
				.stream()
180
				.map(s -> {
181
					try {
182
						return reader.read(new StringReader(s));
183
					} catch (final DocumentException e) {
184
						return null;
185
					}
186
				})
187
				.filter(Objects::nonNull)
188
				.collect(Collectors.toList());
189
	}
190

    
191
	private void updateMdStoreProfile(final String mdId, final Document doc, final int size) throws ISRegistryException {
192
		doc.selectSingleNode("//NUMBER_OF_RECORDS").setText(Integer.toString(size));
193
		registry.updateProfile(mdId, doc.asXML(), "MDStoreDSResourceType");
194
	}
195

    
196
	private void updateObjStoreProfile(final String objId, final Document doc, final int size) throws ISRegistryException {
197
		doc.selectSingleNode("//COUNT_STORE").setText(Integer.toString(size));
198
		doc.selectSingleNode("//STORE_SIZE").setText(Integer.toString(size));
199
		registry.updateProfile(objId, doc.asXML(), "ObjectStoreDSResourceType");
200
	}
201

    
202
	public boolean isAlwaysUpdate() {
203
		return alwaysUpdate;
204
	}
205

    
206
	public void setAlwaysUpdate(final boolean alwaysUpdate) {
207
		this.alwaysUpdate = alwaysUpdate;
208
	}
209

    
210
	@Override
211
	public ProgressProvider getProgressProvider() {
212
		return new ProgressProvider() {
213

    
214
			@Override
215
			public int getTotalValue() {
216
				return total;
217
			}
218

    
219
			@Override
220
			public int getCurrentValue() {
221
				return current;
222
			}
223

    
224
			@Override
225
			public boolean isInaccurate() {
226
				return false;
227
			}
228
		};
229
	}
230

    
231
}
(1-1/2)