Project

General

Profile

1
package eu.dnetlib.openaire.exporter.datasource.clients;
2

    
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.List;
6
import java.util.Map;
7
import java.util.Queue;
8
import java.util.concurrent.LinkedBlockingQueue;
9
import java.util.function.Function;
10
import java.util.stream.Collectors;
11

    
12
import com.google.common.collect.Lists;
13
import eu.dnetlib.OpenaireExporterConfig;
14
import eu.dnetlib.enabling.datasources.common.DsmException;
15
import eu.dnetlib.enabling.datasources.common.DsmForbiddenException;
16
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
17
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
18
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryException;
19
import eu.dnetlib.enabling.is.registry.rmi.ISRegistryService;
20
import eu.dnetlib.openaire.exporter.OperationManager;
21
import eu.dnetlib.openaire.exporter.datasource.clients.utils.IndexDsInfo;
22
import eu.dnetlib.openaire.exporter.funders.context.Context;
23
import eu.dnetlib.openaire.exporter.funders.context.MappingUtils;
24
import eu.dnetlib.openaire.exporter.model.datasource.ApiDetails;
25
import eu.dnetlib.openaire.exporter.model.datasource.DatasourceDetails;
26
import org.apache.commons.io.IOUtils;
27
import org.apache.commons.lang3.StringUtils;
28
import org.apache.commons.logging.Log;
29
import org.apache.commons.logging.LogFactory;
30
import org.apache.http.HttpStatus;
31
import org.springframework.beans.factory.annotation.Autowired;
32
import org.springframework.cache.annotation.CacheEvict;
33
import org.springframework.cache.annotation.Cacheable;
34
import org.springframework.core.io.ClassPathResource;
35
import org.springframework.stereotype.Component;
36

    
37
import static eu.dnetlib.openaire.exporter.model.ConversionUtils.asRepositoryInterfce;
38
import static eu.dnetlib.openaire.exporter.model.ConversionUtils.asRepositoryProfile;
39

    
40
/**
41
 * Created by claudio on 20/10/2016.
42
 */
43
@Component
44
public class ISClientImpl implements ISClient {
45

    
46
 	private static final Log log = LogFactory.getLog(ISClientImpl.class);
47

    
48
	@Autowired
49
	private OpenaireExporterConfig config;
50

    
51
	@Autowired
52
	private ISLookUpService isLookUpService;
53

    
54
	@Autowired
55
	private ISRegistryService isRegistryService;
56

    
57
	@Autowired
58
	private OperationManager operationManager;
59

    
60
	@Override
61
	@Cacheable("indexdsinfo-cache")
62
	public IndexDsInfo calculateCurrentIndexDsInfo() throws DsmException {
63
		log.warn("calculateCurrentIndexDsInfo(): not using cache");
64
		final String[] arr;
65
		try {
66
			arr = _isLookUp(_getQuery(config.getFindIndexDsInfo())).split("@@@");
67
			return new IndexDsInfo(
68
				_isLookUp(_getQuery(config.getFindSolrIndexUrl())),
69
				arr[0].trim(), arr[1].trim(), arr[2].trim());
70
		} catch (IOException | ISLookUpException e) {
71
			throw new DsmException("unable fetch index DS information from IS");
72
		}
73
	}
74

    
75
	@Override
76
	@Cacheable("objectstoreid-cache")
77
	public String getObjectStoreId(final String dsId, final Queue<Throwable> errors) throws IOException {
78
		log.warn(String.format("getObjectStoreId(%s): not using cache", dsId));
79
		final String xqueryTemplate = _getQuery(config.getFindObjectStore());
80
		try {
81
			return _isLookUp(String.format(xqueryTemplate, dsId));
82
		} catch (ISLookUpException e) {
83
			errors.add(new DsmException("unable to find objectStore for datasource " + dsId));
84
			return "";
85
		}
86
	}
87

    
88
	@Override
89
	@Cacheable("context-cache")
90
	public Map<String, Context> getFunderContextMap() throws IOException {
91
		return _processContext(
92
				new LinkedBlockingQueue<>(),
93
				_getQuery(config.getFindFunderContexts()));
94
	}
95

    
96
	@Override
97
	@Cacheable("context-cache")
98
	public Map<String, Context> getCommunityContextMap() throws IOException {
99
		return _processContext(
100
				new LinkedBlockingQueue<>(),
101
				_getQuery(config.getFindCommunityContexts()));
102
	}
103

    
104
	@Override
105
	public void addConcept(final String id, final String categoryId, final String data) {
106
		final Queue<Throwable> errors = Lists.newLinkedList();
107
		final String xquery = String.format(
108
				"update insert %s into collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
109
						"/RESOURCE_PROFILE/BODY/CONFIGURATION/context[./@id = '%s']/category[./@id = '%s']", data, id, categoryId);
110
		_quickSeachProfile(xquery, errors);
111

    
112
		if (!errors.isEmpty()) {
113
			errors.forEach(e -> log.error(e));
114
		}
115
	}
116

    
117
	@Override
118
	public void removeConcept(final String id, final String categoryId, final String conceptId) {
119
		final Queue<Throwable> errors = Lists.newLinkedList();
120
		_quickSeachProfile(String.format(
121
				"for $concept in collection('/db/DRIVER/ContextDSResources/ContextDSResourceType')" +
122
						"/RESOURCE_PROFILE/BODY/CONFIGURATION/context[./@id = '%s']" +
123
						"/category[./@id = '%s']/concept[./@id = '%s']" +
124
						"return update delete $concept", id, categoryId, conceptId),
125
				errors);
126
		if (!errors.isEmpty()) {
127
			errors.forEach(e -> log.error(e.getMessage()));
128
		}
129
	}
130

    
131
	@Override
132
	public void updateDatasourceFields(final String dsId, final Map<String, String> changes) {
133
		final Queue<Throwable> errors = Lists.newLinkedList();
134
		operationManager.addOperation(() -> {
135
			Thread.currentThread().setName("update-ds:" + dsId);
136
			changes.forEach((xpath, value) -> {
137
				try {
138
					_isLookUp(String.format(
139
							"for $x in collection('/db/DRIVER/RepositoryServiceResources/RepositoryServiceResourceType')\n" +
140
							"where $x/RESOURCE_PROFILE/BODY/CONFIGURATION/DATASOURCE_ORIGINAL_ID[@provenance='OPENAIRE']/text() = '%s'\n" +
141
							"return update value $x%s with '%s'", dsId, xpath, value));
142
				} catch (ISLookUpException e) {
143
					errors.add(e);
144
				}
145
			});
146
		});
147

    
148
		if (!errors.isEmpty()) {
149
			errors.forEach(e -> log.error(e.getMessage()));
150
		}
151
	}
152

    
153
	@Override
154
	public void updateAPIField(final String dsId, final String apiId, final Map<String, String> changes) {
155
		final Queue<Throwable> errors = Lists.newLinkedList();
156
		operationManager.addOperation(() -> {
157
			Thread.currentThread().setName("update-api:" + dsId);
158
			changes.forEach((xpath, value) -> {
159
				try {
160
					_isLookUp(String.format(
161
							"let $x:=/RESOURCE_PROFILE/BODY/CONFIGURATION/DATASOURCE_ORIGINAL_ID[@provenance='OPENAIRE' and ./text() = '%s']\n" +
162
							"return update value $x/..//INTERFACE[./@id = '%s']%s with '%s'",
163
							dsId, apiId, xpath, value));
164
				} catch (ISLookUpException e) {
165
					errors.add(e);
166
				}
167
			});
168
		});
169

    
170
		if (!errors.isEmpty()) {
171
			errors.forEach(e -> log.error(e.getMessage()));
172
		}
173
	}
174

    
175
	@Override
176
	public void registerDS(final DatasourceDetails d) {
177
		final Queue<Throwable> errors = Lists.newLinkedList();
178
		operationManager.addOperation(() -> {
179
			Thread.currentThread().setName("save-ds:" + d.getId());
180
			try {
181
				final String id = isRegistryService.registerProfile(asRepositoryProfile(d));
182
				log.debug(String.format("registered DS profile %s", id));
183
			} catch (ISRegistryException e) {
184
				errors.add(e);
185
			}
186
		});
187
		if (!errors.isEmpty()) {
188
			errors.forEach(e -> log.error(e.getMessage()));
189
		}
190
	}
191

    
192
	@Override
193
	public void registerAPI(final ApiDetails api) {
194
		final Queue<Throwable> errors = Lists.newLinkedList();
195
		operationManager.addOperation(() -> {
196
			Thread.currentThread().setName("save-api:" + api.getId());
197
			try {
198
				final String dsId = api.getDatasource();
199
				final String iface = asRepositoryInterfce(api);
200
				_isLookUp(String.format(
201
						"let $x:=/RESOURCE_PROFILE/BODY/CONFIGURATION/DATASOURCE_ORIGINAL_ID[@provenance='OPENAIRE' and ./text() = '%s']\n" +
202
						"return update insert %s into $x/../INTERFACES", dsId, iface));
203

    
204
				log.debug(String.format("registered API %s", api.getId()));
205
			} catch (ISLookUpException e) {
206
				errors.add(e);
207
			}
208
		});
209
		if (!errors.isEmpty()) {
210
			errors.forEach(e -> log.error(e.getMessage()));
211
		}
212
	}
213

    
214
	@Override
215
	public void removeAPI(final String apiId) throws DsmForbiddenException {
216
		final Queue<Throwable> errors = Lists.newLinkedList();
217

    
218
		try {
219
			final List<String> metaWorkflows = _quickSeachProfile(String.format(
220
					"distinct-values(for $x in collection('/db/DRIVER/MetaWorkflowDSResources/MetaWorkflowDSResourceType')\n" +
221
					"where $x/RESOURCE_PROFILE/BODY/DATAPROVIDER[./@interface = '%s']\n" +
222
					"return $x/RESOURCE_PROFILE/BODY/DATAPROVIDER/@id/string())", apiId),
223
					errors);
224
			if (!metaWorkflows.isEmpty()) {
225
				throw new DsmForbiddenException(
226
						HttpStatus.SC_FORBIDDEN,
227
						String.format("cannot remove api '%s', it has workflows associated", apiId));
228
			}
229
			_isLookUp(String.format(
230
					"update delete /RESOURCE_PROFILE/BODY/CONFIGURATION/INTERFACES/INTERFACE[./@id = '%s']", apiId));
231

    
232
			log.debug(String.format("deleted API %s", apiId));
233
		} catch (ISLookUpException e) {
234
			errors.add(e);
235
		}
236

    
237
		if (!errors.isEmpty()) {
238
			errors.forEach(e -> log.error(e.getMessage()));
239
		}
240
	}
241

    
242
	/// HELPERS
243

    
244
	private Map<String, Context> _processContext(final Queue<Throwable> errors, final String xquery) throws IOException {
245
		try {
246
			return getContextProfiles(errors, xquery).stream()
247
					.filter(StringUtils::isNotBlank)
248
					.map(s -> MappingUtils.parseContext(s, errors))
249
					.collect(Collectors.toMap(
250
							Context::getId,
251
							Function.identity()));
252
		} finally {
253
			if (!errors.isEmpty()) {
254
				log.error(errors);
255
				errors.forEach(e -> e.printStackTrace());
256
			}
257
		}
258
	}
259

    
260
	private List<String> getContextProfiles(final Queue<Throwable> errors, final String xquery) throws IOException {
261
		log.warn("getContextProfiles(): not using cache");
262
		return _quickSeachProfile(xquery, errors);
263
	}
264

    
265
	private String _getQuery(final ClassPathResource resource) throws IOException {
266
		return IOUtils.toString(resource.getInputStream(), Charset.defaultCharset());
267
	}
268

    
269
	private String _isLookUp(final String xquery) throws ISLookUpException {
270
		log.debug(String.format("running xquery:\n%s", xquery));
271
		final String res = isLookUpService.getResourceProfileByQuery(xquery);
272
		//log.debug(String.format("query result: %s", res));
273
		return res;
274
	}
275

    
276
	private List<String> _quickSeachProfile(final String xquery, final Queue<Throwable> errors) {
277
		final List<String> res = Lists.newArrayList();
278
		try {
279
			log.debug(String.format("running xquery:\n%s", xquery));
280
			final List<String> list = isLookUpService.quickSearchProfile(xquery);
281
			if (list != null) {
282
				res.addAll(list);
283
			}
284
			log.debug(String.format("query result size: %s", res.size()));
285
		} catch (Throwable e) {
286
			errors.add(e);
287
			return Lists.newArrayList();
288
		} finally {
289
			return res;
290
		}
291
	}
292

    
293
	@CacheEvict(cacheNames = "datasources-is-cache", allEntries = true)
294
	public void dropCache() {
295
		log.info("dropped dsManager IS cache");
296
	}
297

    
298
}
(6-6/13)