Revision 30176
Added by Claudio Atzori about 10 years ago
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/PrepareIndexDataJobNode.java | ||
---|---|---|
4 | 4 |
import java.io.InputStream; |
5 | 5 |
import java.io.StringReader; |
6 | 6 |
import java.io.StringWriter; |
7 |
import java.net.URI; |
|
8 |
import java.util.List; |
|
9 | 7 |
|
10 | 8 |
import javax.annotation.Resource; |
11 | 9 |
import javax.xml.transform.Transformer; |
... | ... | |
21 | 19 |
import org.springframework.beans.factory.annotation.Required; |
22 | 20 |
import org.springframework.core.io.ClassPathResource; |
23 | 21 |
|
24 |
import com.google.common.collect.Iterables; |
|
25 | 22 |
import com.googlecode.sarasvati.Arc; |
26 | 23 |
import com.googlecode.sarasvati.NodeToken; |
27 | 24 |
|
... | ... | |
72 | 69 |
token.getEnv().setAttribute(getRottenRecordsPathParam(), "/tmp" + getFileName(token, "rottenrecords")); |
73 | 70 |
} |
74 | 71 |
|
75 |
token.getEnv().setAttribute("index.solr.url", getIndexSolrUrl(token)); |
|
72 |
token.getEnv().setAttribute("index.solr.url", getIndexSolrUrlZk()); |
|
73 |
token.getEnv().setAttribute("index.solr.collection", getCollectionName(token)); |
|
76 | 74 |
|
77 | 75 |
token.getEnv().setAttribute("index.shutdown.wait.time", getIndexSolrShutdownWait()); |
78 | 76 |
token.getEnv().setAttribute("index.buffer.flush.threshold", getIndexBufferFlushTreshold()); |
... | ... | |
85 | 83 |
return Arc.DEFAULT_ARC; |
86 | 84 |
} |
87 | 85 |
|
88 |
private String getIndexSolrUrl(final NodeToken token) throws ISLookUpException { |
|
89 |
String baseUrl = "http://" + getSolrServerHost() + ":" + getSolrServerPort() + "/" + getSolrWebappContext() + "/" + getCollectionName(token); |
|
90 |
URI uri = URI.create(baseUrl); |
|
91 |
return uri.toString(); |
|
86 |
public String getIndexSolrUrlZk() throws ISLookUpException { |
|
87 |
return getServiceConfigValue("for $x in /RESOURCE_PROFILE[.//RESOURCE_TYPE/@value='IndexServiceResourceType'] return $x//PROTOCOL[./@name='solr']/@address/string()"); |
|
92 | 88 |
} |
93 | 89 |
|
94 |
private String getSolrServerHost() throws ISLookUpException { |
|
95 |
return queryForServiceProperty("solr:host"); |
|
96 |
} |
|
97 |
|
|
98 |
private String getSolrServerPort() throws ISLookUpException { |
|
99 |
return queryForServiceProperty("solr:port"); |
|
100 |
} |
|
101 |
|
|
102 |
private String getSolrWebappContext() throws ISLookUpException { |
|
103 |
return queryForServiceProperty("solr:webContext"); |
|
104 |
} |
|
105 |
|
|
106 | 90 |
public String getIndexSolrShutdownWait() throws ISLookUpException { |
107 | 91 |
return queryForServiceProperty("solr:feedingShutdownTolerance"); |
108 | 92 |
} |
... | ... | |
122 | 106 |
|
123 | 107 |
private String getServiceConfigValue(final String xquery) throws ISLookUpException { |
124 | 108 |
log.debug("quering for service property: " + xquery); |
125 |
final List<String> urls = lookupLocator.getService().quickSearchProfile(xquery);
|
|
126 |
if ((urls == null) || (urls.size() != 1)) { throw new IllegalStateException("unable to find unique service property, xquery: " + xquery); }
|
|
127 |
return Iterables.getOnlyElement(urls);
|
|
109 |
final String res = lookupLocator.getService().getResourceProfileByQuery(xquery);
|
|
110 |
if (StringUtils.isBlank(res)) throw new IllegalStateException("unable to find unique service property, xquery: " + xquery);
|
|
111 |
return res;
|
|
128 | 112 |
} |
129 | 113 |
|
130 | 114 |
private String getFileName(final NodeToken token, final String fileNamePrefix) { |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/FinalizeIndexJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.index; |
2 | 2 |
|
3 |
import static java.lang.String.format; |
|
4 |
|
|
3 | 5 |
import org.apache.commons.lang.StringUtils; |
4 | 6 |
import org.apache.commons.logging.Log; |
5 | 7 |
import org.apache.commons.logging.LogFactory; |
6 |
import org.apache.solr.client.solrj.SolrServer; |
|
7 |
import org.apache.solr.client.solrj.impl.HttpSolrServer; |
|
8 |
import org.apache.solr.client.solrj.impl.CloudSolrServer; |
|
8 | 9 |
import org.apache.solr.client.solrj.response.UpdateResponse; |
9 | 10 |
|
10 | 11 |
import com.googlecode.sarasvati.Arc; |
... | ... | |
22 | 23 |
@Override |
23 | 24 |
protected String execute(final NodeToken token) throws Exception { |
24 | 25 |
|
25 |
final String version = token.getEnv().getAttribute("index.feed.timestamp"); |
|
26 |
final String version = getEnvParam(token, "index.feed.timestamp"); |
|
27 |
final String solrBaseURL = getEnvParam(token, "index.solr.url"); |
|
28 |
final String collection = getEnvParam(token, "index.solr.collection"); |
|
26 | 29 |
|
27 |
if (StringUtils.isBlank(version))
|
|
28 |
throw new MSROException("unable to finalize index feeding, cannot find property 'index.feed.timestamp' in the workflow env.");
|
|
30 |
final CloudSolrServer solrServer = new CloudSolrServer(solrBaseURL);
|
|
31 |
solrServer.setDefaultCollection(collection);
|
|
29 | 32 |
|
30 |
final String solrBaseURL = token.getEnv().getAttribute("index.solr.url"); |
|
31 |
|
|
32 |
if (StringUtils.isBlank(solrBaseURL)) |
|
33 |
throw new MSROException("unable to finalize index feeding, cannot find property 'index.solr.url' in the workflow env."); |
|
34 |
|
|
35 |
final SolrServer solrServer = new HttpSolrServer(solrBaseURL); |
|
36 |
|
|
37 | 33 |
final String query = "__dsversion:{* TO " + InputDocumentFactory.getParsedDateField(version) + "}"; |
38 | 34 |
|
39 | 35 |
log.info("delete by query: " + query); |
... | ... | |
44 | 40 |
rsp = solrServer.commit(); |
45 | 41 |
log.info("index commit completed in: " + HumanTime.exactly(rsp.getElapsedTime())); |
46 | 42 |
|
43 |
solrServer.shutdown(); |
|
44 |
|
|
47 | 45 |
return Arc.DEFAULT_ARC; |
48 | 46 |
} |
49 | 47 |
|
48 |
private String getEnvParam(final NodeToken token, final String name) throws MSROException { |
|
49 |
final String value = token.getEnv().getAttribute(name); |
|
50 |
|
|
51 |
if (StringUtils.isBlank(value)) |
|
52 |
throw new MSROException(format("unable to finalize index feeding, cannot find property '%s' in the workflow env.", name)); |
|
53 |
|
|
54 |
return value; |
|
55 |
} |
|
56 |
|
|
50 | 57 |
} |
Also available in: Unified diff
reverted changes to working version