Project

General

Profile

1
package eu.dnetlib.efg.workflows.nodes.thumbnail;
2

    
3
import java.util.ArrayList;
4
import java.util.List;
5
import java.util.concurrent.ArrayBlockingQueue;
6
import java.util.concurrent.BlockingQueue;
7
import java.util.concurrent.ExecutorService;
8
import java.util.concurrent.Executors;
9
import java.util.concurrent.Future;
10

    
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.springframework.beans.factory.annotation.Autowired;
14

    
15
import eu.dnetlib.data.objectstore.connector.ObjectStore;
16
import eu.dnetlib.data.objectstore.connector.ObjectStoreDao;
17
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
18
import eu.dnetlib.enabling.resultset.listener.ResultSetListener;
19
import eu.dnetlib.msro.workflows.graph.Arc;
20
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
21
import eu.dnetlib.msro.workflows.procs.Env;
22
import eu.dnetlib.msro.workflows.procs.Token;
23
import eu.dnetlib.msro.workflows.util.ProgressProvider;
24
import eu.dnetlib.rmi.data.ObjectStoreFile;
25

    
26
/**
27
 * Created by sandro on 4/8/16.
28
 */
29
public class GenerateThumbnailJobNode extends SimpleJobNode implements ProgressProvider {
30

    
31
	private static final Log log = LogFactory.getLog(GenerateThumbnailJobNode.class);
32

    
33
	public static ObjectStoreFile END_QUEUE = new ObjectStoreFile();
34
	@Autowired
35
	ResultSetClient resultSetClient;
36
	private String nativeObjectStore;
37
	private String storeThumb250;
38
	private String storeThumb96;
39
	private int numberOfThreads = 4;
40
	@Autowired
41
	private ObjectStoreDao objectStoreDao;
42
	private int counter = -1;
43

    
44
	private int total = -1;
45

    
46
	@Override
47
	protected String execute(final Env env) throws Exception {
48

    
49
		final ObjectStore objectStore = this.objectStoreDao.getObjectStore(this.nativeObjectStore);
50

    
51
		this.total = objectStore.getSize();
52

    
53
		final ObjectStore objectStore250 = this.objectStoreDao.getObjectStore(env.getAttribute(this.storeThumb250, String.class));
54

    
55
		final ObjectStore objectStore96 = this.objectStoreDao.getObjectStore(env.getAttribute(this.storeThumb96, String.class));
56

    
57
		final ResultSetListener<ObjectStoreFile> deliver = objectStore.deliver(0L, System.currentTimeMillis());
58

    
59
		final BlockingQueue<ObjectStoreFile> queue = new ArrayBlockingQueue<>(10);
60

    
61
		final ExecutorService executor = Executors.newFixedThreadPool(this.numberOfThreads);
62

    
63
		final List<Future<Boolean>> responses = new ArrayList<>();
64

    
65
		for (int i = 0; i < 4; i++) {
66
			responses.add(executor.submit(new WorkerMap(objectStore, objectStore250, objectStore96, queue)));
67
        }
68

    
69
		this.counter = 0;
70

    
71
		while (deliver.hasNext()) {
72
			this.counter++;
73
			queue.put(deliver.next());
74
		}
75
		queue.put(END_QUEUE);
76

    
77
		for (final Future<Boolean> currentResponse : responses) {
78
			if (currentResponse.get() == false) {
79
				log.error("Some response fail");
80
			}
81

    
82
		}
83
		return Arc.DEFAULT_ARC;
84
	}
85

    
86
	@Override
87
	protected void beforeStart(final Token token) {
88
		token.setProgressProvider(this);
89
	}
90

    
91
	public String getNativeObjectStore() {
92
		return this.nativeObjectStore;
93
	}
94

    
95
	public void setNativeObjectStore(final String nativeObjectStore) {
96
		this.nativeObjectStore = nativeObjectStore;
97
	}
98

    
99
	public int getNumberOfThreads() {
100
		return this.numberOfThreads;
101
	}
102

    
103
	public void setNumberOfThreads(final int numberOfThreads) {
104
		this.numberOfThreads = numberOfThreads;
105
	}
106

    
107
	public String getStoreThumb250() {
108
		return this.storeThumb250;
109
	}
110

    
111
	public void setStoreThumb250(final String storeThumb250) {
112
		this.storeThumb250 = storeThumb250;
113
	}
114

    
115
	public String getStoreThumb96() {
116
		return this.storeThumb96;
117
	}
118

    
119
	public void setStoreThumb96(final String storeThumb96) {
120
		this.storeThumb96 = storeThumb96;
121
	}
122

    
123
	@Override
124
	public String getProgressDescription() {
125
		return this.counter < 0 ? "-" : String.format("%d / %d", this.counter, this.total);
126
	}
127
}
(1-1/3)