Project

General

Profile

1
package eu.dnetlib.openaire.exporter;
2

    
3
import java.util.List;
4
import java.util.concurrent.*;
5
import javax.annotation.PostConstruct;
6
import javax.annotation.PreDestroy;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.springframework.stereotype.Component;
11

    
12
@Component
13
public class OperationManager {
14

    
15
	private static final Log log = LogFactory.getLog(OperationManager.class);
16

    
17
	private static final long SLEEP_TIME = 1000;
18

    
19
	private static final int Q_SIZE = 100;
20

    
21
	private static final int POOL_SIZE = 5;
22

    
23
	private BlockingQueue<Runnable> ops = new ArrayBlockingQueue<>(Q_SIZE);
24

    
25
	private ExecutorService executor;
26

    
27
	@PostConstruct
28
	public void init() {
29
		executor = getExecutor();
30
	}
31

    
32
	public int dropAll() {
33
		final List<Runnable> lostOperations = executor.shutdownNow();
34
		log.warn(String.format("discarding %s operations", lostOperations.size()));
35
		executor = getExecutor();
36
		return lostOperations.size();
37
	}
38

    
39
	public int getOpSize() {
40
		return ops.size();
41
	}
42

    
43
	public void addOperation(final Runnable op) {
44
		executor.execute(op);
45
	}
46

    
47
	@PreDestroy
48
	public void tearDown() throws InterruptedException {
49
		executor.shutdown();
50
		final boolean done = executor.awaitTermination(SLEEP_TIME, TimeUnit.MILLISECONDS);
51
		log.debug(String.format("All operations were completed so far? %s", done));
52
	}
53

    
54
	// HELPERS
55

    
56
	private ThreadPoolExecutor getExecutor() {
57
		return new ThreadPoolExecutor(POOL_SIZE, POOL_SIZE,0L, TimeUnit.MILLISECONDS, ops);
58
	}
59

    
60
}
(2-2/2)