Project

General

Profile

1
package eu.dnetlib.validator.service.impls.listeners;
2

    
3
import java.util.List;
4
import java.util.Map;
5
import java.util.concurrent.ExecutorService;
6

    
7
import eu.dnetlib.validator.service.impls.executors.JobWorker;
8
import net.sf.ehcache.CacheManager;
9

    
10
import org.apache.log4j.Logger;
11
import org.springframework.core.task.TaskExecutor;
12
import org.springframework.stereotype.Controller;
13

    
14
import eu.dnetlib.validator.engine.execution.CompletedTask;
15
import eu.dnetlib.validator.engine.execution.JobListener;
16
import eu.dnetlib.validator.service.impls.executors.ThreadExecutorSubmitter;
17
import eu.dnetlib.validator.service.impls.providers.CrisProvider;
18

    
19
@Controller
20
public class CrisListener implements JobListener {
21
	
22
	private static Logger logger = Logger.getLogger(CrisListener.class);
23
	
24
	private int workersFirstPhase = 0;
25
	private int workersFinished = 0;
26
	private int workers;
27
//	private ExecutorService executor;
28
	private TaskExecutor executor;
29

    
30
	private List<JobWorker> submittersForReferential;
31

    
32
//	private List<ThreadExecutorSubmitter> submittersForReferential;
33
	private CrisProvider provider;
34
	private CacheManager cacheManager = null;
35
	private String cacheName = null;
36
	
37
	public void beginCrisReferentialCheck() {
38
		provider.restartResultSets();
39
		for (JobWorker submiter : this.submittersForReferential) {
40
			executor.execute(submiter);
41
		}
42
//		for (ThreadExecutorSubmitter submiter : this.submittersForReferential) {
43
//			executor.submit(submiter);
44
//		}
45
	}
46
	
47
	@Override
48
	public void currentResults(List<CompletedTask> tasks, int jobId,
49
			Object record, Map<String, Object> recordContext, Throwable t) {
50
		// TODO Auto-generated method stub
51
		
52
	}
53
	@Override
54
	public void currentResults(List<CompletedTask> tasks, int jobId,
55
			Object record, Map<String, Object> recordContext) {
56
		// TODO Auto-generated method stub
57
		
58
	}
59
	@Override
60
	public synchronized void finished(int jobId, Map<String, Object> jobContext) {
61
		try { 
62
			workersFinished ++;
63
			if (workersFinished == workersFirstPhase) {
64
				logger.debug("1st phase finished.. Beginning second one..");
65
				this.beginCrisReferentialCheck();
66
			}
67
			if (workersFinished == workers) {
68
				if (cacheName != null) {
69
					cacheManager.removeCache(cacheName);
70
					logger.debug("caches after removal of cache: " + cacheName);
71
				}
72
	//			for (String cache : cacheManager.getCacheNames()) {
73
	//				logger.debug("name: " + cache);
74
	//			}
75
			}
76
		} catch (Exception e) {
77
			logger.error("Error while finalizing successful cris job");
78
		}
79
	}
80
	
81
	@Override
82
	public synchronized void failed(int jobId, Map<String, Object> jobContext,
83
			Throwable t) {
84
		try { 
85
			workersFinished ++;
86
			if (workersFinished == (workers-workersFirstPhase)) {
87
				if (cacheName != null) {
88
					cacheManager.removeCache(cacheName);
89
					logger.debug("caches after removal of cache: " + cacheName);
90
				}
91
	//			for (String cache : cacheManager.getCacheNames()) {
92
	//				logger.debug("name: " + cache);
93
	//			}
94
			}
95

    
96
		} catch (Exception e) {
97
			logger.error("Error while finalizing failed cris job");
98
		}
99
	}
100

    
101
	public void setWorkersFirstPhase(int workersFirstPhase) {
102
		this.workersFirstPhase = workersFirstPhase;
103
	}
104

    
105
	public void setWorkers(int workers) {
106
		this.workers = workers;
107
	}
108

    
109
//	public void setExecutor(ExecutorService executor) {
110
//		this.executor = executor;
111
//	}
112

    
113
//	public void setSubmittersForReferential(
114
//			List<ThreadExecutorSubmitter> submittersForReferential) {
115
//		this.submittersForReferential = submittersForReferential;
116
//	}
117

    
118
	public void setExecutor(TaskExecutor executor) {
119
		this.executor = executor;
120
	}
121

    
122
	public void setSubmittersForReferential(List<JobWorker> submittersForReferential) {
123
		this.submittersForReferential = submittersForReferential;
124
	}
125

    
126
	public void setProvider(CrisProvider provider) {
127
		this.provider = provider;
128
	}
129

    
130
	public void setCacheManager(CacheManager cacheManager) {
131
		this.cacheManager = cacheManager;
132
	}
133
	public void setCacheName(String cacheName) {
134
		this.cacheName = cacheName;
135
	}
136
	
137
}
(2-2/8)