Project

General

Profile

1
package eu.dnetlib.index.actors;
2

    
3
import java.io.IOException;
4
import java.util.concurrent.ExecutorService;
5
import java.util.concurrent.Executors;
6
import java.util.function.Function;
7

    
8
import eu.dnetlib.clients.index.model.document.IndexDocument;
9
import eu.dnetlib.clients.index.utils.IndexFieldUtility;
10
import eu.dnetlib.index.utils.IndexDateUtility;
11
import eu.dnetlib.utils.MetadataReference;
12
import eu.dnetlib.clients.index.utils.ServiceTools;
13
import eu.dnetlib.enabling.tools.DnetStreamSupport;
14
import eu.dnetlib.index.IndexCollection;
15
import eu.dnetlib.index.IndexServerDAO;
16
import eu.dnetlib.index.IndexServerDAOMap;
17
import eu.dnetlib.index.cql.CqlTranslator;
18
import eu.dnetlib.index.feed.DocumentFeeder;
19
import eu.dnetlib.index.feed.DocumentMapperFactory;
20
import eu.dnetlib.index.feed.FeedMode;
21
import eu.dnetlib.index.feed.FeedResult;
22
import eu.dnetlib.miscutils.datetime.DateUtils;
23
import eu.dnetlib.rmi.provision.IndexServiceException;
24
import org.apache.commons.logging.Log;
25
import org.apache.commons.logging.LogFactory;
26
import org.z3950.zing.cql.CQLParseException;
27

    
28
/**
29
 * The Class IndexFeedActorImpl.
30
 */
31
public class IndexFeedActorImpl implements IndexFeedActor {
32

    
33
	/**
34
	 * The Constant log.
35
	 */
36
	private static final Log log = LogFactory.getLog(IndexFeedActorImpl.class); // NOPMD by marko on 11/24/08 5:02 PM
37

    
38
	/**
39
	 * The index server dao map.
40
	 */
41
	private final transient IndexServerDAOMap indexServerDAOMap;
42

    
43
	/**
44
	 * The service tools.
45
	 */
46
	private final transient ServiceTools serviceTools;
47
	/**
48
	 * Thread pool used for the feeding process.
49
	 */
50
	private final transient ExecutorService threadPool = Executors.newCachedThreadPool();
51
	/**
52
	 * CqlTranslator.
53
	 */
54
	private CqlTranslator translator;
55

    
56
	/**
57
	 * Instantiates a new index feed actor impl.
58
	 *
59
	 * @param indexServerDAOMap the index server dao map
60
	 * @param serviceTools      the service tools
61
	 */
62
	public IndexFeedActorImpl(final IndexServerDAOMap indexServerDAOMap, final ServiceTools serviceTools, final CqlTranslator translator) {
63
		super();
64
		this.indexServerDAOMap = indexServerDAOMap;
65
		this.serviceTools = serviceTools;
66
		this.translator = translator;
67
	}
68

    
69
	/**
70
	 * {@inheritDoc}
71
	 */
72
	@Override
73
	public void feedIndex(final String dsId,
74
			final FeedMode feedMode,
75
			final Iterable<String> docIterator,
76
			final ResultsetKeepAliveCallback startCallback,
77
			final BlackboardActorCallback endCallback,
78
			final String backendId, final boolean emptyResult) {
79
		IndexCollection indexCollection = null;
80
		try {
81
			startCallback.unschedule();
82
			final MetadataReference mdref = serviceTools.getMetadataRef(dsId);
83

    
84
			final IndexServerDAO serverDAO = indexServerDAOMap.getIndexServerDAO(backendId);
85
			final DocumentMapperFactory docMapperFactory = serverDAO.getDocumentMapperFactory();
86
			final String version = DateUtils.now_ISO8601();
87
			final Function<String, IndexDocument> docMapper = docMapperFactory.getRecordMapper(serverDAO.getSchema(mdref), mdref, dsId, version, emptyResult);
88

    
89
			Iterable<IndexDocument> result = DnetStreamSupport.generateStreamFromIterator(docIterator.iterator()).map(docMapper)::iterator;
90

    
91
			indexCollection = serverDAO.getIndexCollection(mdref);
92
			final FeedResult res = threadPool.submit(new DocumentFeeder(indexCollection, result)).get();
93

    
94
			cleanMarkedDocuments(indexCollection, dsId);
95
			if (feedMode.equals(FeedMode.REFRESH)) {
96
				deleteByVersion(indexCollection, dsId, version);
97
			}
98

    
99
			indexCollection.commit();
100
			indexCollection.shutdown();
101

    
102
			log.info("FeedResult: " + res.setTimeElapsed(System.currentTimeMillis()));
103
			endCallback.setJobDone();
104
		} catch (final Exception e) {
105
			endCallback.setJobFailed(e);
106
			log.error("feed index job failed", e);
107
		} finally {
108
			if (indexCollection != null) {
109
				indexCollection.shutdown();
110
			}
111
		}
112

    
113
	}
114

    
115
	/**
116
	 * mdFormatVersion.
117
	 *
118
	 * @param indexCollection the server dao
119
	 * @param dsId            the ds id
120
	 * @throws IndexServiceException the index service exception
121
	 */
122
	private void deleteByVersion(final IndexCollection indexCollection, final String dsId, final String version) throws IndexServiceException {
123
		final String query = String.format("%s:[* TO %s}", IndexFieldUtility.DS_VERSION, version);
124
		indexCollection.deleteByQuery(query, dsId);
125
	}
126

    
127
	/**
128
	 * method delete documents where IndexMap.DELETE_DOCUMENT field is true
129
	 *
130
	 * @param indexCollection the server dao
131
	 * @param dsId            the ds id
132
	 * @throws IndexServiceException the index service exception
133
	 */
134
	public void cleanMarkedDocuments(final IndexCollection indexCollection, final String dsId) throws IndexServiceException {
135
		final String query = IndexFieldUtility.DELETE_DOCUMENT + ":true ";
136
		indexCollection.deleteByQuery(query, dsId);
137
	}
138

    
139
}
(6-6/8)