Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

    
3
import java.util.Queue;
4

    
5
import com.googlecode.sarasvati.Arc;
6
import com.googlecode.sarasvati.NodeToken;
7
import eu.dnetlib.msro.rmi.MSROException;
8
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
9
import org.apache.commons.httpclient.HttpStatus;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.apache.http.client.methods.CloseableHttpResponse;
13
import org.apache.http.client.methods.HttpGet;
14
import org.apache.http.impl.client.CloseableHttpClient;
15
import org.apache.http.impl.client.HttpClients;
16
import org.springframework.beans.factory.annotation.Value;
17

    
18
/**
19
 * SwitchIndexesJobNode guides the index switches on all search services whose ids are in the queue available in the parameter named 'searchService_ids'.
20
 *
21
 * @author claudio, alessia
22
 * @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.FindSearchServicesJobNode
23
 * @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.SwitchIndexJobNode
24
 */
25
public class SwitchIndexesJobNode extends SimpleJobNode {
26

    
27
	private static final Log log = LogFactory.getLog(SwitchIndexesJobNode.class);
28

    
29
	@Value("${dnet.openaire.index.api.cacheEvictUrl}")
30
	private String indexApiCacheEvictUrl;
31

    
32
	@Override
33
	protected String execute(final NodeToken token) throws Exception {
34
		Queue<String> q = (Queue<String>) token.getEnv().getTransientAttribute("searchService_ids");
35
		log.debug("Got the searchService_ids queue: " + q.toString());
36
		if (q == null) throw new MSROException("Transient param 'searchService_ids' with queue of string could not be found");
37
		if (q.isEmpty()) {
38
			log.info("searchService_ids queue consumed, now ending cycle and evicting direct indexing api cache on " + indexApiCacheEvictUrl);
39
			final HttpGet get = new HttpGet(indexApiCacheEvictUrl);
40

    
41
			try(CloseableHttpClient client = HttpClients.createDefault()) {
42
				try (CloseableHttpResponse response = client.execute(get)) {
43
					int statusCode = response.getStatusLine().getStatusCode();
44
					log.info(String.format("GET on %s returned %s", indexApiCacheEvictUrl, statusCode));
45
					if (statusCode == HttpStatus.SC_OK) {
46
						return Arc.DEFAULT_ARC;
47
					} else
48
						throw new MSROException("Cannot evict cache of direct indexing API at: " + indexApiCacheEvictUrl);
49
				}
50
			}
51
		} else {
52
			//we have something to do: setting the xqueryForServiceIdParam for the SwitchIndexJobNode
53
			String id = q.poll();
54
			log.debug("Polled id: " + id);
55
			log.debug("And now the queue is " + q);
56
			token.getEnv().setAttribute("search_service_ID", id);
57
			log.debug("Asking to switch on profile with id: " + id);
58
			//updating the queue for next iteration
59
			token.getEnv().setTransientAttribute("searchService_ids", q);
60
			return "switch";
61
		}
62

    
63
	}
64
}
(9-9/10)