Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.index;
2

    
3
import java.io.StringReader;
4
import java.util.ArrayList;
5
import java.util.List;
6
import javax.annotation.Resource;
7

    
8
import com.googlecode.sarasvati.Arc;
9
import com.googlecode.sarasvati.NodeToken;
10
import eu.dnetlib.data.index.CloudIndexClient;
11
import eu.dnetlib.data.index.CloudIndexClientFactory;
12
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
13
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
14
import eu.dnetlib.miscutils.functional.xml.ApplyXslt;
15
import eu.dnetlib.msro.rmi.MSROException;
16
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
17
import eu.dnetlib.openaire.directindex.api.RecentResultsQueue;
18
import eu.dnetlib.openaire.directindex.utils.OafToIndexRecordFactory;
19

    
20
import org.apache.commons.io.IOUtils;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23
import org.apache.solr.common.SolrInputDocument;
24
import org.dom4j.io.SAXReader;
25
import org.springframework.beans.factory.annotation.Required;
26
import org.springframework.beans.factory.annotation.Value;
27
import org.springframework.core.io.ClassPathResource;
28

    
29
/**
30
 * Created by michele on 15/12/15.
31
 */
32
public class FeedMissingClaimsJobNode extends AsyncJobNode {
33

    
34
	private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
35
	private RecentResultsQueue queue;
36
	private OafToIndexRecordFactory oafToIndexRecordFactory;
37

    
38
	@Resource
39
	private UniqueServiceLocator serviceLocator;
40

    
41
	@Value(value = "${openaireplus.msro.api.findSolrIndexUrl.xquery}")
42
	private ClassPathResource findSolrIndexUrl;
43

    
44
	@Override
45
	protected String execute(final NodeToken nodeToken) throws Exception {
46

    
47
		final String format =
48
				nodeToken.getEnv().hasAttribute("format") ? nodeToken.getEnv().getAttribute("format") : nodeToken.getFullEnv().getAttribute("format");
49
		final String coll = format + "-index-openaire";
50
		final String indexDsId = nodeToken.getEnv().getAttribute("index_id");
51
		final String baseUrl = calculateIndexBaseUrl();
52

    
53
		CloudIndexClient idxClient = null;
54

    
55
		try {
56
			final List<SolrInputDocument> toFeed = new ArrayList<SolrInputDocument>();
57
			final List<String> toDeleteFromCache = new ArrayList<String>();
58

    
59
			final SAXReader reader = new SAXReader();
60
			final ApplyXslt xslt = oafToIndexRecordFactory.newTransformer(format);
61

    
62
			idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
63

    
64
			for (String record : queue) {
65
				final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
66
				if (idxClient.isRecordIndexed(id)) {
67
					toDeleteFromCache.add(id);
68
				} else {
69
					toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
70
				}
71
			}
72

    
73
			idxClient.feed(toFeed, null);
74
			queue.remove(toDeleteFromCache);
75

    
76
		} catch (Throwable e) {
77
			log.error("Error feeding missing claims", e);
78
			throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
79
		} finally {
80
			if (idxClient != null) {
81
				idxClient.close();
82
			}
83
		}
84

    
85
		return Arc.DEFAULT_ARC;
86
	}
87

    
88
	public RecentResultsQueue getQueue() {
89
		return queue;
90
	}
91

    
92
	@Required
93
	public void setQueue(final RecentResultsQueue queue) {
94
		this.queue = queue;
95
	}
96

    
97
	public OafToIndexRecordFactory getOafToIndexRecordFactory() {
98
		return oafToIndexRecordFactory;
99
	}
100

    
101
	@Required
102
	public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
103
		this.oafToIndexRecordFactory = oafToIndexRecordFactory;
104
	}
105

    
106
	private String calculateIndexBaseUrl() throws Exception {
107
		final String query = IOUtils.toString(findSolrIndexUrl.getInputStream());
108
		return serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
109
	}
110
}
(3-3/10)