1
|
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.io.StringReader;
|
5
|
import java.util.ArrayList;
|
6
|
import java.util.List;
|
7
|
import javax.annotation.Resource;
|
8
|
|
9
|
import com.googlecode.sarasvati.Arc;
|
10
|
import com.googlecode.sarasvati.NodeToken;
|
11
|
import eu.dnetlib.data.index.CloudIndexClient;
|
12
|
import eu.dnetlib.data.index.CloudIndexClientFactory;
|
13
|
import eu.dnetlib.data.index.CloudIndexClientException;
|
14
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
15
|
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
|
16
|
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
|
17
|
import eu.dnetlib.msro.rmi.MSROException;
|
18
|
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
|
19
|
import eu.dnetlib.openaire.directindex.api.RecentResultsQueue;
|
20
|
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;
|
21
|
|
22
|
import org.apache.commons.io.IOUtils;
|
23
|
import org.apache.commons.logging.Log;
|
24
|
import org.apache.commons.logging.LogFactory;
|
25
|
import org.apache.solr.common.SolrInputDocument;
|
26
|
import org.dom4j.io.SAXReader;
|
27
|
import org.springframework.beans.factory.annotation.Required;
|
28
|
import org.springframework.beans.factory.annotation.Value;
|
29
|
import org.springframework.core.io.ClassPathResource;
|
30
|
|
31
|
/**
|
32
|
* Created by michele on 15/12/15.
|
33
|
*/
|
34
|
public class FeedMissingClaimsJobNode extends AsyncJobNode {
|
35
|
|
36
|
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
|
37
|
public static final int BATCH_SIZE = 1000;
|
38
|
public static final int ATTEMPTS = 3;
|
39
|
public static final int SLEEP_MS_SOLR_CLIENT = 5000;
|
40
|
private RecentResultsQueue queue;
|
41
|
private OafToIndexRecordFactory oafToIndexRecordFactory;
|
42
|
|
43
|
@Resource
|
44
|
private UniqueServiceLocator serviceLocator;
|
45
|
|
46
|
@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
|
47
|
private ClassPathResource findSolrIndexUrl;
|
48
|
|
49
|
@Override
|
50
|
protected String execute(final NodeToken nodeToken) throws Exception {
|
51
|
|
52
|
final String format =
|
53
|
nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
|
54
|
final String coll = format + "-index-openaire";
|
55
|
final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
|
56
|
final String baseUrl = calculateIndexBaseUrl();
|
57
|
|
58
|
CloudIndexClient idxClient = null;
|
59
|
|
60
|
try {
|
61
|
final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
|
62
|
final List<String> toDeleteFromCache = new ArrayList<String>();
|
63
|
|
64
|
final SAXReader reader = new SAXReader();
|
65
|
final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
|
66
|
|
67
|
idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
|
68
|
log.info("Starting to feed claims in index collection " + coll);
|
69
|
int count = 0;
|
70
|
for (String record : queue) {
|
71
|
int max_attempts = ATTEMPTS;
|
72
|
count++;
|
73
|
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
|
74
|
if (log.isDebugEnabled()) {
|
75
|
log.debug(String.format("Processing record %s, number: %d", id, count));
|
76
|
}
|
77
|
if (isRecordIndexed(idxClient, baseUrl, coll, id, max_attempts)) {
|
78
|
toDeleteFromCache.add(id);
|
79
|
} else {
|
80
|
max_attempts = ATTEMPTS;
|
81
|
toFeed.add(prepareSolrDoc(idxClient, baseUrl, coll, id, record, indexDsId, xslt, max_attempts));
|
82
|
}
|
83
|
if (count % BATCH_SIZE == 0) processLists(idxClient, baseUrl, coll, toFeed, toDeleteFromCache);
|
84
|
}
|
85
|
if (!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, baseUrl, coll, toFeed, toDeleteFromCache);
|
86
|
log.info(String.format("Finished feeding of claims in index collection %s, total: %d", coll, count));
|
87
|
|
88
|
} catch (Throwable e) {
|
89
|
log.error("Error feeding missing claims", e);
|
90
|
throw e;
|
91
|
} finally {
|
92
|
if (idxClient != null) {
|
93
|
idxClient.close();
|
94
|
}
|
95
|
log.info("Closed Solr index client");
|
96
|
}
|
97
|
log.info("Now proceeding to Arc.DEFAULT_ARC");
|
98
|
return Arc.DEFAULT_ARC;
|
99
|
}
|
100
|
|
101
|
protected boolean isRecordIndexed(CloudIndexClient idxClient, String baseUrl, String coll, String id, int attempt) throws IOException, CloudIndexClientException, MSROException, InterruptedException {
|
102
|
try {
|
103
|
return idxClient.isRecordIndexed(id);
|
104
|
} catch (CloudIndexClientException cie) {
|
105
|
log.error(String.format("Error querying for %s, message: %s. Trying again, remaining attempts:", id, cie, attempt));
|
106
|
idxClient = resetCloudIndexClient(idxClient, baseUrl, coll);
|
107
|
if (attempt > 0) return isRecordIndexed(idxClient, baseUrl, coll, id, --attempt);
|
108
|
else {
|
109
|
String msg = String.format("Too many attempts %d to recreate the index client for checking if record %s exists.", ATTEMPTS, id);
|
110
|
log.error(msg);
|
111
|
throw new MSROException(cie);
|
112
|
}
|
113
|
}
|
114
|
}
|
115
|
|
116
|
protected SolrInputDocument prepareSolrDoc(CloudIndexClient idxClient, String baseUrl, String coll, String recordId, String record, String indexDsId, ApplyXslt xslt, int attempt) throws IOException, CloudIndexClientException, MSROException, InterruptedException {
|
117
|
try {
|
118
|
return idxClient.prepareSolrDocument(record, indexDsId, xslt);
|
119
|
} catch (CloudIndexClientException cie) {
|
120
|
log.error(String.format("Error preparing Solr doc for %s, message: %s. Trying again, remaining attempts:", recordId, cie, attempt));
|
121
|
idxClient = resetCloudIndexClient(idxClient, baseUrl, coll);
|
122
|
if (attempt > 0)
|
123
|
return prepareSolrDoc(idxClient, baseUrl, coll, recordId, record, indexDsId, xslt, --attempt);
|
124
|
else {
|
125
|
String msg = String.format("Too many attempts %d to recreate the index client for preparing SolrDocument for %s", ATTEMPTS, id);
|
126
|
log.error(msg);
|
127
|
throw new MSROException(cie);
|
128
|
}
|
129
|
}
|
130
|
}
|
131
|
|
132
|
protected void tryToFeed(CloudIndexClient idxClient, String baseUrl, String coll, List<SolrInputDocument> toFeed, int attempt) throws MSROException, IOException, CloudIndexClientException, InterruptedException {
|
133
|
try {
|
134
|
idxClient.feed(toFeed, null);
|
135
|
} catch (CloudIndexClientException cie) {
|
136
|
log.error(String.format("Error feeding Solr in attempt number %d", attempt));
|
137
|
idxClient = resetCloudIndexClient(idxClient, baseUrl, coll);
|
138
|
if (attempt > 0) tryToFeed(idxClient, baseUrl, coll, toFeed, --attempt);
|
139
|
else {
|
140
|
String msg = String.format("Too many attempts %d to recreate the index client for feeding Solr", ATTEMPTS);
|
141
|
log.error(msg);
|
142
|
throw new MSROException(cie);
|
143
|
}
|
144
|
}
|
145
|
}
|
146
|
|
147
|
private CloudIndexClient resetCloudIndexClient(CloudIndexClient idxClient, String baseUrl, String coll) throws IOException, CloudIndexClientException, InterruptedException {
|
148
|
if (idxClient != null) {
|
149
|
idxClient.close();
|
150
|
}
|
151
|
Thread.sleep(SLEEP_MS_SOLR_CLIENT);
|
152
|
CloudIndexClient newclient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
|
153
|
log.info("Got new CloudIndexClient");
|
154
|
return newclient;
|
155
|
}
|
156
|
|
157
|
|
158
|
private void processLists(final CloudIndexClient idxClient, String baseUrl, String coll, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException, MSROException, IOException, InterruptedException {
|
159
|
int max_attempts = ATTEMPTS;
|
160
|
tryToFeed(idxClient, baseUrl, coll, toFeed, max_attempts);
|
161
|
queue.remove(toDeleteFromCache);
|
162
|
log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
|
163
|
toFeed.clear();
|
164
|
toDeleteFromCache.clear();
|
165
|
log.info("Cleaned temporary lists");
|
166
|
}
|
167
|
|
168
|
public RecentResultsQueue getQueue() {
|
169
|
return queue;
|
170
|
}
|
171
|
|
172
|
@Required
|
173
|
public void setQueue(final RecentResultsQueue queue) {
|
174
|
this.queue = queue;
|
175
|
}
|
176
|
|
177
|
public OafToIndexRecordFactory getOafToIndexRecordFactory() {
|
178
|
return oafToIndexRecordFactory;
|
179
|
}
|
180
|
|
181
|
@Required
|
182
|
public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
|
183
|
this.oafToIndexRecordFactory = oafToIndexRecordFactory;
|
184
|
}
|
185
|
|
186
|
private String calculateIndexBaseUrl() throws Exception {
|
187
|
final String query = IOUtils.toString(findSolrIndexUrl.getInputStream());
|
188
|
return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
|
189
|
}
|
190
|
}
|