Revision 61279
Added by Alessia Bardi almost 3 years ago
FeedMissingClaimsJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.index; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
3 | 4 |
import java.io.StringReader; |
4 | 5 |
import java.util.ArrayList; |
5 | 6 |
import java.util.List; |
... | ... | |
32 | 33 |
*/ |
33 | 34 |
public class FeedMissingClaimsJobNode extends AsyncJobNode { |
34 | 35 |
|
35 |
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class); |
|
36 |
public static final int BATCH_SIZE = 1000; |
|
37 |
private RecentResultsQueue queue; |
|
38 |
private OafToIndexRecordFactory oafToIndexRecordFactory; |
|
36 |
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class); |
|
37 |
public static final int BATCH_SIZE = 1000; |
|
38 |
public static final int ATTEMPTS = 3; |
|
39 |
public static final int SLEEP_MS_SOLR_CLIENT = 5000; |
|
40 |
private RecentResultsQueue queue; |
|
41 |
private OafToIndexRecordFactory oafToIndexRecordFactory; |
|
39 | 42 |
|
40 |
@Resource
|
|
41 |
private UniqueServiceLocator serviceLocator;
|
|
43 |
@Resource
|
|
44 |
private UniqueServiceLocator serviceLocator;
|
|
42 | 45 |
|
43 |
@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
|
|
44 |
private ClassPathResource findSolrIndexUrl;
|
|
46 |
@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
|
|
47 |
private ClassPathResource findSolrIndexUrl;
|
|
45 | 48 |
|
46 |
@Override
|
|
47 |
protected String execute(final NodeToken nodeToken) throws Exception {
|
|
49 |
@Override
|
|
50 |
protected String execute(final NodeToken nodeToken) throws Exception {
|
|
48 | 51 |
|
49 |
final String format =
|
|
50 |
nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
|
|
51 |
final String coll = format + "-index-openaire";
|
|
52 |
final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
|
|
53 |
final String baseUrl = calculateIndexBaseUrl();
|
|
52 |
final String format =
|
|
53 |
nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
|
|
54 |
final String coll = format + "-index-openaire";
|
|
55 |
final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
|
|
56 |
final String baseUrl = calculateIndexBaseUrl();
|
|
54 | 57 |
|
55 |
CloudIndexClient idxClient = null;
|
|
58 |
CloudIndexClient idxClient = null;
|
|
56 | 59 |
|
57 |
try {
|
|
58 |
final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
|
|
59 |
final List<String> toDeleteFromCache = new ArrayList<String>();
|
|
60 |
try {
|
|
61 |
final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
|
|
62 |
final List<String> toDeleteFromCache = new ArrayList<String>();
|
|
60 | 63 |
|
61 |
final SAXReader reader = new SAXReader();
|
|
62 |
final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
|
|
64 |
final SAXReader reader = new SAXReader();
|
|
65 |
final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
|
|
63 | 66 |
|
64 |
idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false); |
|
65 |
log.info("Starting to feed claims in index collection "+coll); |
|
66 |
int count = 0; |
|
67 |
for (String record : queue) { |
|
68 |
try { |
|
69 |
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']"); |
|
70 |
count++; |
|
71 |
if (log.isDebugEnabled()) { |
|
72 |
log.debug("Processing record " + count); |
|
73 |
} |
|
74 |
if (idxClient.isRecordIndexed(id)) { |
|
75 |
toDeleteFromCache.add(id); |
|
76 |
} else { |
|
77 |
toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt)); |
|
78 |
} |
|
79 |
if (count % BATCH_SIZE == 0) processLists(idxClient, toFeed, toDeleteFromCache); |
|
80 |
}catch(CloudIndexClientException cie){ |
|
81 |
log.error("Error feeding missing claims", cie); |
|
82 |
if (idxClient != null) { |
|
83 |
idxClient.close(); |
|
84 |
} |
|
85 |
idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false); |
|
86 |
log.info("Got new CloudIndexClient"); |
|
87 |
} |
|
88 |
} |
|
89 |
if(!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, toFeed, toDeleteFromCache); |
|
90 |
log.info(String.format("Finished feeding of claims in index collection %s, total: %d", coll, count)); |
|
67 |
idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false); |
|
68 |
log.info("Starting to feed claims in index collection " + coll); |
|
69 |
int count = 0; |
|
70 |
for (String record : queue) { |
|
71 |
int max_attempts = ATTEMPTS; |
|
72 |
count++; |
|
73 |
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']"); |
|
74 |
if (log.isDebugEnabled()) { |
|
75 |
log.debug(String.format("Processing record %s, number: %d", id, count)); |
|
76 |
} |
|
77 |
if (isRecordIndexed(idxClient, baseUrl, coll, id, max_attempts)) { |
|
78 |
toDeleteFromCache.add(id); |
|
79 |
} else { |
|
80 |
max_attempts = ATTEMPTS; |
|
81 |
toFeed.add(prepareSolrDoc(idxClient, baseUrl, coll, id, record, indexDsId, xslt, max_attempts)); |
|
82 |
} |
|
83 |
if (count % BATCH_SIZE == 0) processLists(idxClient, baseUrl, coll, toFeed, toDeleteFromCache); |
|
84 |
} |
|
85 |
if (!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, baseUrl, coll, toFeed, toDeleteFromCache); |
|
86 |
log.info(String.format("Finished feeding of claims in index collection %s, total: %d", coll, count)); |
|
91 | 87 |
|
92 |
} catch(Throwable e) {
|
|
93 |
log.error("Error feeding missing claims", e);
|
|
94 |
throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
|
|
95 |
} finally {
|
|
96 |
if (idxClient != null) {
|
|
97 |
idxClient.close();
|
|
98 |
}
|
|
99 |
log.info("Closed Solr index client");
|
|
100 |
}
|
|
101 |
log.info("Now proceeding to Arc.DEFAULT_ARC");
|
|
102 |
return Arc.DEFAULT_ARC;
|
|
103 |
}
|
|
88 |
} catch (Throwable e) {
|
|
89 |
log.error("Error feeding missing claims", e);
|
|
90 |
throw e;
|
|
91 |
} finally {
|
|
92 |
if (idxClient != null) {
|
|
93 |
idxClient.close();
|
|
94 |
}
|
|
95 |
log.info("Closed Solr index client");
|
|
96 |
}
|
|
97 |
log.info("Now proceeding to Arc.DEFAULT_ARC");
|
|
98 |
return Arc.DEFAULT_ARC;
|
|
99 |
}
|
|
104 | 100 |
|
101 |
protected boolean isRecordIndexed(CloudIndexClient idxClient, String baseUrl, String coll, String id, int attempt) throws IOException, CloudIndexClientException, MSROException, InterruptedException { |
|
102 |
try { |
|
103 |
return idxClient.isRecordIndexed(id); |
|
104 |
} catch (CloudIndexClientException cie) { |
|
105 |
log.error(String.format("Error querying for %s, message: %s. Trying again, remaining attempts:", id, cie, attempt)); |
|
106 |
idxClient = resetCloudIndexClient(idxClient, baseUrl, coll); |
|
107 |
if (attempt > 0) return isRecordIndexed(idxClient, baseUrl, coll, id, --attempt); |
|
108 |
else { |
|
109 |
String msg = String.format("Too many attempts %d to recreate the index client for checking if record %s exists.", ATTEMPTS, id); |
|
110 |
log.error(msg); |
|
111 |
throw new MSROException(cie); |
|
112 |
} |
|
113 |
} |
|
114 |
} |
|
105 | 115 |
|
116 |
protected SolrInputDocument prepareSolrDoc(CloudIndexClient idxClient, String baseUrl, String coll, String recordId, String record, String indexDsId, ApplyXslt xslt, int attempt) throws IOException, CloudIndexClientException, MSROException, InterruptedException { |
|
117 |
try { |
|
118 |
return idxClient.prepareSolrDocument(record, indexDsId, xslt); |
|
119 |
} catch (CloudIndexClientException cie) { |
|
120 |
log.error(String.format("Error preparing Solr doc for %s, message: %s. Trying again, remaining attempts:", recordId, cie, attempt)); |
|
121 |
idxClient = resetCloudIndexClient(idxClient, baseUrl, coll); |
|
122 |
if (attempt > 0) |
|
123 |
return prepareSolrDoc(idxClient, baseUrl, coll, recordId, record, indexDsId, xslt, --attempt); |
|
124 |
else { |
|
125 |
String msg = String.format("Too many attempts %d to recreate the index client for preparing SolrDocument for %s", ATTEMPTS, id); |
|
126 |
log.error(msg); |
|
127 |
throw new MSROException(cie); |
|
128 |
} |
|
129 |
} |
|
130 |
} |
|
106 | 131 |
|
107 |
private void processLists(final CloudIndexClient idxClient, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException{ |
|
108 |
idxClient.feed(toFeed, null); |
|
109 |
queue.remove(toDeleteFromCache); |
|
110 |
log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size())); |
|
111 |
toFeed.clear(); |
|
112 |
toDeleteFromCache.clear(); |
|
113 |
log.info("Cleaned temporary lists"); |
|
114 |
} |
|
132 |
protected void tryToFeed(CloudIndexClient idxClient, String baseUrl, String coll, List<SolrInputDocument> toFeed, int attempt) throws MSROException, IOException, CloudIndexClientException, InterruptedException { |
|
133 |
try { |
|
134 |
idxClient.feed(toFeed, null); |
|
135 |
} catch (CloudIndexClientException cie) { |
|
136 |
log.error(String.format("Error feeding Solr in attempt number %d", attempt)); |
|
137 |
idxClient = resetCloudIndexClient(idxClient, baseUrl, coll); |
|
138 |
if (attempt > 0) tryToFeed(idxClient, baseUrl, coll, toFeed, --attempt); |
|
139 |
else { |
|
140 |
String msg = String.format("Too many attempts %d to recreate the index client for feeding Solr", ATTEMPTS); |
|
141 |
log.error(msg); |
|
142 |
throw new MSROException(cie); |
|
143 |
} |
|
144 |
} |
|
145 |
} |
|
115 | 146 |
|
116 |
public RecentResultsQueue getQueue() { |
|
117 |
return queue; |
|
118 |
} |
|
147 |
private CloudIndexClient resetCloudIndexClient(CloudIndexClient idxClient, String baseUrl, String coll) throws IOException, CloudIndexClientException, InterruptedException { |
|
148 |
if (idxClient != null) { |
|
149 |
idxClient.close(); |
|
150 |
} |
|
151 |
Thread.sleep(SLEEP_MS_SOLR_CLIENT); |
|
152 |
CloudIndexClient newclient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false); |
|
153 |
log.info("Got new CloudIndexClient"); |
|
154 |
return newclient; |
|
155 |
} |
|
119 | 156 |
|
120 |
@Required |
|
121 |
public void setQueue(final RecentResultsQueue queue) { |
|
122 |
this.queue = queue; |
|
123 |
} |
|
124 | 157 |
|
125 |
public OafToIndexRecordFactory getOafToIndexRecordFactory() { |
|
126 |
return oafToIndexRecordFactory; |
|
127 |
} |
|
158 |
private void processLists(final CloudIndexClient idxClient, String baseUrl, String coll, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException, MSROException, IOException, InterruptedException { |
|
159 |
int max_attempts = ATTEMPTS; |
|
160 |
tryToFeed(idxClient, baseUrl, coll, toFeed, max_attempts); |
|
161 |
queue.remove(toDeleteFromCache); |
|
162 |
log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size())); |
|
163 |
toFeed.clear(); |
|
164 |
toDeleteFromCache.clear(); |
|
165 |
log.info("Cleaned temporary lists"); |
|
166 |
} |
|
128 | 167 |
|
129 |
@Required |
|
130 |
public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) { |
|
131 |
this.oafToIndexRecordFactory = oafToIndexRecordFactory; |
|
132 |
} |
|
168 |
public RecentResultsQueue getQueue() { |
|
169 |
return queue; |
|
170 |
} |
|
133 | 171 |
|
134 |
private String calculateIndexBaseUrl() throws Exception { |
|
135 |
final String query = IOUtils.toString(findSolrIndexUrl.getInputStream()); |
|
136 |
return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query); |
|
137 |
} |
|
172 |
@Required |
|
173 |
public void setQueue(final RecentResultsQueue queue) { |
|
174 |
this.queue = queue; |
|
175 |
} |
|
176 |
|
|
177 |
public OafToIndexRecordFactory getOafToIndexRecordFactory() { |
|
178 |
return oafToIndexRecordFactory; |
|
179 |
} |
|
180 |
|
|
181 |
@Required |
|
182 |
public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) { |
|
183 |
this.oafToIndexRecordFactory = oafToIndexRecordFactory; |
|
184 |
} |
|
185 |
|
|
186 |
private String calculateIndexBaseUrl() throws Exception { |
|
187 |
final String query = IOUtils.toString(findSolrIndexUrl.getInputStream()); |
|
188 |
return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query); |
|
189 |
} |
|
138 | 190 |
} |
Also available in: Unified diff
catch CloudIndexClientException trying to avoid connection problems between SolrJClient and ZooKeeper