Project

General

Profile

1
package eu.dnetlib.data.hadoop;
2

    
3
import java.util.Date;
4
import java.util.concurrent.Executor;
5
import java.util.concurrent.Executors;
6

    
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9

    
10
import eu.dnetlib.data.hadoop.action.JobMonitor;
11
import eu.dnetlib.data.hadoop.config.ClusterName;
12
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
13
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
14
import eu.dnetlib.data.hadoop.utils.JobProfile;
15

    
16
public class HadoopJob {
17

    
18
	private static final Log log = LogFactory.getLog(HadoopJob.class); // NOPMD by marko on 11/24/08 5:02 PM
19

    
20
	/**
21
	 * Defines the possible stati of an hadoop job.
22
	 */
23
	public static enum Status {
24
		PREP, RUNNING, SUCCEEDED, KILLED, FAILED, SUSPENDED
25
	}
26

    
27
	private final Executor executor = Executors.newSingleThreadExecutor();
28

    
29
	private final JobMonitor jobMonitor;
30

    
31
	private final JobProfile jobProfile;
32

    
33
	private final ClusterName clusterName;
34

    
35
	private final String id;
36

    
37
	public static HadoopJob newInstance(String id, ClusterName clusterName, JobProfile profile, JobMonitor jobMonitor) {
38
		return new HadoopJob(id, clusterName, profile, jobMonitor);
39
	}
40

    
41
	private HadoopJob(String id, ClusterName clusterName, JobProfile jobProfile, JobMonitor jobMonitor) {
42
		super();
43
		this.id = id;
44
		this.clusterName = clusterName;
45
		this.jobProfile = jobProfile;
46
		this.jobMonitor = jobMonitor;
47
	}
48

    
49
	public void startMonitor() {
50
		log.info("start monitoring for job: " + getId());
51
		executor.execute(jobMonitor);
52
	}
53

    
54
	public String getId() {
55
		return id;
56
	}
57

    
58
	public String getHadoopId() {
59
		return getJobMonitor().getHadoopId();
60
	}
61

    
62
	public JobMonitor getJobMonitor() {
63
		return jobMonitor;
64
	}
65

    
66
	public Status getStatus() {
67
		return Status.valueOf(getJobMonitor().getStatus());
68
	}
69

    
70
	public boolean isComplete() {
71
		Status status = getStatus();
72
		return status.equals(Status.SUCCEEDED) || status.equals(Status.FAILED) || status.equals(Status.KILLED);
73
	}
74

    
75
	public Date getStartTime() throws HadoopServiceException {
76
		return jobMonitor.getStartTime();
77
	}
78

    
79
	public Date getLastActivity() {
80
		return jobMonitor.getLastActivity();
81
	}
82

    
83
	public ClusterName getClusterName() {
84
		return clusterName;
85
	}
86

    
87
	public JobProfile getJobProfile() {
88
		return jobProfile;
89
	}
90

    
91
	public HadoopJobDescriptor asDescriptor() throws HadoopServiceException {
92
		return new HadoopJobDescriptor(getJobProfile().getName(), getJobProfile().getDescription(), getId(), getStatus().toString(), getStartTime().toString(),
93
				getLastActivity().toString(), getJobMonitor().getHadoopId(), getJobMonitor().getTrackerUrl());
94
	}
95

    
96
}
(3-3/7)