Project

General

Profile

1
package eu.dnetlib.openaire.common;
2

    
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Queue;
8
import java.util.concurrent.LinkedBlockingQueue;
9
import java.util.function.Function;
10
import java.util.stream.Collectors;
11

    
12
import com.google.common.collect.Lists;
13
import com.google.common.escape.Escaper;
14
import com.google.common.xml.XmlEscapers;
15
import eu.dnetlib.OpenaireExporterConfig;
16
import eu.dnetlib.enabling.datasources.common.DsmException;
17
import eu.dnetlib.enabling.datasources.common.DsmForbiddenException;
18
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
19
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
20
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
21
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
22
import eu.dnetlib.openaire.dsm.dao.utils.IndexDsInfo;
23
import eu.dnetlib.openaire.context.Context;
24
import eu.dnetlib.openaire.context.ContextMappingUtils;
25
import eu.dnetlib.openaire.dsm.domain.ApiDetails;
26
import eu.dnetlib.openaire.dsm.domain.DatasourceDetails;
27
import org.apache.commons.io.IOUtils;
28
import org.apache.commons.lang3.StringUtils;
29
import org.apache.commons.logging.Log;
30
import org.apache.commons.logging.LogFactory;
31
import org.apache.http.HttpStatus;
32
import org.springframework.beans.factory.annotation.Autowired;
33
import org.springframework.cache.annotation.CacheEvict;
34
import org.springframework.cache.annotation.Cacheable;
35
import org.springframework.core.io.ClassPathResource;
36
import org.springframework.stereotype.Component;
37

    
38
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.asRepositoryInterfce;
39
import static eu.dnetlib.openaire.dsm.dao.utils.DsmMappingUtils.asRepositoryProfile;
40
import static eu.dnetlib.openaire.common.Utils.escape;
41

    
42
/**
43
 * Created by claudio on 20/10/2016.
44
 */
45
@Component
46
public class ISClientImpl implements ISClient {
47

    
48
 	private static final Log log = LogFactory.getLog(ISClientImpl.class);
49

    
50
	@Autowired
51
	private OpenaireExporterConfig config;
52

    
53
	@Autowired
54
	private ISLookUpService isLookUpService;
55

    
56
	@Autowired
57
	private ISRegistryService isRegistryService;
58

    
59
	@Autowired
60
	private OperationManager operationManager;
61

    
62
	@Override
63
	@Cacheable("indexdsinfo-cache")
64
	public IndexDsInfo calculateCurrentIndexDsInfo() throws DsmException {
65
		log.warn("calculateCurrentIndexDsInfo(): not using cache");
66
		final String[] arr;
67
		try {
68
			arr = _isLookUp(_getQuery(config.getFindIndexDsInfo())).split("@@@");
69
			return new IndexDsInfo(
70
				_isLookUp(_getQuery(config.getFindSolrIndexUrl())),
71
				arr[0].trim(), arr[1].trim(), arr[2].trim());
72
		} catch (IOException | ISLookUpException e) {
73
			throw new DsmException("unable fetch index DS information from IS");
74
		}
75
	}
76

    
77
	@Override
78
	@Cacheable("objectstoreid-cache")
79
	public String getObjectStoreId(final String dsId, final Queue<Throwable> errors) throws IOException {
80
		log.warn(String.format("getObjectStoreId(%s): not using cache", dsId));
81
		final String xqueryTemplate = _getQuery(config.getFindObjectStore());
82
		try {
83
			return _isLookUp(String.format(xqueryTemplate, dsId));
84
		} catch (ISLookUpException e) {
85
			errors.add(new DsmException("unable to find objectStore for datasource " + dsId));
86
			return "";
87
		}
88
	}
89

    
90
	@Override
91
	@Cacheable("context-cache")
92
	public Map<String, Context> getFunderContextMap() throws IOException {
93
		return _processContext(_getQuery(config.getFindFunderContexts()));
94
	}
95

    
96
	@Override
97
	@Cacheable("context-cache")
98
	public Map<String, Context> getCommunityContextMap() throws IOException {
99
		return _processContext(_getQuery(config.getFindCommunityContexts()));
100
	}
101

    
102
	@Override
103
	@Cacheable("context-cache")
104
	public Map<String, Context> getContextMap() throws IOException {
105
		return _processContext(_getQuery(config.getFindContextProfiles()));
106
	}
107

    
108
	@Override
109
	@CacheEvict(value = "context-cache", allEntries = true)
110
	public void updateContextParam(final String id, final String name, final String value) {
111
		final Queue<Throwable> errors = Lists.newLinkedList();
112
		_quickSeachProfile(getXQuery(id, name, value), errors);
113

    
114
		if (!errors.isEmpty()) {
115
			errors.forEach(e -> log.error(e));
116
		}
117
	}
118

    
119
	@Override
120
	@CacheEvict(value = "context-cache", allEntries = true)
121
	public void updateContextAttribute(final String id, final String name, final String value) {
122
		final Queue<Throwable> errors = Lists.newLinkedList();
123
		final Escaper esc = XmlEscapers.xmlAttributeEscaper();
124
		_quickSeachProfile(String.format(
125
				"update value collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
126
						"/RESOURCE_PROFILE/BODY/CONFIGURATION/context[./@id = '%s']/@%s with '%s'", id, name, escape(esc, value)),
127
				errors);
128

    
129
		if (!errors.isEmpty()) {
130
			errors.forEach(e -> log.error(e));
131
		}
132
	}
133

    
134
	@Override
135
	@CacheEvict(value = "context-cache", allEntries = true)
136
	public void addConcept(final String id, final String categoryId, final String data) {
137
		final Queue<Throwable> errors = Lists.newLinkedList();
138
		_quickSeachProfile(String.format(
139
				"update insert %s into collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
140
				"/RESOURCE_PROFILE/BODY/CONFIGURATION/context[./@id = '%s']/category[./@id = '%s']", data, id, categoryId), errors);
141

    
142
		if (!errors.isEmpty()) {
143
			errors.forEach(e -> log.error(e));
144
		}
145
	}
146

    
147
	@Override
148
	@CacheEvict(value = "context-cache", allEntries = true)
149
	public void removeConcept(final String id, final String categoryId, final String conceptId) {
150
		final Queue<Throwable> errors = Lists.newLinkedList();
151
		_quickSeachProfile(String.format(
152
				"for $concept in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
153
						"/RESOURCE_PROFILE/BODY/CONFIGURATION/context[./@id = '%s']" +
154
						"/category[./@id = '%s']/concept[./@id = '%s']" +
155
						"return update delete $concept", id, categoryId, conceptId),
156
				errors);
157
		if (!errors.isEmpty()) {
158
			errors.forEach(e -> log.error(e.getMessage()));
159
		}
160
	}
161

    
162
	@Override
163
	public void updateDatasourceFields(final String dsId, final Map<String, String> changes) {
164
		final Queue<Throwable> errors = Lists.newLinkedList();
165
		operationManager.addOperation(() -> {
166
			Thread.currentThread().setName("update-ds:" + dsId);
167
			changes.forEach((xpath, value) -> {
168
				try {
169
					_isLookUp(String.format(
170
							"for $x in collection('/db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType')\n" +
171
							"where $x/RESOURCE_PROFILE/BODY/CONFIGURATION/DATASOURCE_ORIGINAL_ID[@provenance='OPENAIRE']/text() = '%s'\n" +
172
							"return update value $x%s with '%s'", dsId, xpath, value));
173
				} catch (ISLookUpException e) {
174
					errors.add(e);
175
				}
176
			});
177
		});
178

    
179
		if (!errors.isEmpty()) {
180
			errors.forEach(e -> log.error(e.getMessage()));
181
		}
182
	}
183

    
184
	@Override
185
	public void addAPIAttribute(final String dsId, final String apiId, final Map<String, String> changes) {
186
		final Queue<Throwable> errors = Lists.newLinkedList();
187
		operationManager.addOperation(() -> {
188
			Thread.currentThread().setName("update-api:" + dsId);
189
			changes.forEach((xpath, value) -> {
190
				try {
191
					final String attribute = StringUtils.substringAfter(xpath, "@");
192
					final String parentElement = StringUtils.substringBeforeLast(xpath, "/");
193
					_isLookUp(String.format(
194
							"let $x:=/RESOURCE_PROFILE/BODY/CONFIGURATION/DATASOURCE_ORIGINAL_ID[@provenance='OPENAIRE' and ./text() = '%s']\n" +
195
									"return update insert attribute %s {'%s'} into $x/..//INTERFACE[./@id = '%s']%s",
196
							dsId, attribute, value, apiId, parentElement));
197
				} catch (ISLookUpException e) {
198
					errors.add(e);
199
				}
200
			});
201
		});
202

    
203
		if (!errors.isEmpty()) {
204
			errors.forEach(e -> log.error(e.getMessage()));
205
		}
206
	}
207

    
208
	@Override
209
	public void updateAPIField(final String dsId, final String apiId, final Map<String, String> changes) {
210
		final Queue<Throwable> errors = Lists.newLinkedList();
211
		operationManager.addOperation(() -> {
212
			Thread.currentThread().setName("update-api:" + dsId);
213
			changes.forEach((xpath, value) -> {
214
				try {
215
					_isLookUp(String.format(
216
							"let $x:=/RESOURCE_PROFILE/BODY/CONFIGURATION/DATASOURCE_ORIGINAL_ID[@provenance='OPENAIRE' and ./text() = '%s']\n" +
217
							"return update value $x/..//INTERFACE[./@id = '%s']%s with '%s'",
218
							dsId, apiId, xpath, value));
219
				} catch (ISLookUpException e) {
220
					errors.add(e);
221
				}
222
			});
223
		});
224

    
225
		if (!errors.isEmpty()) {
226
			errors.forEach(e -> log.error(e.getMessage()));
227
		}
228
	}
229

    
230
	@Override
231
	public void registerDS(final DatasourceDetails d) {
232
		final Queue<Throwable> errors = Lists.newLinkedList();
233
		operationManager.addOperation(() -> {
234
			Thread.currentThread().setName("save-ds:" + d.getId());
235
			try {
236
				final String id = isRegistryService.registerProfile(asRepositoryProfile(d));
237
				log.debug(String.format("registered DS profile %s", id));
238
			} catch (ISRegistryException e) {
239
				errors.add(e);
240
			}
241
		});
242
		if (!errors.isEmpty()) {
243
			errors.forEach(e -> log.error(e.getMessage()));
244
		}
245
	}
246

    
247
	@Override
248
	public void registerAPI(final ApiDetails api) {
249
		final Queue<Throwable> errors = Lists.newLinkedList();
250
		operationManager.addOperation(() -> {
251
			Thread.currentThread().setName("save-api:" + api.getId());
252
			try {
253
				final String dsId = api.getDatasource();
254
				final String iface = asRepositoryInterfce(api);
255
				_isLookUp(String.format(
256
						"let $x:=/RESOURCE_PROFILE/BODY/CONFIGURATION/DATASOURCE_ORIGINAL_ID[@provenance='OPENAIRE' and ./text() = '%s']\n" +
257
						"return update insert %s into $x/../INTERFACES", dsId, iface));
258

    
259
				log.debug(String.format("registered API %s", api.getId()));
260
			} catch (ISLookUpException e) {
261
				errors.add(e);
262
			}
263
		});
264
		if (!errors.isEmpty()) {
265
			errors.forEach(e -> log.error(e.getMessage()));
266
		}
267
	}
268

    
269
	@Override
270
	public void removeAPI(final String apiId) throws DsmForbiddenException {
271
		final Queue<Throwable> errors = Lists.newLinkedList();
272

    
273
		try {
274
			final List<String> metaWorkflows = _quickSeachProfile(String.format(
275
					"distinct-values(for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')\n" +
276
					"where $x/RESOURCE_PROFILE/BODY/DATAPROVIDER[./@interface = '%s']\n" +
277
					"return $x/RESOURCE_PROFILE/BODY/DATAPROVIDER/@id/string())", apiId),
278
					errors);
279
			if (!metaWorkflows.isEmpty()) {
280
				throw new DsmForbiddenException(
281
						HttpStatus.SC_FORBIDDEN,
282
						String.format("cannot remove api '%s', it has workflows associated", apiId));
283
			}
284
			_isLookUp(String.format(
285
					"update delete /RESOURCE_PROFILE/BODY/CONFIGURATION/INTERFACES/INTERFACE[./@id = '%s']", apiId));
286

    
287
			log.debug(String.format("deleted API %s", apiId));
288
		} catch (ISLookUpException e) {
289
			errors.add(e);
290
		}
291

    
292
		if (!errors.isEmpty()) {
293
			errors.forEach(e -> log.error(e.getMessage()));
294
		}
295
	}
296

    
297
	/// HELPERS
298

    
299
	private String getXQuery(final String id, final String name, final String value) {
300
		final Escaper esc = XmlEscapers.xmlContentEscaper();
301
		if (StringUtils.isNotBlank(value)) {
302
			return String.format(
303
					"update replace collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
304
							"/RESOURCE_PROFILE/BODY/CONFIGURATION/context[./@id = '%s']/param[./@name = '%s'] with <param name='%s'>%s</param>", id, name, name,
305
					escape(esc, value));
306
		} else {
307
			return String.format(
308
					"update replace collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
309
							"/RESOURCE_PROFILE/BODY/CONFIGURATION/context[./@id = '%s']/param[./@name = '%s'] with <param name='%s'/>", id, name, name);
310
		}
311
	}
312

    
313
	private Map<String, Context> _processContext(final String xquery) throws IOException {
314
		return _processContext(new LinkedBlockingQueue<>(), xquery);
315
	}
316

    
317
	private Map<String, Context> _processContext(final Queue<Throwable> errors, final String xquery) throws IOException {
318
		try {
319
			return getContextProfiles(errors, xquery).stream()
320
					.filter(StringUtils::isNotBlank)
321
					.map(s -> ContextMappingUtils.parseContext(s, errors))
322
					.collect(Collectors.toMap(
323
							Context::getId,
324
							Function.identity(),
325
							(c1, c2) -> {
326
								log.warn(String.format("found duplicate context profile '%s'", c1.getId()));
327
								return c1;
328
							}));
329
		} finally {
330
			if (!errors.isEmpty()) {
331
				log.error(errors);
332
				errors.forEach(e -> e.printStackTrace());
333
			}
334
		}
335
	}
336

    
337
	private List<String> getContextProfiles(final Queue<Throwable> errors, final String xquery) throws IOException {
338
		log.warn("getContextProfiles(): not using cache");
339
		return _quickSeachProfile(xquery, errors);
340
	}
341

    
342
	private String _getQuery(final ClassPathResource resource) throws IOException {
343
		return IOUtils.toString(resource.getInputStream(), Charset.defaultCharset());
344
	}
345

    
346
	private String _isLookUp(final String xquery) throws ISLookUpException {
347
		log.debug(String.format("running xquery:\n%s", xquery));
348
		final String res = isLookUpService.getResourceProfileByQuery(xquery);
349
		//log.debug(String.format("query result: %s", res));
350
		return res;
351
	}
352

    
353
	private List<String> _quickSeachProfile(final String xquery, final Queue<Throwable> errors) {
354
		final List<String> res = Lists.newArrayList();
355
		try {
356
			log.debug(String.format("running xquery:\n%s", xquery));
357
			final List<String> list = isLookUpService.quickSearchProfile(xquery);
358
			if (list != null) {
359
				res.addAll(list);
360
			}
361
			log.debug(String.format("query result size: %s", res.size()));
362
		} catch (Throwable e) {
363
			errors.add(e);
364
			return Lists.newArrayList();
365
		} finally {
366
			return res;
367
		}
368
	}
369

    
370
	@CacheEvict(cacheNames = { "context-cache", "indexdsinfo-cache", "objectstoreid-cache" }, allEntries = true)
371
	public void dropCache() {
372
		log.info("dropped dsManager IS cache");
373
	}
374

    
375
}
(6-6/9)