Project

General

Profile

1
package eu.dnetlib.data.hadoop;
2

    
3
import java.util.Date;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Map.Entry;
7

    
8
import eu.dnetlib.rmi.data.hadoop.ClusterName;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.springframework.beans.factory.annotation.Required;
12

    
13
import com.google.common.collect.BiMap;
14
import com.google.common.collect.HashBiMap;
15
import com.google.common.collect.Iterables;
16
import com.google.common.collect.Lists;
17
import com.google.common.collect.Maps;
18

    
19
import eu.dnetlib.data.hadoop.HadoopJob.Status;
20
import eu.dnetlib.rmi.data.hadoop.HadoopJobDescriptor;
21
import eu.dnetlib.rmi.data.hadoop.HadoopServiceException;
22
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
23

    
24
public class JobRegistry {
25

    
26
	private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM
27

    
28
	private int maxJobs;
29

    
30
	private final BiMap<String, HadoopJob> jobs = HashBiMap.create();
31

    
32
	public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {
33

    
34
		if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }
35

    
36
		if (jobs.size() > getMaxJobs()) {
37
			removeOldestProcess();
38
		}
39

    
40
		jobs.put(hadoopJob.getId(), hadoopJob);
41
		log.info("Registered hadoop job " + hadoopJob.getId());
42
		hadoopJob.startMonitor();
43

    
44
		return hadoopJob.getId();
45
	}
46

    
47
	public Status getJobStatus(String id) {
48
		return findJob(id).getStatus();
49
	}
50

    
51
	public HadoopJob findJob(String id) {
52
		return jobs.get(id);
53
	}
54

    
55
	public void unregisterJob(String id) throws HadoopServiceException {
56

    
57
		if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }
58

    
59
		log.info("unregistering job: " + id);
60
		jobs.get(id).getJobMonitor().kill();
61
		jobs.remove(id);
62
	}
63

    
64
	private void removeOldestProcess() throws HadoopServiceException {
65
		Date oldDate = new Date();
66
		String oldId = null;
67

    
68
		for (Entry<String, HadoopJob> e : jobs.entrySet()) {
69
			final HadoopJob hadoopJob = e.getValue();
70

    
71
			if (hadoopJob.isComplete()) {
72
				final Date date = hadoopJob.getLastActivity();
73
				if (date.before(oldDate)) {
74
					oldDate = date;
75
					oldId = e.getKey();
76
				}
77
			}
78
		}
79

    
80
		if (oldId != null) {
81
			unregisterJob(oldId);
82
		}
83

    
84
	}
85

    
86
	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
87
		Map<String, HadoopJob> filtered = Maps.filterValues(jobs, HadoopUtils.filterByCluster(clusterName));
88
		return Lists.newArrayList(Iterables.transform(filtered.entrySet(), HadoopUtils.hadoopJobDescriptor()));
89
	}
90

    
91
	@Required
92
	public void setMaxJobs(final int maxJobs) {
93
		this.maxJobs = maxJobs;
94
	}
95

    
96
	public int getMaxJobs() {
97
		return maxJobs;
98
	}
99

    
100
}
(7-7/7)