Project

General

Profile

1
package eu.dnetlib.index.actors;
2

    
3
import java.util.concurrent.ExecutorService;
4
import java.util.concurrent.Executors;
5
import java.util.function.Function;
6

    
7
import eu.dnetlib.clients.index.model.document.IndexDocument;
8
import eu.dnetlib.clients.index.utils.IndexFieldUtility;
9
import eu.dnetlib.index.utils.IndexDateUtility;
10
import eu.dnetlib.utils.MetadataReference;
11
import eu.dnetlib.clients.index.utils.ServiceTools;
12
import eu.dnetlib.enabling.tools.DnetStreamSupport;
13
import eu.dnetlib.index.IndexCollection;
14
import eu.dnetlib.index.IndexServerDAO;
15
import eu.dnetlib.index.IndexServerDAOMap;
16
import eu.dnetlib.cql.CqlTranslator;
17
import eu.dnetlib.index.feed.DocumentFeeder;
18
import eu.dnetlib.index.feed.DocumentMapperFactory;
19
import eu.dnetlib.index.feed.FeedMode;
20
import eu.dnetlib.index.feed.FeedResult;
21
import eu.dnetlib.miscutils.datetime.DateUtils;
22
import eu.dnetlib.rmi.provision.IndexServiceException;
23
import org.apache.commons.logging.Log;
24
import org.apache.commons.logging.LogFactory;
25

    
26
/**
27
 * The Class IndexFeedActorImpl.
28
 */
29
public class IndexFeedActorImpl implements IndexFeedActor {
30

    
31
	/**
32
	 * The Constant log.
33
	 */
34
	private static final Log log = LogFactory.getLog(IndexFeedActorImpl.class); // NOPMD by marko on 11/24/08 5:02 PM
35

    
36
	/**
37
	 * The index server dao map.
38
	 */
39
	private final transient IndexServerDAOMap indexServerDAOMap;
40

    
41
	/**
42
	 * The service tools.
43
	 */
44
	private final transient ServiceTools serviceTools;
45
	/**
46
	 * Thread pool used for the feeding process.
47
	 */
48
	private final transient ExecutorService threadPool = Executors.newCachedThreadPool();
49
	/**
50
	 * CqlTranslator.
51
	 */
52
	private CqlTranslator translator;
53

    
54
	/**
55
	 * Instantiates a new index feed actor impl.
56
	 *
57
	 * @param indexServerDAOMap the index server dao map
58
	 * @param serviceTools      the service tools
59
	 */
60
	public IndexFeedActorImpl(final IndexServerDAOMap indexServerDAOMap, final ServiceTools serviceTools, final CqlTranslator translator) {
61
		super();
62
		this.indexServerDAOMap = indexServerDAOMap;
63
		this.serviceTools = serviceTools;
64
		this.translator = translator;
65
	}
66

    
67
	/**
68
	 * {@inheritDoc}
69
	 */
70
	@Override
71
	public void feedIndex(final String dsId,
72
			final FeedMode feedMode,
73
			final Iterable<String> docIterator,
74
			final ResultsetKeepAliveCallback startCallback,
75
			final BlackboardActorCallback endCallback,
76
			final String backendId, final boolean emptyResult) {
77
		IndexCollection indexCollection = null;
78
		try {
79
			startCallback.unschedule();
80
			final MetadataReference mdref = serviceTools.getMetadataRef(dsId);
81

    
82
			final IndexServerDAO serverDAO = indexServerDAOMap.getIndexServerDAO(backendId);
83
			final DocumentMapperFactory docMapperFactory = serverDAO.getDocumentMapperFactory();
84
            final String version = DateUtils.getParsedDateField(DateUtils.now_ISO8601());
85
            final Function<String, IndexDocument> docMapper = docMapperFactory.getRecordMapper(serverDAO.getSchema(mdref), mdref, dsId, version, emptyResult);
86

    
87
			Iterable<IndexDocument> result = DnetStreamSupport.generateStreamFromIterator(docIterator.iterator()).map(docMapper)::iterator;
88

    
89
			indexCollection = serverDAO.getIndexCollection(mdref);
90
			final FeedResult res = threadPool.submit(new DocumentFeeder(indexCollection, result)).get();
91

    
92
			cleanMarkedDocuments(indexCollection, dsId);
93
			if (feedMode.equals(FeedMode.REFRESH)) {
94
				deleteByVersion(indexCollection, dsId, IndexDateUtility.getParsedDateField(version));
95
			}
96

    
97
			indexCollection.commit();
98
			indexCollection.shutdown();
99

    
100
			log.info("FeedResult: " + res.setTimeElapsed(System.currentTimeMillis()));
101
			endCallback.setJobDone();
102
		} catch (final Exception e) {
103
			endCallback.setJobFailed(e);
104
			log.error("feed index job failed", e);
105
		} finally {
106
			if (indexCollection != null) {
107
				indexCollection.shutdown();
108
			}
109
		}
110

    
111
	}
112

    
113
	/**
114
	 * mdFormatVersion.
115
	 *
116
	 * @param indexCollection the server dao
117
	 * @param dsId            the ds id
118
	 * @throws IndexServiceException the index service exception
119
	 */
120
	private void deleteByVersion(final IndexCollection indexCollection, final String dsId, final String version) throws IndexServiceException {
121
        final String query = String.format("%s:[* TO \"%s\"}", IndexFieldUtility.DS_VERSION, version);
122
        indexCollection.deleteByQuery(query, dsId);
123
	}
124

    
125
	/**
126
	 * method delete documents where IndexMap.DELETE_DOCUMENT field is true
127
	 *
128
	 * @param indexCollection the server dao
129
	 * @param dsId            the ds id
130
	 * @throws IndexServiceException the index service exception
131
	 */
132
	public void cleanMarkedDocuments(final IndexCollection indexCollection, final String dsId) throws IndexServiceException {
133
		final String query = IndexFieldUtility.DELETE_DOCUMENT + ":true ";
134
		indexCollection.deleteByQuery(query, dsId);
135
	}
136

    
137
}
(6-6/8)