Project

General

Profile

1
package eu.dnetlib.openaire.directindex.api;
2

    
3
import java.util.Map;
4
import java.util.concurrent.Executors;
5
import java.util.concurrent.ScheduledExecutorService;
6
import java.util.concurrent.TimeUnit;
7
import javax.annotation.Resource;
8

    
9
import eu.dnetlib.data.index.CloudIndexClient;
10
import eu.dnetlib.data.index.CloudIndexClientException;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.springframework.beans.factory.annotation.Value;
14

    
15
public class ResultSubmitterService {
16

    
17
    private static final Log log = LogFactory.getLog(ResultSubmitterService.class);
18

    
19
    @Resource(name = "resultSubmitterClientMap")
20
    private IndexClientMap clientMap;
21

    
22
    /**
23
     * Autocommit feature activation flag
24
     */
25
    @Value(value = "${openaire.api.directindex.autocommit.active}")
26
    private boolean autocommitactive;
27

    
28
    /**
29
     * Autocommit frequency (Seconds)
30
     */
31
    @Value(value = "${openaire.api.directindex.autocommit.frequency}")
32
    private long commitfrquency = 60;
33

    
34
    private ScheduledExecutorService executor;
35

    
36
    public ResultSubmitterService() {
37
        executor = Executors.newSingleThreadScheduledExecutor();
38
        updateCommitSchedule();
39
    }
40

    
41
    private void updateCommitSchedule() {
42
        log.info("updating commit schedule");
43

    
44
        executor.scheduleAtFixedRate(() -> {
45
            if (isAutocommitactive()) {
46
                try {
47
                    for (Map.Entry<IndexDsInfo, CloudIndexClient> entry : clientMap.getClients().entrySet()) {
48
                        final IndexDsInfo i = entry.getKey();
49
                        final CloudIndexClient client = entry.getValue();
50

    
51
                        log.info("performing commit on " + i.getColl());
52
                        try {
53
                            client.commit();
54
                        } catch (CloudIndexClientException e) {
55
                            log.error("error performing commit on " + i.getColl(), e);
56
                        }
57
                    }
58
                } catch (Throwable e) {
59
                    log.error("unable to perform commit", e);
60
                }
61
            }
62
        }, 0, getCommitfrquency(), TimeUnit.SECONDS);
63
    }
64

    
65
    public boolean isAutocommitactive() {
66
        return autocommitactive;
67
    }
68

    
69
    public synchronized void setAutocommitactive(boolean autocommitactive) {
70
        this.autocommitactive = autocommitactive;
71
    }
72

    
73
    public long getCommitfrquency() {
74
        return commitfrquency;
75
    }
76

    
77
    public synchronized void setCommitfrquency(long commitfrquency) {
78
        this.commitfrquency = commitfrquency;
79
    }
80

    
81
}
(8-8/8)