Project

General

Profile

« Previous | Next » 

Revision 54020

do not commit for every direct index request, instead delegate the commit to a periodic task

View differences:

modules/dnet-directindex-api/branches/solr75/src/main/java/eu/dnetlib/openaire/directindex/api/OpenaireResultSubmitter.java
1 1
package eu.dnetlib.openaire.directindex.api;
2 2

  
3
import java.io.IOException;
4
import java.util.Arrays;
5
import java.util.List;
6
import javax.annotation.Resource;
7

  
8 3
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
9 5
import com.google.gson.Gson;
10 6
import eu.dnetlib.common.rmi.DNetRestDocumentation;
11 7
import eu.dnetlib.data.index.CloudIndexClient;
8
import eu.dnetlib.data.index.CloudIndexClientException;
12 9
import eu.dnetlib.data.index.CloudIndexClientFactory;
13 10
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
14 11
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
......
16 13
import eu.dnetlib.openaire.directindex.objects.ResultEntry;
17 14
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;
18 15
import org.apache.commons.io.IOUtils;
19
import org.apache.commons.lang3.exception.ExceptionUtils;
16
import org.apache.commons.lang.exception.ExceptionUtils;
20 17
import org.apache.commons.logging.Log;
21 18
import org.apache.commons.logging.LogFactory;
22 19
import org.apache.velocity.app.VelocityEngine;
23 20
import org.springframework.beans.factory.annotation.Value;
24
import org.springframework.context.annotation.Profile;
25 21
import org.springframework.core.io.ClassPathResource;
26 22
import org.springframework.http.HttpStatus;
27 23
import org.springframework.stereotype.Controller;
28 24
import org.springframework.web.bind.annotation.*;
29 25

  
26
import javax.annotation.Resource;
27
import java.io.IOException;
28
import java.util.List;
29
import java.util.Map;
30
import java.util.concurrent.ExecutorService;
31
import java.util.concurrent.Executors;
32

  
30 33
/**
31 34
 * Created by michele on 11/11/15.
32 35
 */
33 36
@Controller
34 37
@DNetRestDocumentation
35
@Profile("provision")
36 38
public class OpenaireResultSubmitter {
37 39

  
38
	@RequestMapping(value = { "/api/test" }, method = RequestMethod.GET)
39
	public @ResponseBody List<String> test() {
40
		return Arrays.asList("a", "b");
41
	}
42

  
43 40
	private static final Log log = LogFactory.getLog(OpenaireResultSubmitter.class);
44 41

  
45 42
	public class IndexDsInfo {
......
95 92
	@Value(value = "${openaire.api.directindex.findIndexDsInfo.xquery}")
96 93
	private ClassPathResource findIndexDsInfo;
97 94

  
95
	private ExecutorService executor = Executors.newSingleThreadExecutor();
96

  
97
	private boolean paused = false;
98

  
99
	@Value(value = "${openaire.api.directindex.commit.frequency}")
100
	private int commitfrquency = 60;
101

  
102
	public OpenaireResultSubmitter() {
103

  
104
		executor.submit(() -> {
105
			while(!paused) {
106
				try {
107
					Thread.sleep(commitfrquency * 1000);
108
					for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
109
						final IndexDsInfo i = e.getKey();
110
						final CloudIndexClient client = e.getValue();
111

  
112
						log.info("performing commit on " + i.getColl());
113

  
114
						client.commit();
115
					}
116
				} catch (DirecIndexApiException | CloudIndexClientException e) {
117

  
118
					//TODO perhaps we could send an email to notify this failure.
119
					throw new RuntimeException(e);
120

  
121
				} catch (InterruptedException e) {
122
					log.warn("commit thread iterrupted");
123
				}
124
			}
125
		});
126
	}
127

  
128
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.GET)
129
	public @ResponseBody Boolean getAutocommit() throws DirecIndexApiException {
130
		return isPaused();
131
	}
132

  
133
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.POST)
134
	public @ResponseBody Boolean setAutocommit(@RequestParam(value = "active", required = true) final Boolean active) throws DirecIndexApiException {
135
		setPaused(active);
136
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isPaused(), getCommitfrquency()));
137
		return isPaused();
138
	}
139

  
140
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.GET)
141
	public @ResponseBody int getAutocommitFrequency() throws DirecIndexApiException {
142
		return getCommitfrquency();
143
	}
144

  
145
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.POST)
146
	public @ResponseBody int setAutocommitFrequency(@RequestParam(value = "frequency", required = true) final Integer frequency) throws DirecIndexApiException {
147
		setCommitfrquency(frequency);
148
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isPaused(), getCommitfrquency()));
149
		return getCommitfrquency();
150
	}
151

  
98 152
	@Deprecated
99 153
	@RequestMapping(value = { "/api/publications/feedJson", "/api/results/feedJson" }, method = RequestMethod.POST)
100 154
	public @ResponseBody String feedObjectJson(@RequestParam(value = "json", required = true) final String json,
101
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
155
											   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
102 156
		final ResultEntry pub = new Gson().fromJson(json, ResultEntry.class);
103 157
		return feedObject(pub, commit);
104 158
	}
105 159

  
106 160
	@RequestMapping(value = { "/api/results/feedObject" }, method = RequestMethod.POST)
107 161
	public @ResponseBody String feedResult(@RequestBody final ResultEntry pub,
108
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
162
										   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
109 163
			throws DirecIndexApiException {
110 164
		return feed(pub, commit);
111 165
	}
......
113 167
	@Deprecated
114 168
	@RequestMapping(value = { "/api/publications/feedObject" }, method = RequestMethod.POST)
115 169
	public @ResponseBody String feedObject(@RequestBody final ResultEntry pub,
116
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
170
										   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
117 171
			throws DirecIndexApiException {
118 172
		return feed(pub, commit);
119 173
	}
120 174

  
175

  
121 176
	@RequestMapping(value = "/api/result/{openaireId}", method = RequestMethod.DELETE)
122 177
	public @ResponseBody boolean deleteResultWithOpenaireId(
123 178
			@PathVariable(value = "openaireId") final String openaireId,
......
147 202
		return deleteResult(openaireId, commit);
148 203
	}
149 204

  
205
	@Deprecated
150 206
	private String feed(final ResultEntry pub, final boolean commit) throws DirecIndexApiException {
151
		final List<IndexDsInfo> idxList;
207
		return feed(pub);
208
	}
209

  
210
	private String feed(final ResultEntry pub) throws DirecIndexApiException {
152 211
		try {
153
			idxList = calculateCurrentIndexDsInfo();
154
			if (idxList == null
155
					|| idxList.isEmpty()) { throw new DirecIndexApiException("Cannot add result: " + pub.getAnyId() + " : No public Search Service found"); }
156
			if (idxList.size() > 1) {
157
				log.warn("Found more than 1 public search service");
158
			}
159 212
			final String oafRecord = pub.asOafRecord(velocityEngine, serviceLocator.getService(ISLookUpService.class), oafSchemaLocation);
160 213

  
161
			for (final IndexDsInfo idx : idxList) {
162
				CloudIndexClient idxClient = null;
163
				try {
164
					idxClient = CloudIndexClientFactory.newIndexClient(idx.getIndexBaseUrl(), idx.getColl(), false);
165
					idxClient.feed(oafRecord, idx.getIndexDsId(), oafToIndexRecordFactory.newTransformer(idx.getFormat()), commit);
166
				} catch (final Throwable e) {
167
					throw new DirecIndexApiException("Error adding publication: " + e.getMessage(), e);
168
				} finally {
169
					if (idxClient != null) {
170
						idxClient.close();
171
					}
172
				}
214
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
215
				final IndexDsInfo idx = e.getKey();
216
				final CloudIndexClient client = e.getValue();
217
				client.feed(oafRecord, idx.getIndexDsId(), oafToIndexRecordFactory.newTransformer(idx.getFormat()), false);
218
				client.close();
173 219
			}
220

  
174 221
			recentResultsQueue.add(oafRecord);
222

  
175 223
			return pub.getOpenaireId();
176 224
		} catch (final Throwable e) {
177 225
			log.error("Error saving record", e);
......
181 229
	}
182 230

  
183 231
	private boolean deleteResult(final String openaireId, final boolean commit) throws DirecIndexApiException {
232
		try {
233
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
234
				final IndexDsInfo idx = e.getKey();
235
				final CloudIndexClient client = e.getValue();
236
				client.remove(openaireId, false);
237
				log.info("Deleted result with id: " + openaireId + " from: " + idx.getIndexBaseUrl());
238
				client.close();
239
			}
184 240

  
185
		final List<IndexDsInfo> idxList;
241
			recentResultsQueue.remove(openaireId);
242
			return true;
243
		} catch (Throwable e) {
244
			throw new DirecIndexApiException("Error deleting publication: " + e.getMessage(), e);
245
		}
246
	}
247

  
248
	private Map<IndexDsInfo, CloudIndexClient> getClients() throws DirecIndexApiException {
186 249
		try {
187
			idxList = calculateCurrentIndexDsInfo();
250
			final List<IndexDsInfo> idxList = calculateCurrentIndexDsInfo();
188 251

  
189
			if (idxList == null
190
					|| idxList.isEmpty()) { throw new DirecIndexApiException("Cannot delete result: " + openaireId + " : No public Search Service found"); }
252
			if (idxList == null || idxList.isEmpty()) {
253
				throw new DirecIndexApiException("Cannot commit index: no public Search Service found");
254
			}
191 255
			if (idxList.size() > 1) {
192 256
				log.warn("Found more than 1 public search service");
193 257
			}
194 258

  
195
			// final String objId = ResultEntry.calculateOpenaireId(originalId, collectedFromId,
196
			// serviceLocator.getService(ISLookUpService.class));
259
			final Map<IndexDsInfo, CloudIndexClient> clients = Maps.newHashMap();
260
			for(IndexDsInfo i : idxList) {
261
				clients.put(i, CloudIndexClientFactory.newIndexClient(i.getIndexBaseUrl(), i.getColl(), false));
262
			}
263
			return clients;
197 264

  
198
			for (final IndexDsInfo idx : idxList) {
199
				CloudIndexClient idxClient = null;
200
				try {
201
					idxClient = CloudIndexClientFactory.newIndexClient(idx.getIndexBaseUrl(), idx.getColl(), false);
202
					idxClient.remove(openaireId, commit);
203
					log.info("Deleted result with id: " + openaireId + " from: " + idx.getIndexBaseUrl());
204
				} catch (final Throwable e) {
205
					throw new DirecIndexApiException("Error deleting publication: " + e.getMessage(), e);
206
				} finally {
207
					if (idxClient != null) {
208
						idxClient.close();
209
					}
210
				}
211
			}
212
			recentResultsQueue.remove(openaireId);
213
			return true;
214
		} catch (IOException | ISLookUpException e) {
215
			throw new DirecIndexApiException("Error deleting publication: " + e.getMessage(), e);
265
		} catch (IOException | ISLookUpException | CloudIndexClientException e) {
266
			throw new DirecIndexApiException(e);
216 267
		}
217 268
	}
218 269

  
219
	@ExceptionHandler(Exception.class)
220
	@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
221
	public @ResponseBody ErrorMessage handleException(final Exception e) {
222
		log.error("Error in direct index API", e);
223
		return new ErrorMessage(e);
224
	}
225

  
226 270
	private List<IndexDsInfo> calculateCurrentIndexDsInfo() throws IOException, ISLookUpException {
227 271
		final List<IndexDsInfo> list = Lists.newArrayList();
228 272

  
......
240 284
		return list;
241 285
	}
242 286

  
287
	@ExceptionHandler(Exception.class)
288
	@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
289
	public @ResponseBody ErrorMessage handleException(final Exception e) {
290
		log.error("Error in direct index API", e);
291
		return new ErrorMessage(e);
292
	}
293

  
243 294
	public class ErrorMessage {
244 295

  
245 296
		private final String message;
......
264 315

  
265 316
	}
266 317

  
318
	public boolean isPaused() {
319
		return paused;
320
	}
321

  
322
	public void setPaused(boolean paused) {
323
		this.paused = paused;
324
	}
325

  
326
	public int getCommitfrquency() {
327
		return commitfrquency;
328
	}
329

  
330
	public void setCommitfrquency(int commitfrquency) {
331
		this.commitfrquency = commitfrquency;
332
	}
267 333
}
modules/dnet-directindex-api/branches/solr75/src/main/resources/eu/dnetlib/openaire/directindex/applicationContext-api.properties
4 4
openaire.api.directindex.mongo.collection=recent_publications
5 5
openaire.api.directindex.layoutToRecord.xslt=/eu/dnetlib/msro/openaireplus/workflows/index/openaireLayoutToRecordStylesheet.xsl
6 6
openaire.api.directindex.findSolrIndexUrl.xquery=/eu/dnetlib/openaire/directindex/findSolrIndexUrl.xquery
7
openaire.api.directindex.findIndexDsInfo.xquery=/eu/dnetlib/openaire/directindex/findIndexDsInfo.xquery
7
openaire.api.directindex.findIndexDsInfo.xquery=/eu/dnetlib/openaire/directindex/findIndexDsInfo.xquery
8
openaire.api.directindex.commit.frequency=60

Also available in: Unified diff