Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.index;
2

    
3
import java.io.StringReader;
4
import java.util.ArrayList;
5
import java.util.List;
6
import javax.annotation.Resource;
7

    
8
import com.google.common.base.Function;
9
import eu.dnetlib.data.index.CloudIndexClient;
10
import eu.dnetlib.data.index.CloudIndexClientException;
11
import eu.dnetlib.data.index.CloudIndexClientFactory;
12
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
13
import eu.dnetlib.msro.workflows.graph.Arc;
14
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
15
import eu.dnetlib.msro.workflows.procs.Env;
16
import eu.dnetlib.openaire.api.RecentPublicationsQueue;
17
import eu.dnetlib.rmi.enabling.ISLookUpService;
18
import eu.dnetlib.rmi.manager.MSROException;
19
import org.apache.commons.io.IOUtils;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.solr.common.SolrInputDocument;
23
import org.dom4j.io.SAXReader;
24
import org.springframework.beans.factory.annotation.Required;
25
import org.springframework.beans.factory.annotation.Value;
26
import org.springframework.core.io.ClassPathResource;
27

    
28
/**
29
 * Created by michele on 15/12/15.
30
 */
31
public class FeedMissingClaimsJobNode extends AsyncJobNode {
32

    
33
	private static final Log log = LogFactory.getLog(FeedMissingClaimsJobNode.class);
34
	private RecentPublicationsQueue queue;
35
	private OafToIndexRecordFactory oafToIndexRecordFactory;
36

    
37
	@Resource
38
	private UniqueServiceLocator serviceLocator;
39

    
40
	@Value(value = "${openaireplus.msro.api.findSolrIndexUrl.xquery}")
41
	private ClassPathResource findSolrIndexUrl;
42

    
43
	@Override
44
	protected String execute(final Env env) throws Exception {
45

    
46
		final String format = env.getAttribute("format", String.class);
47
		final String coll = format + "-index-openaire";
48
		final String indexDsId = env.getAttribute("index_id", String.class);
49
		final String baseUrl = calculateIndexBaseUrl();
50

    
51
		CloudIndexClient idxClient = null;
52

    
53
		try {
54
			final List<SolrInputDocument> toFeed = new ArrayList<>();
55
			final List<String> toDeleteFromCache = new ArrayList<>();
56

    
57
			final SAXReader reader = new SAXReader();
58
			final Function<String, String> xslt = (Function<String, String>) oafToIndexRecordFactory.newTransformer(format);
59

    
60
			idxClient = CloudIndexClientFactory.newIndexClient(baseUrl, coll, false);
61

    
62
			for (final String record : this.queue) {
63
				final String id = reader.read(new StringReader(record)).valueOf("//*[local-name() = 'objIdentifier']");
64
				if (idxClient.isRecordIndexed(id)) {
65
					toFeed.add(idxClient.prepareSolrDocument(record, indexDsId, xslt));
66
				} else {
67
					toDeleteFromCache.add(id);
68
				}
69
			}
70

    
71
			idxClient.feed(toFeed, null);
72
			this.queue.remove(toDeleteFromCache);
73

    
74
		} catch (final Throwable e) {
75
			log.error("Error feeding missing claims", e);
76
			throw new MSROException("Error feeding missing claims: " + e.getMessage(), e);
77
		} finally {
78
			if (idxClient != null) {
79
				idxClient.close();
80
			}
81
		}
82

    
83
		return Arc.DEFAULT_ARC;
84
	}
85

    
86
	public RecentPublicationsQueue getQueue() {
87
		return this.queue;
88
	}
89

    
90
	@Required
91
	public void setQueue(final RecentPublicationsQueue queue) {
92
		this.queue = queue;
93
	}
94

    
95
	public OafToIndexRecordFactory getOafToIndexRecordFactory() {
96
		return this.oafToIndexRecordFactory;
97
	}
98

    
99
	@Required
100
	public void setOafToIndexRecordFactory(final OafToIndexRecordFactory oafToIndexRecordFactory) {
101
		this.oafToIndexRecordFactory = oafToIndexRecordFactory;
102
	}
103

    
104
	private String calculateIndexBaseUrl() throws Exception {
105
		final String query = IOUtils.toString(this.findSolrIndexUrl.getInputStream());
106
		return this.serviceLocator.getService(ISLookUpService.class).getResourceProfileByQuery(query);
107
	}
108

    
109
	private class IsNotIndexed implements com.google.common.base.Predicate<String> {
110

    
111
		private final CloudIndexClient idxClient;
112

    
113
		private final SAXReader reader = new SAXReader();
114

    
115
		public IsNotIndexed(final CloudIndexClient idxClient) {
116
			this.idxClient = idxClient;
117
		}
118

    
119
		@Override
120
		public boolean apply(final String s) {
121
			try {
122
				final String id = this.reader.read(new StringReader(s)).valueOf("//*[local-name() = 'objIdentifier']");
123
				return !this.idxClient.isRecordIndexed(id);
124
			} catch (final Throwable e) {
125
				log.error("Error searching record: " + s, e);
126
				throw new RuntimeException(e);
127
			}
128
		}
129

    
130
	}
131

    
132
	private class CreateSolrDocument implements Function<String, SolrInputDocument> {
133

    
134
		private final CloudIndexClient idxClient;
135
		private final String indexDsId;
136
		private final Function<String, String> toIndexRecord;
137

    
138
		public CreateSolrDocument(final CloudIndexClient idxClient, final String indexDsId, final Function<String, String> toIndexRecord) {
139
			this.idxClient = idxClient;
140
			this.indexDsId = indexDsId;
141
			this.toIndexRecord = toIndexRecord;
142
		}
143

    
144
		@Override
145
		public SolrInputDocument apply(final String s) {
146
			try {
147
				return this.idxClient.prepareSolrDocument(s, this.indexDsId, this.toIndexRecord);
148
			} catch (final CloudIndexClientException e) {
149
				throw new RuntimeException(e);
150
			}
151
		}
152

    
153
	}
154
}
(3-3/8)