Project

General

Profile

1
package eu.dnetlib.index.feed;
2

    
3
import java.util.concurrent.Callable;
4
import java.util.stream.Stream;
5

    
6
import eu.dnetlib.clients.index.model.document.IndexDocument;
7
import eu.dnetlib.clients.index.utils.IndexFieldUtility;
8
import eu.dnetlib.index.IndexCollection;
9
import eu.dnetlib.rmi.provision.IndexServiceException;
10

    
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13

    
14
/**
15
 * The Class DocumentFeeder.
16
 */
17
public class DocumentFeeder implements Callable<FeedResult> {
18

    
19
	/**
20
	 * The Constant log.
21
	 */
22
	private static final Log log = LogFactory.getLog(DocumentFeeder.class);
23

    
24
	/**
25
	 * The index collection.
26
	 */
27
	private IndexCollection indexCollection;
28

    
29
	private Stream<IndexDocument> docStream;
30
	
31
	public DocumentFeeder(final IndexCollection indexCollection, final Stream<IndexDocument> docStream) {
32
		this.indexCollection = indexCollection;
33
		this.docStream = docStream;
34
	}
35

    
36
	/**
37
	 * {@inheritDoc}
38
	 *
39
	 * @see Callable#call()
40
	 */
41
	@Override
42
	public FeedResult call() {
43
		final FeedResult res = new FeedResult(System.currentTimeMillis());
44
		docStream.forEach(doc->{
45
			switch (doc.getStatus()) {
46
			
47
			case OK:
48
				boolean rsp = false;
49
				try {
50
					rsp = indexCollection.add(doc);
51
				} catch (IndexServiceException e) {
52
					log.error(e);
53
				}
54
				if (rsp) {
55
					res.add();
56
				} else {
57
					res.mark();
58
				}
59
				break;
60

    
61
			case MARKED:
62
				res.mark();
63
				log.debug("skipping record: " + doc.getFieldValue(IndexFieldUtility.INDEX_RECORD_ID));
64
				break;
65

    
66
			case ERROR:
67
				res.skip();
68
				log.info("Error on record: " + doc.getFieldValue(IndexFieldUtility.INDEX_RECORD_ID));
69
				break;
70

    
71
			default:
72
				throw new IllegalStateException("unknow document status");
73
			}
74
		});
75
		return res;
76
	}
77
	
78
}
(1-1/4)