Project

General

Profile

1
package eu.dnetlib.index.actors;
2

    
3
import java.io.IOException;
4
import java.util.concurrent.ExecutorService;
5
import java.util.concurrent.Executors;
6
import java.util.function.Function;
7

    
8
import eu.dnetlib.clients.index.model.document.IndexDocument;
9
import eu.dnetlib.clients.index.utils.IndexFieldUtility;
10
import eu.dnetlib.utils.MetadataReference;
11
import eu.dnetlib.clients.index.utils.ServiceTools;
12
import eu.dnetlib.enabling.tools.DnetStreamSupport;
13
import eu.dnetlib.index.IndexCollection;
14
import eu.dnetlib.index.IndexServerDAO;
15
import eu.dnetlib.index.IndexServerDAOMap;
16
import eu.dnetlib.index.cql.CqlTranslator;
17
import eu.dnetlib.index.feed.DocumentFeeder;
18
import eu.dnetlib.index.feed.DocumentMapperFactory;
19
import eu.dnetlib.index.feed.FeedMode;
20
import eu.dnetlib.index.feed.FeedResult;
21
import eu.dnetlib.miscutils.datetime.DateUtils;
22
import eu.dnetlib.rmi.provision.IndexServiceException;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25
import org.z3950.zing.cql.CQLParseException;
26

    
27
/**
28
 * The Class IndexFeedActorImpl.
29
 */
30
public class IndexFeedActorImpl implements IndexFeedActor {
31

    
32
	/**
33
	 * The Constant log.
34
	 */
35
	private static final Log log = LogFactory.getLog(IndexFeedActorImpl.class); // NOPMD by marko on 11/24/08 5:02 PM
36

    
37
	/**
38
	 * The index server dao map.
39
	 */
40
	private final transient IndexServerDAOMap indexServerDAOMap;
41

    
42
	/**
43
	 * The service tools.
44
	 */
45
	private final transient ServiceTools serviceTools;
46
	/**
47
	 * Thread pool used for the feeding process.
48
	 */
49
	private final transient ExecutorService threadPool = Executors.newCachedThreadPool();
50
	/**
51
	 * CqlTranslator.
52
	 */
53
	private CqlTranslator translator;
54

    
55
	/**
56
	 * Instantiates a new index feed actor impl.
57
	 *
58
	 * @param indexServerDAOMap the index server dao map
59
	 * @param serviceTools      the service tools
60
	 */
61
	public IndexFeedActorImpl(final IndexServerDAOMap indexServerDAOMap, final ServiceTools serviceTools, final CqlTranslator translator) {
62
		super();
63
		this.indexServerDAOMap = indexServerDAOMap;
64
		this.serviceTools = serviceTools;
65
		this.translator = translator;
66
	}
67

    
68
	/**
69
	 * {@inheritDoc}
70
	 */
71
	@Override
72
	public void feedIndex(final String dsId,
73
			final FeedMode feedMode,
74
			final Iterable<String> docIterator,
75
			final ResultsetKeepAliveCallback startCallback,
76
			final BlackboardActorCallback endCallback,
77
			final String backendId, final boolean emptyResult) {
78
		IndexCollection indexCollection = null;
79
		try {
80
			startCallback.unschedule();
81
			final MetadataReference mdref = serviceTools.getMetadataRef(dsId);
82

    
83
			final IndexServerDAO serverDAO = indexServerDAOMap.getIndexServerDAO(backendId);
84
			final DocumentMapperFactory docMapperFactory = serverDAO.getDocumentMapperFactory();
85
			final String version = DateUtils.now_ISO8601();
86
			final Function<String, IndexDocument> docMapper = docMapperFactory.getRecordMapper(serverDAO.getSchema(mdref), mdref, dsId, version, emptyResult);
87

    
88
			Iterable<IndexDocument> result = DnetStreamSupport.generateStreamFromIterator(docIterator.iterator()).map(docMapper)::iterator;
89

    
90
			indexCollection = serverDAO.getIndexCollection(mdref);
91
			final FeedResult res = threadPool.submit(new DocumentFeeder(indexCollection, result)).get();
92

    
93
			cleanMarkedDocuments(indexCollection, dsId);
94
			if (feedMode.equals(FeedMode.REFRESH)) {
95
				deleteByVersion(indexCollection, dsId, version);
96
			}
97

    
98
			indexCollection.commit();
99
			indexCollection.shutdown();
100

    
101
			log.info("FeedResult: " + res.setTimeElapsed(System.currentTimeMillis()));
102
			endCallback.setJobDone();
103
		} catch (final Exception e) {
104
			endCallback.setJobFailed(e);
105
			log.error("feed index job failed", e);
106
		} finally {
107
			if (indexCollection != null) {
108
				indexCollection.shutdown();
109
			}
110
		}
111

    
112
	}
113

    
114
	/**
115
	 * mdFormatVersion.
116
	 *
117
	 * @param indexCollection the server dao
118
	 * @param dsId            the ds id
119
	 * @throws IndexServiceException the index service exception
120
	 */
121
	private void deleteByVersion(final IndexCollection indexCollection, final String dsId, final String cqlQuery) throws IndexServiceException {
122
		//final String cqlQuery = IndexFieldUtility.DS_VERSION + " < \"" + IndexDateUtility.getParsedDateField(version) + "\"";
123
		try {
124
			String luceneQuery = translator.getTranslatedQuery(cqlQuery).asLucene();
125
			indexCollection.deleteByQuery(luceneQuery, dsId);
126
		} catch (CQLParseException e) {
127
			throw new IndexServiceException("Cannot parse CQL query into lucene query: " + cqlQuery, e);
128
		} catch (IOException e) {
129
			throw new IndexServiceException("Cannot parse CQL query into lucene query: " + cqlQuery, e);
130
		}
131

    
132
	}
133

    
134
	/**
135
	 * method delete documents where IndexMap.DELETE_DOCUMENT field is true
136
	 *
137
	 * @param indexCollection the server dao
138
	 * @param dsId            the ds id
139
	 * @throws IndexServiceException the index service exception
140
	 */
141
	public void cleanMarkedDocuments(final IndexCollection indexCollection, final String dsId) throws IndexServiceException {
142

    
143
		final String cqlQuery = IndexFieldUtility.DELETE_DOCUMENT + " all true ";
144
		deleteByVersion(indexCollection, dsId, cqlQuery);
145
	}
146

    
147
}
(6-6/8)