Revision 48139
Added by Alessia Bardi over 6 years ago
FeedMissingClaimsJobNode.java | ||
---|---|---|
5 | 5 |
import java.util.List; |
6 | 6 |
import javax.annotation.Resource; |
7 | 7 |
|
8 |
import com.google.common.base.Function; |
|
9 |
import com.google.common.base.Predicate; |
|
10 | 8 |
import com.googlecode.sarasvati.Arc; |
11 | 9 |
import com.googlecode.sarasvati.NodeToken; |
12 | 10 |
import eu.dnetlib.data.index.CloudIndexClient; |
13 |
import eu.dnetlib.data.index.CloudIndexClientException; |
|
14 | 11 |
import eu.dnetlib.data.index.CloudIndexClientFactory; |
15 | 12 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
16 | 13 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
17 |
import eu.dnetlib.miscutils.functional.UnaryFunction; |
|
18 | 14 |
import eu.dnetlib.miscutils.functional.xml.ApplyXslt; |
19 |
import eu.dnetlib.msro.openaireplus.api.RecentPublicationsQueue;
|
|
15 |
import eu.dnetlib.msro.openaireplus.api.RecentResultsQueue;
|
|
20 | 16 |
import eu.dnetlib.msro.openaireplus.utils.OafToIndexRecordFactory; |
21 | 17 |
import eu.dnetlib.msro.rmi.MSROException; |
22 | 18 |
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode; |
... | ... | |
35 | 31 |
public class FeedMissingClaimsJobNode extends AsyncJobNode { |
36 | 32 |
|
37 | 33 |
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class); |
38 |
private RecentPublicationsQueue queue;
|
|
34 |
private RecentResultsQueue queue;
|
|
39 | 35 |
private OafToIndexRecordFactory oafToIndexRecordFactory; |
40 | 36 |
|
41 | 37 |
@Resource |
... | ... | |
67 | 63 |
for (String record : queue) { |
68 | 64 |
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']"); |
69 | 65 |
if (idxClient.isRecordIndexed(id)) { |
66 |
toDeleteFromCache.add(id); |
|
67 |
} else { |
|
70 | 68 |
toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt)); |
71 |
} else { |
|
72 |
toDeleteFromCache.add(id); |
|
73 | 69 |
} |
74 | 70 |
} |
75 | 71 |
|
... | ... | |
88 | 84 |
return Arc.DEFAULT_ARC; |
89 | 85 |
} |
90 | 86 |
|
91 |
public RecentPublicationsQueue getQueue() {
|
|
87 |
public RecentResultsQueue getQueue() {
|
|
92 | 88 |
return queue; |
93 | 89 |
} |
94 | 90 |
|
95 | 91 |
@Required |
96 |
public void setQueue(final RecentPublicationsQueue queue) {
|
|
92 |
public void setQueue(final RecentResultsQueue queue) {
|
|
97 | 93 |
this.queue = queue; |
98 | 94 |
} |
99 | 95 |
|
... | ... | |
106 | 102 |
this.oafToIndexRecordFactory = oafToIndexRecordFactory; |
107 | 103 |
} |
108 | 104 |
|
109 |
private class IsNotIndexed implements Predicate<String> { |
|
110 |
|
|
111 |
private final CloudIndexClient idxClient; |
|
112 |
|
|
113 |
private final SAXReader reader = new SAXReader(); |
|
114 |
|
|
115 |
public IsNotIndexed(final CloudIndexClient idxClient) { |
|
116 |
this.idxClient = idxClient; |
|
117 |
} |
|
118 |
|
|
119 |
@Override |
|
120 |
public boolean apply(final String s) { |
|
121 |
try { |
|
122 |
final String id = reader.read(new StringReader(s)).valueOf("//*[local-name() = 'objIdentifier']"); |
|
123 |
return !idxClient.isRecordIndexed(id); |
|
124 |
} catch (Throwable e) { |
|
125 |
log.error("Error searching record: " + s, e); |
|
126 |
throw new RuntimeException(e); |
|
127 |
} |
|
128 |
} |
|
129 |
|
|
130 |
} |
|
131 |
|
|
132 |
private class CreateSolrDocument implements Function<String, SolrInputDocument> { |
|
133 |
|
|
134 |
private final CloudIndexClient idxClient; |
|
135 |
private final String indexDsId; |
|
136 |
private final UnaryFunction<String, String> toIndexRecord; |
|
137 |
|
|
138 |
public CreateSolrDocument(final CloudIndexClient idxClient, final String indexDsId, final UnaryFunction<String, String> toIndexRecord) { |
|
139 |
this.idxClient = idxClient; |
|
140 |
this.indexDsId = indexDsId; |
|
141 |
this.toIndexRecord = toIndexRecord; |
|
142 |
} |
|
143 |
|
|
144 |
@Override |
|
145 |
public SolrInputDocument apply(final String s) { |
|
146 |
try { |
|
147 |
return idxClient.prepareSolrDocument(s, indexDsId, toIndexRecord); |
|
148 |
} catch (CloudIndexClientException e) { |
|
149 |
throw new RuntimeException(e); |
|
150 |
} |
|
151 |
} |
|
152 |
|
|
153 |
} |
|
154 |
|
|
155 | 105 |
private String calculateIndexBaseUrl() throws Exception { |
156 | 106 |
final String query = IOUtils.toString(findSolrIndexUrl.getInputStream()); |
157 | 107 |
return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query); |
Also available in: Unified diff
integrated (hopefully) all required changes from dnet40