Project

General

Profile

1
package eu.dnetlib.index.actors;
2

    
3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5
import java.util.function.Function;
6

    
7
import eu.dnetlib.clients.index.model.document.IndexDocument;
8
import eu.dnetlib.clients.index.utils.IndexFieldUtility;
9
import eu.dnetlib.utils.MetadataReference;
10
import eu.dnetlib.clients.index.utils.ServiceTools;
11
import eu.dnetlib.enabling.tools.DnetStreamSupport;
12
import eu.dnetlib.index.IndexCollection;
13
import eu.dnetlib.index.IndexServerDAO;
14
import eu.dnetlib.index.IndexServerDAOMap;
15
import eu.dnetlib.cql.CqlTranslator;
16
import eu.dnetlib.index.feed.DocumentFeeder;
17
import eu.dnetlib.index.feed.DocumentMapperFactory;
18
import eu.dnetlib.index.feed.FeedMode;
19
import eu.dnetlib.index.feed.FeedResult;
20
import eu.dnetlib.miscutils.datetime.DateUtils;
21
import eu.dnetlib.rmi.provision.IndexServiceException;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24

    
25
/**
26
 * The Class IndexFeedActorImpl.
27
 */
28
public class IndexFeedActorImpl implements IndexFeedActor {
29

    
30
	/**
31
	 * The Constant log.
32
	 */
33
	private static final Log log = LogFactory.getLog(IndexFeedActorImpl.class); // NOPMD by marko on 11/24/08 5:02 PM
34

    
35
	/**
36
	 * The index server dao map.
37
	 */
38
	private final transient IndexServerDAOMap indexServerDAOMap;
39

    
40
	/**
41
	 * The service tools.
42
	 */
43
	private final transient ServiceTools serviceTools;
44
	/**
45
	 * Thread pool used for the feeding process.
46
	 */
47
	private final transient ExecutorService threadPool = Executors.newCachedThreadPool();
48
	/**
49
	 * CqlTranslator.
50
	 */
51
	private CqlTranslator translator;
52

    
53
	/**
54
	 * Instantiates a new index feed actor impl.
55
	 *
56
	 * @param indexServerDAOMap the index server dao map
57
	 * @param serviceTools      the service tools
58
	 */
59
	public IndexFeedActorImpl(final IndexServerDAOMap indexServerDAOMap, final ServiceTools serviceTools, final CqlTranslator translator) {
60
		super();
61
		this.indexServerDAOMap = indexServerDAOMap;
62
		this.serviceTools = serviceTools;
63
		this.translator = translator;
64
	}
65

    
66
	/**
67
	 * {@inheritDoc}
68
	 */
69
	@Override
70
	public void feedIndex(final String dsId,
71
			final FeedMode feedMode,
72
			final Iterable<String> docIterator,
73
			final ResultsetKeepAliveCallback startCallback,
74
			final BlackboardActorCallback endCallback,
75
			final String backendId, final boolean emptyResult) {
76
		IndexCollection indexCollection = null;
77
		try {
78
			startCallback.unschedule();
79
			final MetadataReference mdref = serviceTools.getMetadataRef(dsId);
80

    
81
			final IndexServerDAO serverDAO = indexServerDAOMap.getIndexServerDAO(backendId);
82
			final DocumentMapperFactory docMapperFactory = serverDAO.getDocumentMapperFactory();
83
            final String version = DateUtils.getParsedDateField(DateUtils.now_ISO8601());
84
            final Function<String, IndexDocument> docMapper = docMapperFactory.getRecordMapper(serverDAO.getSchema(mdref), mdref, dsId, version, emptyResult);
85

    
86
			Iterable<IndexDocument> result = DnetStreamSupport.generateStreamFromIterator(docIterator.iterator()).map(docMapper)::iterator;
87

    
88
			indexCollection = serverDAO.getIndexCollection(mdref);
89
			final FeedResult res = threadPool.submit(new DocumentFeeder(indexCollection, result)).get();
90

    
91
			cleanMarkedDocuments(indexCollection, dsId);
92
			if (feedMode.equals(FeedMode.REFRESH)) {
93
				deleteByVersion(indexCollection, dsId, version);
94
			}
95

    
96
			indexCollection.commit();
97
			indexCollection.shutdown();
98

    
99
			log.info("FeedResult: " + res.setTimeElapsed(System.currentTimeMillis()));
100
			endCallback.setJobDone();
101
		} catch (final Exception e) {
102
			endCallback.setJobFailed(e);
103
			log.error("feed index job failed", e);
104
		} finally {
105
			if (indexCollection != null) {
106
				indexCollection.shutdown();
107
			}
108
		}
109

    
110
	}
111

    
112
	/**
113
	 * mdFormatVersion.
114
	 *
115
	 * @param indexCollection the server dao
116
	 * @param dsId            the ds id
117
	 * @throws IndexServiceException the index service exception
118
	 */
119
	private void deleteByVersion(final IndexCollection indexCollection, final String dsId, final String version) throws IndexServiceException {
120
        final String query = String.format("%s:[* TO \"%s\"}", IndexFieldUtility.DS_VERSION, version);
121
        indexCollection.deleteByQuery(query, dsId);
122
	}
123

    
124
	/**
125
	 * method delete documents where IndexMap.DELETE_DOCUMENT field is true
126
	 *
127
	 * @param indexCollection the server dao
128
	 * @param dsId            the ds id
129
	 * @throws IndexServiceException the index service exception
130
	 */
131
	public void cleanMarkedDocuments(final IndexCollection indexCollection, final String dsId) throws IndexServiceException {
132
		final String query = IndexFieldUtility.DELETE_DOCUMENT + ":true ";
133
		indexCollection.deleteByQuery(query, dsId);
134
	}
135

    
136
}
(6-6/8)