Project

General

Profile

1
package eu.dnetlib.openaire.directindex.api;
2

    
3
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
5
import com.google.gson.Gson;
6
import eu.dnetlib.common.rmi.DNetRestDocumentation;
7
import eu.dnetlib.data.index.CloudIndexClient;
8
import eu.dnetlib.data.index.CloudIndexClientException;
9
import eu.dnetlib.data.index.CloudIndexClientFactory;
10
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
11
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.openaire.directindex.objects.ResultEntry;
14
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;
15
import org.apache.commons.io.IOUtils;
16
import org.apache.commons.lang.exception.ExceptionUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.velocity.app.VelocityEngine;
20
import org.springframework.beans.factory.annotation.Value;
21
import org.springframework.core.io.ClassPathResource;
22
import org.springframework.http.HttpStatus;
23
import org.springframework.stereotype.Controller;
24
import org.springframework.web.bind.annotation.*;
25

    
26
import javax.annotation.Resource;
27
import java.io.IOException;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.concurrent.*;
31

    
32
/**
33
 * Created by michele on 11/11/15.
34
 */
35
@Controller
36
@DNetRestDocumentation
37
public class OpenaireResultSubmitter {
38

    
39
	private static final Log log = LogFactory.getLog(OpenaireResultSubmitter.class);
40

    
41
	public class IndexDsInfo {
42

    
43
		private final String indexBaseUrl;
44
		private final String indexDsId;
45
		private final String format;
46
		private final String coll;
47

    
48
		public IndexDsInfo(final String indexBaseUrl, final String indexDsId, final String format, final String coll) {
49
			this.indexBaseUrl = indexBaseUrl;
50
			this.indexDsId = indexDsId;
51
			this.format = format;
52
			this.coll = coll;
53
		}
54

    
55
		public String getIndexBaseUrl() {
56
			return indexBaseUrl;
57
		}
58

    
59
		public String getIndexDsId() {
60
			return indexDsId;
61
		}
62

    
63
		public String getFormat() {
64
			return format;
65
		}
66

    
67
		public String getColl() {
68
			return coll;
69
		}
70

    
71
	}
72

    
73
	@Value(value = "oaf.schema.location")
74
	private String oafSchemaLocation;
75

    
76
	@Resource
77
	private UniqueServiceLocator serviceLocator;
78

    
79
	@Resource
80
	private OafToIndexRecordFactory oafToIndexRecordFactory;
81

    
82
	@Resource
83
	private RecentResultsQueue recentResultsQueue;
84

    
85
	@Resource(name = "openaireplusApisVelocityEngine")
86
	private VelocityEngine velocityEngine;
87

    
88
	@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
89
	private ClassPathResource findSolrIndexUrl;
90

    
91
	@Value(value = "${openaire.api.directindex.findIndexDsInfo.xquery}")
92
	private ClassPathResource findIndexDsInfo;
93

    
94
	/**
95
	 * Autocommit feature activation flag
96
	 */
97
	@Value(value = "${openaire.api.directindex.autocommit.active}")
98
	private boolean autocommitactive;
99

    
100
	/**
101
	 * Autocommit frequency (Seconds)
102
	 */
103
	@Value(value = "${openaire.api.directindex.autocommit.frequency}")
104
	private long commitfrquency = 60;
105

    
106
	/**
107
	 * Minimum autocommit frequency
108
	 */
109
	private static long COMMIT_FREQUENCY_MINVALUE = 5;
110

    
111
	/**
112
	 * Autocommit executor service
113
	 */
114
	private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
115

    
116
	/**
117
	 * Autocommit task handle
118
	 */
119
	private ScheduledFuture task = null;
120

    
121
	public OpenaireResultSubmitter() {
122

    
123
		updateSchedule();
124
	}
125

    
126
	private void updateSchedule() {
127

    
128
		if (task != null) {
129
			task.cancel(true);
130
		}
131
		task = executor.scheduleAtFixedRate(doCommit(), 0, getCommitfrquency(), TimeUnit.SECONDS);
132
	}
133

    
134
	private Runnable doCommit() {
135
		return () -> {
136
			//TODO perform the commit only if there are pending documents, e.g. query /solr/admin/metrics?group=core&prefix=UPDATE.updateHandler to inspect if there are any.
137
			if (isAutocommitactive()) {
138
				try {
139
					for (Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
140
						final IndexDsInfo i = e.getKey();
141
						try(final CloudIndexClient client = e.getValue()) {
142

    
143
							log.info("performing commit on " + i.getColl());
144
							client.commit();
145
						}
146
					}
147
				} catch (DirecIndexApiException | CloudIndexClientException | IOException e) {
148
					//TODO perhaps we could send an email to notify this failure.
149
					throw new RuntimeException(e);
150
				}
151
			}
152
		};
153
	}
154

    
155

    
156
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.GET)
157
	public @ResponseBody Boolean getAutocommit() throws DirecIndexApiException {
158
		return isAutocommitactive();
159
	}
160

    
161
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.POST)
162
	public @ResponseBody Boolean setAutocommit(@RequestParam(value = "active", required = true) final Boolean active) throws DirecIndexApiException {
163
		setAutocommitactive(active);
164
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isAutocommitactive(), getCommitfrquency()));
165
		return isAutocommitactive();
166
	}
167

    
168
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.GET)
169
	public @ResponseBody long getAutocommitFrequency() throws DirecIndexApiException {
170
		return getCommitfrquency();
171
	}
172

    
173
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.POST)
174
	public @ResponseBody long setAutocommitFrequency(@RequestParam(value = "frequency", required = true) final Integer frequency) throws DirecIndexApiException {
175
		setCommitfrquency(frequency);
176
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isAutocommitactive(), getCommitfrquency()));
177
		return getCommitfrquency();
178
	}
179

    
180
	@Deprecated
181
	@RequestMapping(value = { "/api/publications/feedJson", "/api/results/feedJson" }, method = RequestMethod.POST)
182
	public @ResponseBody String feedObjectJson(@RequestParam(value = "json", required = true) final String json,
183
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
184
		final ResultEntry pub = new Gson().fromJson(json, ResultEntry.class);
185
		return feedObject(pub, commit);
186
	}
187

    
188
	@RequestMapping(value = { "/api/results/feedObject" }, method = RequestMethod.POST)
189
	public @ResponseBody String feedResult(@RequestBody final ResultEntry pub,
190
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
191
			throws DirecIndexApiException {
192
		return feed(pub, commit);
193
	}
194

    
195
	@Deprecated
196
	@RequestMapping(value = { "/api/publications/feedObject" }, method = RequestMethod.POST)
197
	public @ResponseBody String feedObject(@RequestBody final ResultEntry pub,
198
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
199
			throws DirecIndexApiException {
200
		return feed(pub, commit);
201
	}
202

    
203

    
204
	@RequestMapping(value = "/api/result/{openaireId}", method = RequestMethod.DELETE)
205
	public @ResponseBody boolean deleteResultWithOpenaireId(
206
			@PathVariable(value = "openaireId") final String openaireId,
207
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
208

    
209
		return deleteResult(openaireId, commit);
210
	}
211

    
212
	@RequestMapping(value = "/api/results", method = RequestMethod.DELETE)
213
	public @ResponseBody boolean deleteResultWithOriginalId(
214
			@RequestParam(value = "originalId", required = true) final String originalId,
215
			@RequestParam(value = "collectedFromId", required = true) final String collectedFromId,
216
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws Exception {
217

    
218
		final String openaireId = ResultEntry.calculateOpenaireId(originalId, collectedFromId, serviceLocator.getService(ISLookUpService.class));
219
		return deleteResult(openaireId, commit);
220
	}
221

    
222
	@Deprecated
223
	@RequestMapping(value = { "/api/publications/deleteObject", "/api/results/delete" }, method = RequestMethod.POST)
224
	public @ResponseBody boolean deleteResultPost(
225
			@RequestParam(value = "originalId", required = true) final String originalId,
226
			@RequestParam(value = "collectedFromId", required = true) final String collectedFromId,
227
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws Exception {
228

    
229
		final String openaireId = ResultEntry.calculateOpenaireId(originalId, collectedFromId, serviceLocator.getService(ISLookUpService.class));
230
		return deleteResult(openaireId, commit);
231
	}
232

    
233
	@Deprecated
234
	private String feed(final ResultEntry pub, final boolean commit) throws DirecIndexApiException {
235
		return feed(pub);
236
	}
237

    
238
	private String feed(final ResultEntry pub) throws DirecIndexApiException {
239
		try {
240
			final String oafRecord = pub.asOafRecord(velocityEngine, serviceLocator.getService(ISLookUpService.class), oafSchemaLocation);
241

    
242
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
243
				final IndexDsInfo idx = e.getKey();
244
				try(final CloudIndexClient client = e.getValue()) {
245
					client.feed(oafRecord, idx.getIndexDsId(), oafToIndexRecordFactory.newTransformer(idx.getFormat()), false);
246
				}
247
			}
248

    
249
			recentResultsQueue.add(oafRecord);
250

    
251
			return pub.getOpenaireId();
252
		} catch (final Throwable e) {
253
			log.error("Error saving record", e);
254
			log.debug(pub.toString());
255
			throw new DirecIndexApiException("Error adding publication: " + e.getMessage(), e);
256
		}
257
	}
258

    
259
	private boolean deleteResult(final String openaireId, final boolean commit) throws DirecIndexApiException {
260
		try {
261
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
262
				final IndexDsInfo idx = e.getKey();
263
				try(final CloudIndexClient client = e.getValue()) {
264
					client.remove(openaireId, false);
265
					log.info("Deleted result with id: " + openaireId + " from: " + idx.getIndexBaseUrl());
266
				}
267
			}
268

    
269
			recentResultsQueue.remove(openaireId);
270
			return true;
271
		} catch (Throwable e) {
272
			throw new DirecIndexApiException("Error deleting publication: " + e.getMessage(), e);
273
		}
274
	}
275

    
276
	private Map<IndexDsInfo, CloudIndexClient> getClients() throws DirecIndexApiException {
277
		try {
278
			final List<IndexDsInfo> idxList = calculateCurrentIndexDsInfo();
279

    
280
			if (idxList == null || idxList.isEmpty()) {
281
				throw new DirecIndexApiException("Cannot commit index: no public Search Service found");
282
			}
283
			if (idxList.size() > 1) {
284
				log.warn("Found more than 1 public search service");
285
			}
286

    
287
			final Map<IndexDsInfo, CloudIndexClient> clients = Maps.newHashMap();
288
			for(IndexDsInfo i : idxList) {
289
				clients.put(i, CloudIndexClientFactory.newIndexClient(i.getIndexBaseUrl(), i.getColl(), false));
290
			}
291
			return clients;
292

    
293
		} catch (IOException | ISLookUpException | CloudIndexClientException e) {
294
			throw new DirecIndexApiException(e);
295
		}
296
	}
297

    
298
	private List<IndexDsInfo> calculateCurrentIndexDsInfo() throws IOException, ISLookUpException {
299
		final List<IndexDsInfo> list = Lists.newArrayList();
300

    
301
		final String queryUrl = IOUtils.toString(findSolrIndexUrl.getInputStream());
302
		final String queryDs = IOUtils.toString(findIndexDsInfo.getInputStream());
303

    
304
		final ISLookUpService lu = serviceLocator.getService(ISLookUpService.class);
305
		final String indexBaseUrl = lu.getResourceProfileByQuery(queryUrl);
306

    
307
		final List<String> idxDs = lu.quickSearchProfile(queryDs);
308
		for (final String idx : idxDs) {
309
			final String[] arr = idx.split("@@@");
310
			list.add(new IndexDsInfo(indexBaseUrl, arr[0].trim(), arr[1].trim(), arr[2].trim()));
311
		}
312
		return list;
313
	}
314

    
315
	@ExceptionHandler(Exception.class)
316
	@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
317
	public @ResponseBody ErrorMessage handleException(final Exception e) {
318
		log.error("Error in direct index API", e);
319
		return new ErrorMessage(e);
320
	}
321

    
322
	public class ErrorMessage {
323

    
324
		private final String message;
325
		private final String stacktrace;
326

    
327
		public ErrorMessage(final Exception e) {
328
			this(e.getMessage(), ExceptionUtils.getStackTrace(e));
329
		}
330

    
331
		public ErrorMessage(final String message, final String stacktrace) {
332
			this.message = message;
333
			this.stacktrace = stacktrace;
334
		}
335

    
336
		public String getMessage() {
337
			return this.message;
338
		}
339

    
340
		public String getStacktrace() {
341
			return this.stacktrace;
342
		}
343

    
344
	}
345

    
346
	public boolean isAutocommitactive() {
347
		return autocommitactive;
348
	}
349

    
350
	public synchronized void setAutocommitactive(boolean autocommitactive) {
351
		this.autocommitactive = autocommitactive;
352
	}
353

    
354
	public long getCommitfrquency() {
355
		return commitfrquency;
356
	}
357

    
358
	public synchronized void setCommitfrquency(long commitfrquency) {
359
		if (commitfrquency < COMMIT_FREQUENCY_MINVALUE) {
360
			throw new RuntimeException("cannot set autocommit frequency: minimum accepted value (inclusive) is " + COMMIT_FREQUENCY_MINVALUE);
361
		}
362
		this.commitfrquency = commitfrquency;
363
		updateSchedule();
364
	}
365
}
(3-3/4)