Project

General

Profile

« Previous | Next » 

Revision 54128

index clients reuse the connection. Using Guava's AbstractScheduledService for the commit task

View differences:

OpenaireResultSubmitter.java
1 1
package eu.dnetlib.openaire.directindex.api;
2 2

  
3
import com.google.common.collect.Lists;
4
import com.google.common.collect.Maps;
5 3
import com.google.gson.Gson;
6 4
import eu.dnetlib.common.rmi.DNetRestDocumentation;
7 5
import eu.dnetlib.data.index.CloudIndexClient;
8
import eu.dnetlib.data.index.CloudIndexClientException;
9
import eu.dnetlib.data.index.CloudIndexClientFactory;
10
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
11 6
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
12 7
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13 8
import eu.dnetlib.openaire.directindex.objects.ResultEntry;
14 9
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;
15
import org.apache.commons.io.IOUtils;
16 10
import org.apache.commons.lang.exception.ExceptionUtils;
17 11
import org.apache.commons.logging.Log;
18 12
import org.apache.commons.logging.LogFactory;
19 13
import org.apache.velocity.app.VelocityEngine;
20 14
import org.springframework.beans.factory.annotation.Value;
21
import org.springframework.core.io.ClassPathResource;
22 15
import org.springframework.http.HttpStatus;
23 16
import org.springframework.stereotype.Controller;
24 17
import org.springframework.web.bind.annotation.*;
25 18

  
26 19
import javax.annotation.Resource;
27
import java.io.IOException;
28
import java.util.List;
29 20
import java.util.Map;
30
import java.util.concurrent.*;
31
import java.util.concurrent.atomic.AtomicBoolean;
32 21

  
33 22
/**
34 23
 * Created by michele on 11/11/15.
......
39 28

  
40 29
	private static final Log log = LogFactory.getLog(OpenaireResultSubmitter.class);
41 30

  
42
	public class IndexDsInfo {
43

  
44
		private final String indexBaseUrl;
45
		private final String indexDsId;
46
		private final String format;
47
		private final String coll;
48

  
49
		public IndexDsInfo(final String indexBaseUrl, final String indexDsId, final String format, final String coll) {
50
			this.indexBaseUrl = indexBaseUrl;
51
			this.indexDsId = indexDsId;
52
			this.format = format;
53
			this.coll = coll;
54
		}
55

  
56
		public String getIndexBaseUrl() {
57
			return indexBaseUrl;
58
		}
59

  
60
		public String getIndexDsId() {
61
			return indexDsId;
62
		}
63

  
64
		public String getFormat() {
65
			return format;
66
		}
67

  
68
		public String getColl() {
69
			return coll;
70
		}
71

  
72
	}
73

  
74 31
	@Value(value = "oaf.schema.location")
75 32
	private String oafSchemaLocation;
76 33

  
......
86 43
	@Resource(name = "openaireplusApisVelocityEngine")
87 44
	private VelocityEngine velocityEngine;
88 45

  
89
	@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
90
	private ClassPathResource findSolrIndexUrl;
46
	@Resource
47
	private IndexClientMap clientMap;
91 48

  
92
	@Value(value = "${openaire.api.directindex.findIndexDsInfo.xquery}")
93
	private ClassPathResource findIndexDsInfo;
49
	@Resource
50
	private ResultSubmitterService submitterService;
94 51

  
95
	/**
96
	 * Autocommit feature activation flag
97
	 */
98
	@Value(value = "${openaire.api.directindex.autocommit.active}")
99
	private boolean autocommitactive;
100

  
101
	/**
102
	 * Autocommit frequency (Seconds)
103
	 */
104
	@Value(value = "${openaire.api.directindex.autocommit.frequency}")
105
	private long commitfrquency = 60;
106

  
107
	/**
108
	 * Minimum autocommit frequency
109
	 */
110
	private static long COMMIT_FREQUENCY_MINVALUE = 5;
111

  
112
	/**
113
	 * Autocommit executor service
114
	 */
115
	private ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
116

  
117
	/**
118
	 * Autocommit task handle
119
	 */
120
	private ScheduledFuture task = null;
121

  
122

  
123
	private AtomicBoolean commitSuccessful;
124

  
125
	public OpenaireResultSubmitter() {
126

  
127
		commitSuccessful = new AtomicBoolean(false);
128
		updateSchedule();
129
	}
130

  
131
	private void updateSchedule() {
132

  
133
		if (task != null) {
134
			task.cancel(true);
135
		}
136
		task = executor.scheduleAtFixedRate(doCommit(), 0, getCommitfrquency(), TimeUnit.SECONDS);
137

  
138
	}
139

  
140
	private Runnable doCommit() {
141
		return () -> {
142
			//TODO perform the commit only if there are pending documents, e.g. query /solr/admin/metrics?group=core&prefix=UPDATE.updateHandler to inspect if there are any.
143
			if (isAutocommitactive()) {
144
				try {
145
					for (Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
146
						final IndexDsInfo i = e.getKey();
147
						try(final CloudIndexClient client = e.getValue()) {
148

  
149
							log.info("performing commit on " + i.getColl());
150
							client.commit();
151
						}
152
					}
153
					commitSuccessful.set(true);
154
				} catch (DirecIndexApiException | CloudIndexClientException | IOException e) {
155
					commitSuccessful.set(false);
156
					//TODO perhaps we could send an email to notify this failure.
157
					throw new RuntimeException(e);
158
				}
159
			}
160
		};
161
	}
162

  
163 52
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.GET)
164 53
	public @ResponseBody Boolean getAutocommit() throws DirecIndexApiException {
165
		return isAutocommitactive();
54
		return submitterService.isAutocommitactive();
166 55
	}
167 56

  
168 57
	@RequestMapping(value = { "/api/admin/autocommit/active" }, method = RequestMethod.POST)
169 58
	public @ResponseBody Boolean setAutocommit(@RequestParam(value = "active", required = true) final Boolean active) throws DirecIndexApiException {
170
		setAutocommitactive(active);
171
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isAutocommitactive(), getCommitfrquency()));
172
		return isAutocommitactive();
59
		submitterService.setAutocommitactive(active);
60
		log.info(String.format("automatic commit, active '%s', frequency '%s'", submitterService.isAutocommitactive(), submitterService.getCommitfrquency()));
61
		return submitterService.isAutocommitactive();
173 62
	}
174 63

  
175 64
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.GET)
176 65
	public @ResponseBody long getAutocommitFrequency() throws DirecIndexApiException {
177
		return getCommitfrquency();
66
		return submitterService.getCommitfrquency();
178 67
	}
179 68

  
180 69
	@RequestMapping(value = { "/api/admin/autocommit/frequency" }, method = RequestMethod.POST)
181 70
	public @ResponseBody long setAutocommitFrequency(@RequestParam(value = "frequency", required = true) final Integer frequency) throws DirecIndexApiException {
182
		setCommitfrquency(frequency);
183
		log.info(String.format("automatic commit, active '%s', frequency '%s'", isAutocommitactive(), getCommitfrquency()));
184
		return getCommitfrquency();
71
		submitterService.setCommitfrquency(frequency);
72
		log.info(String.format("automatic commit, active '%s', frequency '%s'", submitterService.isAutocommitactive(), submitterService.getCommitfrquency()));
73
		return submitterService.getCommitfrquency();
185 74
	}
186 75

  
187 76
	@Deprecated
188 77
	@RequestMapping(value = { "/api/publications/feedJson", "/api/results/feedJson" }, method = RequestMethod.POST)
189 78
	public @ResponseBody String feedObjectJson(@RequestParam(value = "json", required = true) final String json,
190
											   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
79
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit) throws DirecIndexApiException {
191 80
		final ResultEntry pub = new Gson().fromJson(json, ResultEntry.class);
192 81
		return feedObject(pub, commit);
193 82
	}
194 83

  
195 84
	@RequestMapping(value = { "/api/results/feedObject" }, method = RequestMethod.POST)
196 85
	public @ResponseBody String feedResult(@RequestBody final ResultEntry pub,
197
										   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
86
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
198 87
			throws DirecIndexApiException {
199 88
		return feed(pub, commit);
200 89
	}
......
202 91
	@Deprecated
203 92
	@RequestMapping(value = { "/api/publications/feedObject" }, method = RequestMethod.POST)
204 93
	public @ResponseBody String feedObject(@RequestBody final ResultEntry pub,
205
										   @RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
94
			@RequestParam(value = "commit", required = false, defaultValue = "true") final boolean commit)
206 95
			throws DirecIndexApiException {
207 96
		return feed(pub, commit);
208 97
	}
......
246 135
		try {
247 136
			final String oafRecord = pub.asOafRecord(velocityEngine, serviceLocator.getService(ISLookUpService.class), oafSchemaLocation);
248 137

  
249
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
138
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : clientMap.getClients().entrySet()) {
250 139
				final IndexDsInfo idx = e.getKey();
251
				try(final CloudIndexClient client = e.getValue()) {
252
					client.feed(oafRecord, idx.getIndexDsId(), oafToIndexRecordFactory.newTransformer(idx.getFormat()), false);
253
				}
140
				final CloudIndexClient client = e.getValue();
141
				client.feed(oafRecord, idx.getIndexDsId(), oafToIndexRecordFactory.newTransformer(idx.getFormat()), false);
254 142
			}
255 143

  
256 144
			recentResultsQueue.add(oafRecord);
......
260 148
			log.error("Error saving record", e);
261 149
			log.debug(pub.toString());
262 150
			throw new DirecIndexApiException("Error adding publication: " + e.getMessage(), e);
263
		} finally {
264
			if (commitSuccessful.get() == false) {
265
				log.info("commit thread died, rescheduling ...");
266
				updateSchedule();
267
			}
268 151
		}
269 152
	}
270 153

  
271 154
	private boolean deleteResult(final String openaireId, final boolean commit) throws DirecIndexApiException {
272 155
		try {
273
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : getClients().entrySet()) {
156
			for(Map.Entry<IndexDsInfo, CloudIndexClient> e : clientMap.getClients().entrySet()) {
274 157
				final IndexDsInfo idx = e.getKey();
275
				try(final CloudIndexClient client = e.getValue()) {
276
					client.remove(openaireId, false);
277
					log.info("Deleted result with id: " + openaireId + " from: " + idx.getIndexBaseUrl());
278
				}
158
				final CloudIndexClient client = e.getValue();
159
				client.remove(openaireId, false);
160
				log.info("Deleted result with id: " + openaireId + " from: " + idx.getIndexBaseUrl());
279 161
			}
280 162

  
281 163
			recentResultsQueue.remove(openaireId);
......
285 167
		}
286 168
	}
287 169

  
288
	private Map<IndexDsInfo, CloudIndexClient> getClients() throws DirecIndexApiException {
289
		try {
290
			final List<IndexDsInfo> idxList = calculateCurrentIndexDsInfo();
291

  
292
			if (idxList == null || idxList.isEmpty()) {
293
				throw new DirecIndexApiException("Cannot commit index: no public Search Service found");
294
			}
295
			if (idxList.size() > 1) {
296
				log.warn("Found more than 1 public search service");
297
			}
298

  
299
			final Map<IndexDsInfo, CloudIndexClient> clients = Maps.newHashMap();
300
			for(IndexDsInfo i : idxList) {
301
				clients.put(i, CloudIndexClientFactory.newIndexClient(i.getIndexBaseUrl(), i.getColl(), false));
302
			}
303
			return clients;
304

  
305
		} catch (IOException | ISLookUpException | CloudIndexClientException e) {
306
			throw new DirecIndexApiException(e);
307
		}
308
	}
309

  
310
	private List<IndexDsInfo> calculateCurrentIndexDsInfo() throws IOException, ISLookUpException {
311
		final List<IndexDsInfo> list = Lists.newArrayList();
312

  
313
		final String queryUrl = IOUtils.toString(findSolrIndexUrl.getInputStream());
314
		final String queryDs = IOUtils.toString(findIndexDsInfo.getInputStream());
315

  
316
		final ISLookUpService lu = serviceLocator.getService(ISLookUpService.class);
317
		final String indexBaseUrl = lu.getResourceProfileByQuery(queryUrl);
318

  
319
		final List<String> idxDs = lu.quickSearchProfile(queryDs);
320
		for (final String idx : idxDs) {
321
			final String[] arr = idx.split("@@@");
322
			list.add(new IndexDsInfo(indexBaseUrl, arr[0].trim(), arr[1].trim(), arr[2].trim()));
323
		}
324
		return list;
325
	}
326

  
327 170
	@ExceptionHandler(Exception.class)
328 171
	@ResponseStatus(value = HttpStatus.INTERNAL_SERVER_ERROR)
329 172
	public @ResponseBody ErrorMessage handleException(final Exception e) {
......
355 198

  
356 199
	}
357 200

  
358
	public boolean isAutocommitactive() {
359
		return autocommitactive;
360
	}
361

  
362
	public synchronized void setAutocommitactive(boolean autocommitactive) {
363
		this.autocommitactive = autocommitactive;
364
	}
365

  
366
	public long getCommitfrquency() {
367
		return commitfrquency;
368
	}
369

  
370
	public synchronized void setCommitfrquency(long commitfrquency) {
371
		if (commitfrquency < COMMIT_FREQUENCY_MINVALUE) {
372
			throw new RuntimeException("cannot set autocommit frequency: minimum accepted value (inclusive) is " + COMMIT_FREQUENCY_MINVALUE);
373
		}
374
		this.commitfrquency = commitfrquency;
375
		updateSchedule();
376
	}
377 201
}

Also available in: Unified diff