Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

    
3
import java.io.InputStream;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import javax.annotation.Resource;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.index.CloudIndexClient;
import eu.dnetlib.data.index.CloudIndexClientFactory;
import eu.dnetlib.data.index.CloudIndexClientException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.openaire.directindex.api.RecentResultsQueue;
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;

import org.apache.commons.io.IOUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.solr.common.SolrInputDocument;
import org.dom4j.io.SAXReader;
import org.springframework.beans.factory.annotation.Required;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.core.io.ClassPathResource;
29

    
30
/**
31
 * Created by michele on 15/12/15.
32
 */
33
public class FeedMissingClaimsJobNode extends AsyncJobNode {
34

    
35
	private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
36
	public static final int BATCH_SIZE = 1000;
37
	private RecentResultsQueue queue;
38
	private OafToIndexRecordFactory oafToIndexRecordFactory;
39

    
40
	@Resource
41
	private UniqueServiceLocator serviceLocator;
42

    
43
	@Value(value = "${openaire.api.directindex.findSolrIndexUrl.xquery}")
44
	private ClassPathResource findSolrIndexUrl;
45

    
46
	@Override
47
	protected String execute(final NodeToken nodeToken) throws Exception {
48

    
49
		final String format =
50
				nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
51
		final String coll = format + "-index-openaire";
52
		final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
53
		final String baseUrl = calculateIndexBaseUrl();
54

    
55
		CloudIndexClient idxClient = null;
56

    
57
		try {
58
			final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
59
			final List<String> toDeleteFromCache = new ArrayList<String>();
60

    
61
			final SAXReader reader = new SAXReader();
62
			final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
63

    
64
			idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
65
			log.info("Starting to feed claims in index collection "+coll);
66
			int count = 0;
67
			for (String record : queue) {
68
				try {
69
					final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
70
					count++;
71
					if (log.isDebugEnabled()) {
72
						log.debug("Processing record " + count);
73
					}
74
					if (idxClient.isRecordIndexed(id)) {
75
						toDeleteFromCache.add(id);
76
					} else {
77
						toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
78
					}
79
					if (count % BATCH_SIZE == 0) processLists(idxClient, toFeed, toDeleteFromCache);
80
				}catch(CloudIndexClientException cie){
81
					log.error("Error feeding missing claims", cie);
82
					if (idxClient != null) {
83
						idxClient.close();
84
					}
85
					idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
86
					log.info("Got new CloudIndexClient");
87
				}
88
			}
89
			if(!toFeed.isEmpty() || !toDeleteFromCache.isEmpty()) processLists(idxClient, toFeed, toDeleteFromCache);
90
			log.info(String.format("Finished feeding of claims in index collection %s, total: %d", coll, count));
91

    
92
		} catch(Throwable e) {
93
			log.error("Error feeding missing claims", e);
94
			throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
95
		} finally {
96
			if (idxClient != null) {
97
				idxClient.close();
98
			}
99
			log.info("Closed Solr index client");
100
		}
101
		log.info("Now proceeding to Arc.DEFAULT_ARC");
102
		return Arc.DEFAULT_ARC;
103
	}
104

    
105

    
106

    
107
	private void processLists(final CloudIndexClient idxClient, final List<SolrInputDocument> toFeed, final List<String> toDeleteFromCache) throws CloudIndexClientException{
108
		idxClient.feed(toFeed, null);
109
		queue.remove(toDeleteFromCache);
110
		log.info(String.format("%d claims fed and cache cleaned of %d records", toFeed.size(), toDeleteFromCache.size()));
111
		toFeed.clear();
112
		toDeleteFromCache.clear();
113
		log.info("Cleaned temporary lists");
114
	}
115

    
116
	public RecentResultsQueue getQueue() {
117
		return queue;
118
	}
119

    
120
	@Required
121
	public void setQueue(final RecentResultsQueue queue) {
122
		this.queue = queue;
123
	}
124

    
125
	public OafToIndexRecordFactory getOafToIndexRecordFactory() {
126
		return oafToIndexRecordFactory;
127
	}
128

    
129
	@Required
130
	public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
131
		this.oafToIndexRecordFactory = oafToIndexRecordFactory;
132
	}
133

    
134
	private String calculateIndexBaseUrl() throws Exception {
135
		final String query = IOUtils.toString(findSolrIndexUrl.getInputStream());
136
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
137
	}
138
}
(3-3/10)