package eu.dnetlib.data.hadoop;

import java.util.Date;
import java.util.List;
import java.util.Map.Entry;
import java.util.Objects;
import java.util.stream.Collectors;

import com.google.common.collect.BiMap;
import com.google.common.collect.HashBiMap;
import eu.dnetlib.data.hadoop.HadoopJob.Status;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.HadoopUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

/**
 * In-memory registry of the Hadoop jobs currently tracked by the service, keyed by job id.
 * When the registry grows beyond maxJobs, the completed job with the least recent activity is evicted.
 */
public class JobRegistry {

	private static final Log log = LogFactory.getLog(JobRegistry.class); // NOPMD by marko on 11/24/08 5:02 PM

	private int maxJobs;

	private final BiMap<String, HadoopJob> jobs = HashBiMap.create();

	public String registerJob(HadoopJob hadoopJob) throws HadoopServiceException {

		// idempotent: a job that is already registered keeps its existing id
		if (jobs.containsValue(hadoopJob)) { return jobs.inverse().get(hadoopJob); }

		if (jobs.size() > getMaxJobs()) {
			removeOldestProcess();
		}

		jobs.put(hadoopJob.getId(), hadoopJob);
		log.info("Registered hadoop job " + hadoopJob.getId());
		hadoopJob.startMonitor();

		return hadoopJob.getId();
	}

	public Status getJobStatus(String id) {
		return findJob(id).getStatus();
	}

	public HadoopJob findJob(String id) {
		return jobs.get(id);
	}

	public void unregisterJob(String id) throws HadoopServiceException {

		if (!jobs.containsKey(id)) { throw new HadoopServiceException("unable to unregister job, could not find jobId in registry: " + id); }

		log.info("unregistering job: " + id);
		jobs.get(id).getJobMonitor().kill();
		jobs.remove(id);
	}

	// evicts the completed job with the oldest last-activity timestamp, if any
	private void removeOldestProcess() throws HadoopServiceException {
		Date oldDate = new Date();
		String oldId = null;

		for (Entry<String, HadoopJob> e : jobs.entrySet()) {
			final HadoopJob hadoopJob = e.getValue();

			if (hadoopJob.isComplete()) {
				final Date date = hadoopJob.getLastActivity();
				if (date.before(oldDate)) {
					oldDate = date;
					oldId = e.getKey();
				}
			}
		}

		if (oldId != null) {
			unregisterJob(oldId);
		}
	}

	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
		return jobs.values().stream()
				.filter(j -> clusterName.equals(j.getClusterName()))
				.map(HadoopUtils.asDescriptor())
				.filter(Objects::nonNull)
				.collect(Collectors.toList());
	}

	@Required
	public void setMaxJobs(final int maxJobs) {
		this.maxJobs = maxJobs;
	}

	public int getMaxJobs() {
		return maxJobs;
	}

}