package eu.dnetlib.data.hadoop.mapred;

import java.io.IOException;
import java.util.Date;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

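/**
 * Monitors a running mapreduce job: periodically polls its state until completion
 * and notifies the registered JobCompletion callback with the job counters on
 * success, or with the failure cause otherwise.
 */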
public class MapreduceJobMonitor extends JobMonitor {

	private static final Log log = LogFactory.getLog(MapreduceJobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM

	private final RunningJob runningJob;

	private final JobClient jobClient;

	public MapreduceJobMonitor(final RunningJob runningJob, final JobCompletion callback) {
		this(null, runningJob, callback);
	}

	public MapreduceJobMonitor(final JobClient jobClient, final RunningJob runningJob, final JobCompletion callback) {
		super(callback);
		this.runningJob = runningJob;
		this.jobClient = jobClient;
	}
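
	/**
	 * Polls the job until it completes: sleeps monitorSleepTimeSecs seconds between
	 * checks, refreshes lastActivity whenever the run state changes, then reports
	 * the outcome (counters on success, cause on failure) to the JobCompletion callback.
	 */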
	@Override
	public void run() {
		try {
			log.info("waiting for job completion: " + getRunningJob().getID().getId());

			int runState = getRunState();
			while (!getRunningJob().isComplete()) {
				Thread.sleep(monitorSleepTimeSecs * 1000);

				// if the run state has changed, update the last-activity timestamp
				final int currentState = getRunState();
				if (currentState != runState) {
					runState = currentState;
					lastActivity = new Date(System.currentTimeMillis());
				}
			}

			if (getRunningJob().isSuccessful()) {
				log.info("job successful: " + getRunningJob().getID().getId());
				getCallback().done(asMap(getRunningJob().getCounters()));
			} else {
				final String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus();
				getCallback().failed(msg, new HadoopServiceException(msg));
			}
		} catch (final Throwable e) {
			getCallback().failed(getHadoopId(), e);
		} finally {
			try {
				if (jobClient != null) {
					jobClient.close();
				}
			} catch (final IOException e) {
				throw new RuntimeException("unable to close jobClient", e);
			}
		}
	}
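
	/**
	 * Note: JobID.getId() yields only the numeric sequence of the job id, so this
	 * returns the number alone rather than the full "job_..." identifier string.
	 */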
	@Override
	public String getHadoopId() {
		return String.valueOf(getRunningJob().getID().getId());
	}

	// TODO: a newer counters API is available in the org.apache.hadoop.mapreduce package,
	// but JobClient and the objects it returns still use the old, deprecated org.apache.hadoop.mapred API.
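	/**
	 * Flattens the job counters into a map: one entry per counter, keyed by
	 * "groupDisplayName.counterDisplayName", with the counter value rendered as a string.
	 */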
	protected Map<String, String> asMap(final Counters counters) {
		final Map<String, String> res = Maps.newHashMap();
		if (counters != null) {
			for (final Group group : counters) {
				for (final Counter counter : group) {
					res.put(group.getDisplayName() + "." + counter.getDisplayName(), String.valueOf(counter.getValue()));
				}
			}
		}
		return res;
	}

	public RunningJob getRunningJob() {
		return runningJob;
	}
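
	/**
	 * Returns the job run state as a human-readable string (e.g. "RUNNING", "SUCCEEDED"),
	 * or "UNKNOWN" when the status cannot be read.
	 */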
	@Override
	public String getStatus() {
		try {
			return JobStatus.getJobRunState(getRunState());
		} catch (final IOException e) {
			log.error("error accessing job status", e);
			return "UNKNOWN";
		}
	}

	private int getRunState() throws IOException {
		return getRunningJob().getJobStatus().getRunState();
	}

	@Override
	public Date getLastActivity() {
		return lastActivity;
	}
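
	/**
	 * Reads the job start time from the job status, wrapping any IOException
	 * in a HadoopServiceException.
	 */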
	@Override
	public Date getStartTime() throws HadoopServiceException {
		try {
			return new Date(getRunningJob().getJobStatus().getStartTime());
		} catch (final IOException e) {
			throw new HadoopServiceException("unable to read job start time", e);
		}
	}

	@Override
	public String getTrackerUrl() {
		return getRunningJob().getTrackingURL();
	}
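
	/**
	 * Best-effort kill: asks the cluster to kill the job; an IOException
	 * is logged rather than rethrown.
	 */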
	@Override
	public void kill() {
		try {
			log.info("killing job: " + getHadoopId());
			getRunningJob().killJob();
		} catch (final IOException e) {
			log.error("unable to kill job: " + getHadoopId(), e);
		}
	}

	public JobClient getJobClient() {
		return jobClient;
	}
}