Project

General

Profile

« Previous | Next » 

Revision 56121

Add claims in batches instead of all together at the end

View differences:

FeedMissingClaimsJobNode.java
9 9
import com.googlecode.sarasvati.NodeToken;
10 10
import eu.dnetlib.data.index.CloudIndexClient;
11 11
import eu.dnetlib.data.index.CloudIndexClientFactory;
12
import eu.dnetlib.data.index.CloudIndexClientException;
12 13
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
13 14
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
14 15
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
......
32 33
public class FeedMissingClaimsJobNode extends AsyncJobNode {
33 34

  
34 35
	private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
36
	public static final int BATCH_SIZE = 1000;
35 37
	private RecentResultsQueue queue;
36 38
	private OafToIndexRecordFactory oafToIndexRecordFactory;
37 39

  
......
65 67
			for (String record : queue) {
66 68
				final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
67 69
				if(log.isDebugEnabled()){
68
					log.debug("Processing record "+count);
70
					log.debug("Processing record "+count++);
69 71
				}
70 72
				if (idxClient.isRecordIndexed(id)) {
71 73
					toDeleteFromCache.add(id);
72 74
				} else {
73 75
					toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
74 76
				}
77
				if(count % BATCH_SIZE == 0) processLists(idxClient, toFeed, toDeleteFromCache);
78

  
75 79
			}
80
			if(!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, toFeed, toDeleteFromCache);
76 81

  
77
			idxClient.feed(toFeed, null);
78
			queue.remove(toDeleteFromCache);
79
			log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
80 82
		} catch (Throwable e) {
81 83
			log.error("Error feeding missing claims", e);
82 84
			throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
......
89 91
		return Arc.DEFAULT_ARC;
90 92
	}
91 93

  
94

  
95

  
96
	/**
	 * Flushes one batch: feeds the prepared documents through the index client, removes the
	 * given ids from the queue field, logs both list sizes, and then empties both lists.
	 *
	 * NOTE(review): in the visible caller the batch trigger is {@code count % BATCH_SIZE == 0},
	 * but {@code count++} sits inside the {@code log.isDebugEnabled()} guard. With debug logging
	 * off, count does not appear to advance inside the loop, so this method may be invoked for
	 * every record (or only by the final flush), possibly with empty lists -- confirm against
	 * the full method.
	 *
	 * @param idxClient client whose {@code feed} method receives the documents
	 * @param toFeed documents to feed; cleared before this method returns
	 * @param toDeleteFromCache ids passed to {@code queue.remove}; cleared before this method returns
	 * @throws CloudIndexClientException presumably propagated from {@code idxClient.feed} -- TODO confirm
	 */
	private void processLists(final CloudIndexClient idxClient, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException{
97
		// The meaning of the null second argument is not visible here -- TODO confirm against CloudIndexClient.feed.
		idxClient.feed(toFeed, null);
98
		queue.remove(toDeleteFromCache);
99
		// Sizes are logged before the lists are cleared, so the message reflects this batch.
		log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
100
		// The caller passes the same two list instances on every call, so they are reset here for the next batch.
		toFeed.clear();
101
		toDeleteFromCache.clear();
102
	}
103

  
92 104
	/**
	 * Returns the value of the queue field.
	 *
	 * @return the RecentResultsQueue currently held by this node
	 */
	public RecentResultsQueue getQueue() {
93 105
		return queue;
94 106
	}

Also available in: Unified diff