1
|
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
|
2
|
|
3
|
import java.io.InputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.index.CloudIndexClient;
import eu.dnetlib.data.index.CloudIndexClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.openaire.directindex.api.RecentResultsQueue;
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.common.SolrInputDocument;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
|
28
|
|
29
|
/**
|
30
|
* Created by michele on 15/12/15.
|
31
|
*/
|
32
|
public class FeedMissingClaimsJobNode extends AsyncJobNode {
|
33
|
|
34
|
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
|
35
|
private RecentResultsQueue queue;
|
36
|
private OafToIndexRecordFactory oafToIndexRecordFactory;
|
37
|
|
38
|
@Resource
|
39
|
private UniqueServiceLocator serviceLocator;
|
40
|
|
41
|
@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
|
42
|
private ClassPathResource findSolrIndexUrl;
|
43
|
|
44
|
@Override
|
45
|
protected String execute(final NodeToken nodeToken) throws Exception {
|
46
|
|
47
|
final String format =
|
48
|
nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
|
49
|
final String coll = format + "-index-openaire";
|
50
|
final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
|
51
|
final String baseUrl = calculateIndexBaseUrl();
|
52
|
|
53
|
CloudIndexClient idxClient = null;
|
54
|
|
55
|
try {
|
56
|
final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
|
57
|
final List<String> toDeleteFromCache = new ArrayList<String>();
|
58
|
|
59
|
final SAXReader reader = new SAXReader();
|
60
|
final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
|
61
|
|
62
|
idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
|
63
|
log.info("Starting to feed claims in index collection "+coll);
|
64
|
int count = 1;
|
65
|
for (String record : queue) {
|
66
|
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
|
67
|
if(log.isDebugEnabled()){
|
68
|
log.debug("Processing record "+count);
|
69
|
}
|
70
|
if (idxClient.isRecordIndexed(id)) {
|
71
|
toDeleteFromCache.add(id);
|
72
|
} else {
|
73
|
toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
|
74
|
}
|
75
|
}
|
76
|
|
77
|
idxClient.feed(toFeed, null);
|
78
|
queue.remove(toDeleteFromCache);
|
79
|
log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
|
80
|
} catch (Throwable e) {
|
81
|
log.error("Error feeding missing claims", e);
|
82
|
throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
|
83
|
} finally {
|
84
|
if (idxClient != null) {
|
85
|
idxClient.close();
|
86
|
}
|
87
|
}
|
88
|
|
89
|
return Arc.DEFAULT_ARC;
|
90
|
}
|
91
|
|
92
|
public RecentResultsQueue getQueue() {
|
93
|
return queue;
|
94
|
}
|
95
|
|
96
|
@Required
|
97
|
public void setQueue(final RecentResultsQueue queue) {
|
98
|
this.queue = queue;
|
99
|
}
|
100
|
|
101
|
public OafToIndexRecordFactory getOafToIndexRecordFactory() {
|
102
|
return oafToIndexRecordFactory;
|
103
|
}
|
104
|
|
105
|
@Required
|
106
|
public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
|
107
|
this.oafToIndexRecordFactory = oafToIndexRecordFactory;
|
108
|
}
|
109
|
|
110
|
private String calculateIndexBaseUrl() throws Exception {
|
111
|
final String query = IOUtils.toString(findSolrIndexUrl.getInputStream());
|
112
|
return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
|
113
|
}
|
114
|
}
|