Revision 56121
Added by Alessia Bardi almost 5 years ago
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/index/FeedMissingClaimsJobNode.java | ||
---|---|---|
9 | 9 |
import com.googlecode.sarasvati.NodeToken; |
10 | 10 |
import eu.dnetlib.data.index.CloudIndexClient; |
11 | 11 |
import eu.dnetlib.data.index.CloudIndexClientFactory; |
12 |
import eu.dnetlib.data.index.CloudIndexClientException; |
|
12 | 13 |
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService; |
13 | 14 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
14 | 15 |
import eu.dnetlib.miscutils.functional.xml.ApplyXslt; |
... | ... | |
32 | 33 |
public class FeedMissingClaimsJobNode extends AsyncJobNode { |
33 | 34 |
|
34 | 35 |
private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class); |
36 |
public static final int BATCH_SIZE = 1000; |
|
35 | 37 |
private RecentResultsQueue queue; |
36 | 38 |
private OafToIndexRecordFactory oafToIndexRecordFactory; |
37 | 39 |
|
... | ... | |
65 | 67 |
for (String record : queue) { |
66 | 68 |
final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']"); |
67 | 69 |
if(log.isDebugEnabled()){ |
68 |
log.debug("Processing record "+count); |
|
70 |
log.debug("Processing record "+count++);
|
|
69 | 71 |
} |
70 | 72 |
if (idxClient.isRecordIndexed(id)) { |
71 | 73 |
toDeleteFromCache.add(id); |
72 | 74 |
} else { |
73 | 75 |
toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt)); |
74 | 76 |
} |
77 |
if(count % BATCH_SIZE == 0) processLists(idxClient, toFeed, toDeleteFromCache); |
|
78 |
|
|
75 | 79 |
} |
80 |
if(!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, toFeed, toDeleteFromCache); |
|
76 | 81 |
|
77 |
idxClient.feed(toFeed, null); |
|
78 |
queue.remove(toDeleteFromCache); |
|
79 |
log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size())); |
|
80 | 82 |
} catch (Throwable e) { |
81 | 83 |
log.error("Error feeding missing claims", e); |
82 | 84 |
throw new MSROException("Error feeding missing claims: " + e.getMessage(), e); |
... | ... | |
89 | 91 |
return Arc.DEFAULT_ARC; |
90 | 92 |
} |
91 | 93 |
|
94 |
|
|
95 |
|
|
96 |
private void processLists(final CloudIndexClient idxClient, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException{ |
|
97 |
idxClient.feed(toFeed, null); |
|
98 |
queue.remove(toDeleteFromCache); |
|
99 |
log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size())); |
|
100 |
toFeed.clear(); |
|
101 |
toDeleteFromCache.clear(); |
|
102 |
} |
|
103 |
|
|
92 | 104 |
public RecentResultsQueue getQueue() { |
93 | 105 |
return queue; |
94 | 106 |
} |
Also available in: Unified diff
add claims in batch instead of all together at the end