1
|
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
|
2
|
|
3
|
import java.util.Queue;
|
4
|
|
5
|
import com.googlecode.sarasvati.Arc;
|
6
|
import com.googlecode.sarasvati.NodeToken;
|
7
|
import eu.dnetlib.msro.rmi.MSROException;
|
8
|
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
|
9
|
import org.apache.commons.httpclient.HttpStatus;
|
10
|
import org.apache.commons.logging.Log;
|
11
|
import org.apache.commons.logging.LogFactory;
|
12
|
import org.apache.http.client.methods.CloseableHttpResponse;
|
13
|
import org.apache.http.client.methods.HttpGet;
|
14
|
import org.apache.http.impl.client.CloseableHttpClient;
|
15
|
import org.apache.http.impl.client.HttpClients;
|
16
|
import org.springframework.beans.factory.annotation.Value;
|
17
|
|
18
|
/**
|
19
|
* SwitchIndexesJobNode guides the index switches on all search services whose ids are in the queue available in the parameter named 'searchService_ids'.
|
20
|
*
|
21
|
* @author claudio, alessia
|
22
|
* @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.FindSearchServicesJobNode
|
23
|
* @see eu.dnetlib.msro.openaireplus.workflows.nodes.index.SwitchIndexJobNode
|
24
|
*/
|
25
|
public class SwitchIndexesJobNode extends SimpleJobNode {
|
26
|
|
27
|
private static final Log log = LogFactory.getLog(SwitchIndexesJobNode.class);
|
28
|
|
29
|
@Value("${dnet.openaire.index.api.cacheEvictUrl}")
|
30
|
private String indexApiCacheEvictUrl;
|
31
|
|
32
|
@Override
|
33
|
protected String execute(final NodeToken token) throws Exception {
|
34
|
Queue<String> q = (Queue<String>) token.getEnv().getTransientAttribute("searchService_ids");
|
35
|
log.debug("Got the searchService_ids queue: " + q.toString());
|
36
|
if (q == null) throw new MSROException("Transient param 'searchService_ids' with queue of string could not be found");
|
37
|
if (q.isEmpty()) {
|
38
|
log.info("searchService_ids queue consumed, now ending cycle and evicting direct indexing api cache on " + indexApiCacheEvictUrl);
|
39
|
final HttpGet get = new HttpGet(indexApiCacheEvictUrl);
|
40
|
|
41
|
try(CloseableHttpClient client = HttpClients.createDefault()) {
|
42
|
try (CloseableHttpResponse response = client.execute(get)) {
|
43
|
int statusCode = response.getStatusLine().getStatusCode();
|
44
|
log.info(String.format("GET on %s returned %s", indexApiCacheEvictUrl, statusCode));
|
45
|
if (statusCode == HttpStatus.SC_OK) {
|
46
|
return Arc.DEFAULT_ARC;
|
47
|
} else
|
48
|
throw new MSROException("Cannot evict cache of direct indexing API at: " + indexApiCacheEvictUrl);
|
49
|
}
|
50
|
}
|
51
|
} else {
|
52
|
//we have something to do: setting the xqueryForServiceIdParam for the SwitchIndexJobNode
|
53
|
String id = q.poll();
|
54
|
log.debug("Polled id: " + id);
|
55
|
log.debug("And now the queue is " + q);
|
56
|
token.getEnv().setAttribute("search_service_ID", id);
|
57
|
log.debug("Asking to switch on profile with id: " + id);
|
58
|
//updating the queue for next iteration
|
59
|
token.getEnv().setTransientAttribute("searchService_ids", q);
|
60
|
return "switch";
|
61
|
}
|
62
|
|
63
|
}
|
64
|
}
|