package eu.dnetlib.data.hadoop.mapred;

import java.io.IOException;
import java.util.Date;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

public class MapreduceJobMonitor extends JobMonitor {
23
24
	private static final Log log = LogFactory.getLog(MapreduceJobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM
25
26
	private final RunningJob runningJob;
27
28 49795 claudio.at
	private final JobClient jobClient;
29
30 34703 claudio.at
	public MapreduceJobMonitor(final RunningJob runningJob, final JobCompletion callback) {
31 49795 claudio.at
		this(null, runningJob, callback);
32
	}
33
34
	public MapreduceJobMonitor(final JobClient jobClient, final RunningJob runningJob, final JobCompletion callback) {
35 26600 sandro.lab
		super(callback);
36
		this.runningJob = runningJob;
37 49795 claudio.at
		this.jobClient = jobClient;
38 26600 sandro.lab
	}
39
40
	@Override
41
	public void run() {
42
		try {
43
			log.info("waiting for job completion: " + getRunningJob().getID().getId());
44
45
			int runState = getRunState();
46
			while (!getRunningJob().isComplete()) {
47
				Thread.sleep(monitorSleepTimeSecs * 1000);
48
49
				// if the run state has changed we update the last activity
50 34703 claudio.at
				final int currentState = getRunState();
51 26600 sandro.lab
				if (currentState != runState) {
52
					runState = currentState;
53
					lastActivity = new Date(System.currentTimeMillis());
54
				}
55
			}
56
57
			if (getRunningJob().isSuccessful()) {
58
				log.info("job successful: " + getRunningJob().getID().getId());
59
				getCallback().done(asMap(getRunningJob().getCounters()));
60
			} else {
61 34703 claudio.at
				final String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus();
62 26600 sandro.lab
				getCallback().failed(msg, new HadoopServiceException(msg));
63
			}
64 34703 claudio.at
		} catch (final Throwable e) {
65 26600 sandro.lab
			getCallback().failed(getHadoopId(), e);
66 49795 claudio.at
		} finally {
67
			try {
68
				if (jobClient != null) {
69
					jobClient.close();
70
				}
71
			} catch (IOException e) {
72
				throw new RuntimeException("unable to close jobClient", e);
73
			}
74 26600 sandro.lab
		}
75
	}
76
77
	@Override
78
	public String getHadoopId() {
79
		return String.valueOf(getRunningJob().getID().getId());
80
	}
81
82
	// TODO: many hadoop classes are available under the newer package org.apache.hadoop.mapreduce.Counters,
83
	// but the JobClient and its returned objects are still using the old and deprecated org.apache.hadoop.mapred
84 34703 claudio.at
	protected Map<String, String> asMap(final Counters counters) {
85 26600 sandro.lab
		final Map<String, String> res = Maps.newHashMap();
86
		if (counters != null) {
87 34703 claudio.at
			for (final Group group : counters) {
88
				for (final Counter counter : group) {
89 26600 sandro.lab
					res.put(group.getDisplayName() + "." + counter.getDisplayName(), String.valueOf(counter.getValue()));
90
				}
91
			}
92
		}
93
		return res;
94
	}
95
96
	public RunningJob getRunningJob() {
97
		return runningJob;
98
	}
99
100
	@Override
101
	public String getStatus() {
102
		try {
103
			return JobStatus.getJobRunState(getRunState());
104 34703 claudio.at
		} catch (final IOException e) {
105 26600 sandro.lab
			log.error("error accessing job status", e);
106
			return "UNKNOWN";
107
		}
108
	}
109
110
	private int getRunState() throws IOException {
111
		return getRunningJob().getJobStatus().getRunState();
112
	}
113
114
	@Override
115
	public Date getLastActivity() {
116
		return lastActivity;
117
	}
118
119
	@Override
120
	public Date getStartTime() throws HadoopServiceException {
121
		try {
122
			return new Date(getRunningJob().getJobStatus().getStartTime());
123 34703 claudio.at
		} catch (final IOException e) {
124 26600 sandro.lab
			throw new HadoopServiceException("unable to read job start time", e);
125
		}
126
	}
127
128
	@Override
129
	public String getTrackerUrl() {
130
		return getRunningJob().getTrackingURL();
131
	}
132
133
	@Override
134
	public void kill() {
135
		try {
136
			log.info("killing job: " + getHadoopId());
137
			getRunningJob().killJob();
138 34703 claudio.at
		} catch (final IOException e) {
139 26600 sandro.lab
			log.error("unable to kill job: " + getHadoopId(), e);
140
		}
141
	}
142
143 49795 claudio.at
	public JobClient getJobClient() {
144
		return jobClient;
145
	}
146 26600 sandro.lab
}