1
|
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
|
2
|
|
3
|
import java.io.StringReader;
|
4
|
import java.util.ArrayList;
|
5
|
import java.util.List;
|
6
|
import javax.annotation.Resource;
|
7
|
|
8
|
import com.googlecode.sarasvati.Arc;
|
9
|
import com.googlecode.sarasvati.NodeToken;
|
10
|
import eu.dnetlib.data.index.CloudIndexClient;
|
11
|
import eu.dnetlib.data.index.CloudIndexClientFactory;
|
12
|
import eu.dnetlib.data.index.CloudIndexClientException;
|
13
|
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
|
14
|
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
|
15
|
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
|
16
|
import eu.dnetlib.msro.rmi.MSROException;
|
17
|
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
|
18
|
import eu.dnetlib.openaire.directindex.api.RecentResultsQueue;
|
19
|
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;
|
20
|
|
21
|
import org.apache.commons.io.IOUtils;
|
22
|
import org.apache.commons.logging.Log;
|
23
|
import org.apache.commons.logging.LogFactory;
|
24
|
import org.apache.solr.common.SolrInputDocument;
|
25
|
import org.dom4j.io.SAXReader;
|
26
|
import org.springframework.beans.factory.annotation.Required;
|
27
|
import org.springframework.beans.factory.annotation.Value;
|
28
|
import org.springframework.core.io.ClassPathResource;
|
29
|
|
30
|
/**
|
31
|
* Created by michele on 15/12/15.
|
32
|
*/
|
33
|
public class FeedMissingClaimsJobNode extends AsyncJobNode {
|
34
|
|
35
|
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
|
36
|
public static final int BATCH_SIZE = 1000;
|
37
|
private RecentResultsQueue queue;
|
38
|
private OafToIndexRecordFactory oafToIndexRecordFactory;
|
39
|
|
40
|
@Resource
|
41
|
private UniqueServiceLocator serviceLocator;
|
42
|
|
43
|
@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
|
44
|
private ClassPathResource findSolrIndexUrl;
|
45
|
|
46
|
@Override
|
47
|
protected String execute(final NodeToken nodeToken) throws Exception {
|
48
|
|
49
|
final String format =
|
50
|
nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
|
51
|
final String coll = format + "-index-openaire";
|
52
|
final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
|
53
|
final String baseUrl = calculateIndexBaseUrl();
|
54
|
|
55
|
CloudIndexClient idxClient = null;
|
56
|
|
57
|
try {
|
58
|
final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
|
59
|
final List<String> toDeleteFromCache = new ArrayList<String>();
|
60
|
|
61
|
final SAXReader reader = new SAXReader();
|
62
|
final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
|
63
|
|
64
|
idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
|
65
|
log.info("Starting to feed claims in index collection "+coll);
|
66
|
int count = 0;
|
67
|
for (String record : queue) {
|
68
|
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
|
69
|
count++;
|
70
|
if(log.isDebugEnabled()){
|
71
|
log.debug("Processing record "+count);
|
72
|
}
|
73
|
if (idxClient.isRecordIndexed(id)) {
|
74
|
toDeleteFromCache.add(id);
|
75
|
} else {
|
76
|
toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
|
77
|
}
|
78
|
if(count % BATCH_SIZE == 0) processLists(idxClient, toFeed, toDeleteFromCache);
|
79
|
|
80
|
}
|
81
|
if(!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, toFeed, toDeleteFromCache);
|
82
|
log.info(String.format("Finished feeding of claims in index collection %s, total: %d", coll, count));
|
83
|
|
84
|
} catch (Throwable e) {
|
85
|
log.error("Error feeding missing claims", e);
|
86
|
throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
|
87
|
} finally {
|
88
|
if (idxClient != null) {
|
89
|
idxClient.close();
|
90
|
}
|
91
|
log.info("Closed Solr index client");
|
92
|
}
|
93
|
log.info("Now proceeding to Arc.DEFAULT_ARC");
|
94
|
return Arc.DEFAULT_ARC;
|
95
|
}
|
96
|
|
97
|
|
98
|
|
99
|
private void processLists(final CloudIndexClient idxClient, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException{
|
100
|
idxClient.feed(toFeed, null);
|
101
|
queue.remove(toDeleteFromCache);
|
102
|
log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
|
103
|
toFeed.clear();
|
104
|
toDeleteFromCache.clear();
|
105
|
log.info("Cleaned temporary lists");
|
106
|
}
|
107
|
|
108
|
public RecentResultsQueue getQueue() {
|
109
|
return queue;
|
110
|
}
|
111
|
|
112
|
@Required
|
113
|
public void setQueue(final RecentResultsQueue queue) {
|
114
|
this.queue = queue;
|
115
|
}
|
116
|
|
117
|
public OafToIndexRecordFactory getOafToIndexRecordFactory() {
|
118
|
return oafToIndexRecordFactory;
|
119
|
}
|
120
|
|
121
|
@Required
|
122
|
public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
|
123
|
this.oafToIndexRecordFactory = oafToIndexRecordFactory;
|
124
|
}
|
125
|
|
126
|
private String calculateIndexBaseUrl() throws Exception {
|
127
|
final String query = IOUtils.toString(findSolrIndexUrl.getInputStream());
|
128
|
return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
|
129
|
}
|
130
|
}
|