Project

General

Profile

1
package eu.dnetlib.openaire.directindex.api;
2

    
3
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
5
import com.google.gson.Gson;
6
import eu.dnetlib.common.rmi.DNetRestDocumentation;
7
import eu.dnetlib.data.index.CloudIndexClient;
8
import eu.dnetlib.data.index.CloudIndexClientException;
9
import eu.dnetlib.data.index.CloudIndexClientFactory;
10
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
11
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.openaire.directindex.objects.ResultEntry;
14
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;
15
import org.apache.commons.io.IOUtils;
16
import org.apache.commons.lang.exception.ExceptionUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19
import org.apache.velocity.app.VelocityEngine;
20
import org.springframework.beans.factory.annotation.Value;
21
import org.springframework.core.io.ClassPathResource;
22
import org.springframework.http.HttpStatus;
23
import org.springframework.stereotype.Controller;
24
import org.springframework.web.bind.annotation.*;
25

    
26
import javax.annotation.Resource;
27
import java.io.IOException;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.concurrent.ExecutorService;
31
import java.util.concurrent.Executors;
32

    
33
/**
34
 * Created by michele on 11/11/15.
35
 */
36
@Controller
37
@DNetRestDocumentation
38
public class OpenaireResultSubmitter {
39

    
40
	private static final Log log = LogFactory.getLog(OpenaireResultSubmitter.class);
41

    
42
	public class IndexDsInfo {
43

    
44
		private final String indexBaseUrl;
45
		private final String indexDsId;
46
		private final String format;
47
		private final String coll;
48

    
49
		public IndexDsInfo(final String indexBaseUrl, final String indexDsId, final String format, final String coll) {
50
			this.indexBaseUrl = indexBaseUrl;
51
			this.indexDsId = indexDsId;
52
			this.format = format;
53
			this.coll = coll;
54
		}
55

    
56
		public String getIndexBaseUrl() {
57
			return indexBaseUrl;
58
		}
59

    
60
		public String getIndexDsId() {
61
			return indexDsId;
62
		}
63

    
64
		public String getFormat() {
65
			return format;
66
		}
67

    
68
		public String getColl() {
69
			return coll;
70
		}
71

    
72
	}
73

    
74
	@Value(value = "oaf.schema.location")
75
	private String oafSchemaLocation;
76

    
77
	@Resource
78
	private UniqueServiceLocator serviceLocator;
79

    
80
	@Resource
81
	private OafToIndexRecordFactory oafToIndexRecordFactory;
82

    
83
	@Resource
84
	private RecentResultsQueue recentResultsQueue;
85

    
86
	@Resource(name = "openaireplusApisVelocityEngine")
87
	private VelocityEngine velocityEngine;
88

    
89
	@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
90
	private ClassPathResource findSolrIndexUrl;
91

    
92
	@Value(value = "${openaire.api.directindex.findIndexDsInfo.xquery}")
93
	private ClassPathResource findIndexDsInfo;
94

    
95
	private ExecutorService executor = Executors.newSingleThreadExecutor();
96

    
97
	@Value(value = "${openaire.api.directindex.autocommit.active}")
98
	private boolean autocommitactive;
99

    
100
	@Value(value = "${openaire.api.directindex.autocommit.frequency}")
101
	private int commitfrquency = 60;
102

    
103
	public OpenaireResultSubmitter() {
104

    
105
		executor.submit(() -> {
106
			while(autocommitactive) {
107
				try {
108
					Thread.sleep(commitfrquency * 1000);
109
					for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
110
						final IndexDsInfo i = e.getKey();
111
						final CloudIndexClient client = e.getValue();
112

    
113
						log.info("performing commit on " + i.getColl());
114

    
115
						client.commit();
116
					}
117
				} catch (DirecIndexApiException | CloudIndexClientException e) {
118

    
119
					//TODO perhaps we could send an email to notify this failure.
120
					throw new RuntimeException(e);
121

    
122
				} catch (InterruptedException e) {
123
					log.warn("commit thread iterrupted");
124
				}
125
			}
126
		});
127
	}
128

    
129
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.GET)
130
	public @ResponseBody Boolean getAutocommit() throws DirecIndexApiException {
131
		return isAutocommitactive();
132
	}
133

    
134
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.POST)
135
	public @ResponseBody Boolean setAutocommit(@RequestParam(value = "active", required = true) final Boolean active) throws DirecIndexApiException {
136
		setAutocommitactive(active);
137
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isAutocommitactive(), getCommitfrquency()));
138
		return isAutocommitactive();
139
	}
140

    
141
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.GET)
142
	public @ResponseBody int getAutocommitFrequency() throws DirecIndexApiException {
143
		return getCommitfrquency();
144
	}
145

    
146
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.POST)
147
	public @ResponseBody int setAutocommitFrequency(@RequestParam(value = "frequency", required = true) final Integer frequency) throws DirecIndexApiException {
148
		setCommitfrquency(frequency);
149
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isAutocommitactive(), getCommitfrquency()));
150
		return getCommitfrquency();
151
	}
152

    
153
	@Deprecated
154
	@RequestMapping(value = { "/api/publications/feedJson", "/api/results/feedJson" }, method = RequestMethod.POST)
155
	public @ResponseBody String feedObjectJson(@RequestParam(value = "json", required = true) final String json,
156
											   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
157
		final ResultEntry pub = new Gson().fromJson(json, ResultEntry.class);
158
		return feedObject(pub, commit);
159
	}
160

    
161
	@RequestMapping(value = { "/api/results/feedObject" }, method = RequestMethod.POST)
162
	public @ResponseBody String feedResult(@RequestBody final ResultEntry pub,
163
										   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
164
			throws DirecIndexApiException {
165
		return feed(pub, commit);
166
	}
167

    
168
	@Deprecated
169
	@RequestMapping(value = { "/api/publications/feedObject" }, method = RequestMethod.POST)
170
	public @ResponseBody String feedObject(@RequestBody final ResultEntry pub,
171
										   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
172
			throws DirecIndexApiException {
173
		return feed(pub, commit);
174
	}
175

    
176

    
177
	@RequestMapping(value = "/api/result/{openaireId}", method = RequestMethod.DELETE)
178
	public @ResponseBody boolean deleteResultWithOpenaireId(
179
			@PathVariable(value = "openaireId") final String openaireId,
180
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
181

    
182
		return deleteResult(openaireId, commit);
183
	}
184

    
185
	@RequestMapping(value = "/api/results", method = RequestMethod.DELETE)
186
	public @ResponseBody boolean deleteResultWithOriginalId(
187
			@RequestParam(value = "originalId", required = true) final String originalId,
188
			@RequestParam(value = "collectedFromId", required = true) final String collectedFromId,
189
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws Exception {
190

    
191
		final String openaireId = ResultEntry.calculateOpenaireId(originalId, collectedFromId, serviceLocator.getService(ISLookUpService.class));
192
		return deleteResult(openaireId, commit);
193
	}
194

    
195
	@Deprecated
196
	@RequestMapping(value = { "/api/publications/deleteObject", "/api/results/delete" }, method = RequestMethod.POST)
197
	public @ResponseBody boolean deleteResultPost(
198
			@RequestParam(value = "originalId", required = true) final String originalId,
199
			@RequestParam(value = "collectedFromId", required = true) final String collectedFromId,
200
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws Exception {
201

    
202
		final String openaireId = ResultEntry.calculateOpenaireId(originalId, collectedFromId, serviceLocator.getService(ISLookUpService.class));
203
		return deleteResult(openaireId, commit);
204
	}
205

    
206
	@Deprecated
207
	private String feed(final ResultEntry pub, final boolean commit) throws DirecIndexApiException {
208
		return feed(pub);
209
	}
210

    
211
	private String feed(final ResultEntry pub) throws DirecIndexApiException {
212
		try {
213
			final String oafRecord = pub.asOafRecord(velocityEngine, serviceLocator.getService(ISLookUpService.class), oafSchemaLocation);
214

    
215
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
216
				final IndexDsInfo idx = e.getKey();
217
				final CloudIndexClient client = e.getValue();
218
				client.feed(oafRecord, idx.getIndexDsId(), oafToIndexRecordFactory.newTransformer(idx.getFormat()), false);
219
				client.close();
220
			}
221

    
222
			recentResultsQueue.add(oafRecord);
223

    
224
			return pub.getOpenaireId();
225
		} catch (final Throwable e) {
226
			log.error("Error saving record", e);
227
			log.debug(pub.toString());
228
			throw new DirecIndexApiException("Error adding publication: " + e.getMessage(), e);
229
		}
230
	}
231

    
232
	private boolean deleteResult(final String openaireId, final boolean commit) throws DirecIndexApiException {
233
		try {
234
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
235
				final IndexDsInfo idx = e.getKey();
236
				final CloudIndexClient client = e.getValue();
237
				client.remove(openaireId, false);
238
				log.info("Deleted result with id: " + openaireId + " from: " + idx.getIndexBaseUrl());
239
				client.close();
240
			}
241

    
242
			recentResultsQueue.remove(openaireId);
243
			return true;
244
		} catch (Throwable e) {
245
			throw new DirecIndexApiException("Error deleting publication: " + e.getMessage(), e);
246
		}
247
	}
248

    
249
	private Map<IndexDsInfo, CloudIndexClient> getClients() throws DirecIndexApiException {
250
		try {
251
			final List<IndexDsInfo> idxList = calculateCurrentIndexDsInfo();
252

    
253
			if (idxList == null || idxList.isEmpty()) {
254
				throw new DirecIndexApiException("Cannot commit index: no public Search Service found");
255
			}
256
			if (idxList.size() > 1) {
257
				log.warn("Found more than 1 public search service");
258
			}
259

    
260
			final Map<IndexDsInfo, CloudIndexClient> clients = Maps.newHashMap();
261
			for(IndexDsInfo i : idxList) {
262
				clients.put(i, CloudIndexClientFactory.newIndexClient(i.getIndexBaseUrl(), i.getColl(), false));
263
			}
264
			return clients;
265

    
266
		} catch (IOException | ISLookUpException | CloudIndexClientException e) {
267
			throw new DirecIndexApiException(e);
268
		}
269
	}
270

    
271
	private List<IndexDsInfo> calculateCurrentIndexDsInfo() throws IOException, ISLookUpException {
272
		final List<IndexDsInfo> list = Lists.newArrayList();
273

    
274
		final String queryUrl = IOUtils.toString(findSolrIndexUrl.getInputStream());
275
		final String queryDs = IOUtils.toString(findIndexDsInfo.getInputStream());
276

    
277
		final ISLookUpService lu = serviceLocator.getService(ISLookUpService.class);
278
		final String indexBaseUrl = lu.getResourceProfileByQuery(queryUrl);
279

    
280
		final List<String> idxDs = lu.quickSearchProfile(queryDs);
281
		for (final String idx : idxDs) {
282
			final String[] arr = idx.split("@@@");
283
			list.add(new IndexDsInfo(indexBaseUrl, arr[0].trim(), arr[1].trim(), arr[2].trim()));
284
		}
285
		return list;
286
	}
287

    
288
	@ExceptionHandler(Exception.class)
289
	@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
290
	public @ResponseBody ErrorMessage handleException(final Exception e) {
291
		log.error("Error in direct index API", e);
292
		return new ErrorMessage(e);
293
	}
294

    
295
	public class ErrorMessage {
296

    
297
		private final String message;
298
		private final String stacktrace;
299

    
300
		public ErrorMessage(final Exception e) {
301
			this(e.getMessage(), ExceptionUtils.getStackTrace(e));
302
		}
303

    
304
		public ErrorMessage(final String message, final String stacktrace) {
305
			this.message = message;
306
			this.stacktrace = stacktrace;
307
		}
308

    
309
		public String getMessage() {
310
			return this.message;
311
		}
312

    
313
		public String getStacktrace() {
314
			return this.stacktrace;
315
		}
316

    
317
	}
318

    
319
	public boolean isAutocommitactive() {
320
		return autocommitactive;
321
	}
322

    
323
	public void setAutocommitactive(boolean autocommitactive) {
324
		this.autocommitactive = autocommitactive;
325
	}
326

    
327
	public int getCommitfrquency() {
328
		return commitfrquency;
329
	}
330

    
331
	public void setCommitfrquency(int commitfrquency) {
332
		this.commitfrquency = commitfrquency;
333
	}
334
}
(3-3/4)