1
|
package eu.dnetlib.msro.workflows.nodes.index;
|
2
|
|
3
|
import java.io.StringReader;
|
4
|
import java.util.ArrayList;
|
5
|
import java.util.List;
|
6
|
import javax.annotation.Resource;
|
7
|
|
8
|
import com.google.common.base.Function;
|
9
|
import eu.dnetlib.data.index.CloudIndexClient;
|
10
|
import eu.dnetlib.data.index.CloudIndexClientException;
|
11
|
import eu.dnetlib.data.index.CloudIndexClientFactory;
|
12
|
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
|
13
|
import eu.dnetlib.msro.workflows.graph.Arc;
|
14
|
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
|
15
|
import eu.dnetlib.msro.workflows.procs.Env;
|
16
|
import eu.dnetlib.openaire.api.RecentPublicationsQueue;
|
17
|
import eu.dnetlib.rmi.enabling.ISLookUpService;
|
18
|
import eu.dnetlib.rmi.manager.MSROException;
|
19
|
import org.apache.commons.io.IOUtils;
|
20
|
import org.apache.commons.logging.Log;
|
21
|
import org.apache.commons.logging.LogFactory;
|
22
|
import org.apache.solr.common.SolrInputDocument;
|
23
|
import org.dom4j.io.SAXReader;
|
24
|
import org.springframework.beans.factory.annotation.Required;
|
25
|
import org.springframework.beans.factory.annotation.Value;
|
26
|
import org.springframework.core.io.ClassPathResource;
|
27
|
|
28
|
/**
|
29
|
* Created by michele on 15/12/15.
|
30
|
*/
|
31
|
public class FeedMissingClaimsJobNode extends AsyncJobNode {
|
32
|
|
33
|
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
|
34
|
private RecentPublicationsQueue queue;
|
35
|
private OafToIndexRecordFactory oafToIndexRecordFactory;
|
36
|
|
37
|
@Resource
|
38
|
private UniqueServiceLocator serviceLocator;
|
39
|
|
40
|
@Value(value = "${openaireplus.msro.api.findSolrIndexUrl.xquery}")
|
41
|
private ClassPathResource findSolrIndexUrl;
|
42
|
|
43
|
@Override
|
44
|
protected String execute(final Env env) throws Exception {
|
45
|
|
46
|
final String format = env.getAttribute("format", String.class);
|
47
|
final String coll = format + "-index-openaire";
|
48
|
final String indexDsId = env.getAttribute("index_id", String.class);
|
49
|
final String baseUrl = calculateIndexBaseUrl();
|
50
|
|
51
|
CloudIndexClient idxClient = null;
|
52
|
|
53
|
try {
|
54
|
final List<SolrInputDocument> toFeed = new ArrayList<>();
|
55
|
final List<String> toDeleteFromCache = new ArrayList<>();
|
56
|
|
57
|
final SAXReader reader = new SAXReader();
|
58
|
final Function<String, String> xslt = (Function<String, String>) oafToIndexRecordFactory.newTransformer(format);
|
59
|
|
60
|
idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
|
61
|
|
62
|
for (final String record : this.queue) {
|
63
|
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
|
64
|
if (idxClient.isRecordIndexed(id)) {
|
65
|
toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
|
66
|
} else {
|
67
|
toDeleteFromCache.add(id);
|
68
|
}
|
69
|
}
|
70
|
|
71
|
idxClient.feed(toFeed, null);
|
72
|
this.queue.remove(toDeleteFromCache);
|
73
|
|
74
|
} catch (final Throwable e) {
|
75
|
log.error("Error feeding missing claims", e);
|
76
|
throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
|
77
|
} finally {
|
78
|
if (idxClient != null) {
|
79
|
idxClient.close();
|
80
|
}
|
81
|
}
|
82
|
|
83
|
return Arc.DEFAULT_ARC;
|
84
|
}
|
85
|
|
86
|
public RecentPublicationsQueue getQueue() {
|
87
|
return this.queue;
|
88
|
}
|
89
|
|
90
|
@Required
|
91
|
public void setQueue(final RecentPublicationsQueue queue) {
|
92
|
this.queue = queue;
|
93
|
}
|
94
|
|
95
|
public OafToIndexRecordFactory getOafToIndexRecordFactory() {
|
96
|
return this.oafToIndexRecordFactory;
|
97
|
}
|
98
|
|
99
|
@Required
|
100
|
public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
|
101
|
this.oafToIndexRecordFactory = oafToIndexRecordFactory;
|
102
|
}
|
103
|
|
104
|
private String calculateIndexBaseUrl() throws Exception {
|
105
|
final String query = IOUtils.toString(this.findSolrIndexUrl.getInputStream());
|
106
|
return this.serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
|
107
|
}
|
108
|
|
109
|
private class IsNotIndexed implements com.google.common.base.Predicate<String> {
|
110
|
|
111
|
private final CloudIndexClient idxClient;
|
112
|
|
113
|
private final SAXReader reader = new SAXReader();
|
114
|
|
115
|
public IsNotIndexed(final CloudIndexClient idxClient) {
|
116
|
this.idxClient = idxClient;
|
117
|
}
|
118
|
|
119
|
@Override
|
120
|
public boolean apply(final String s) {
|
121
|
try {
|
122
|
final String id = this.reader.read(new StringReader(s)).valueOf("//*[local-name() = 'objIdentifier']");
|
123
|
return !this.idxClient.isRecordIndexed(id);
|
124
|
} catch (final Throwable e) {
|
125
|
log.error("Error searching record: " + s, e);
|
126
|
throw new RuntimeException(e);
|
127
|
}
|
128
|
}
|
129
|
|
130
|
}
|
131
|
|
132
|
private class CreateSolrDocument implements Function<String, SolrInputDocument> {
|
133
|
|
134
|
private final CloudIndexClient idxClient;
|
135
|
private final String indexDsId;
|
136
|
private final Function<String, String> toIndexRecord;
|
137
|
|
138
|
public CreateSolrDocument(final CloudIndexClient idxClient, final String indexDsId, final Function<String, String> toIndexRecord) {
|
139
|
this.idxClient = idxClient;
|
140
|
this.indexDsId = indexDsId;
|
141
|
this.toIndexRecord = toIndexRecord;
|
142
|
}
|
143
|
|
144
|
@Override
|
145
|
public SolrInputDocument apply(final String s) {
|
146
|
try {
|
147
|
return this.idxClient.prepareSolrDocument(s, this.indexDsId, this.toIndexRecord);
|
148
|
} catch (final CloudIndexClientException e) {
|
149
|
throw new RuntimeException(e);
|
150
|
}
|
151
|
}
|
152
|
|
153
|
}
|
154
|
}
|