// NOTE(review): removed extraction residue ("Project / General / Profile" breadcrumbs
// and SVN blame revision/author markers) left over from an annotate/blame web view.
package eu.dnetlib.data.hadoop.mapred;

import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
22 60355 claudio.at
@Deprecated
23 26600 sandro.lab
public class MapreduceJobMonitor extends JobMonitor {
24
25
	private static final Log log = LogFactory.getLog(MapreduceJobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM
26
27
	private final RunningJob runningJob;
28
29 49795 claudio.at
	private final JobClient jobClient;
30
31 34703 claudio.at
	public MapreduceJobMonitor(final RunningJob runningJob, final JobCompletion callback) {
32 49795 claudio.at
		this(null, runningJob, callback);
33
	}
34
35
	public MapreduceJobMonitor(final JobClient jobClient, final RunningJob runningJob, final JobCompletion callback) {
36 26600 sandro.lab
		super(callback);
37
		this.runningJob = runningJob;
38 49795 claudio.at
		this.jobClient = jobClient;
39 26600 sandro.lab
	}
40
41
	@Override
42
	public void run() {
43
		try {
44
			log.info("waiting for job completion: " + getRunningJob().getID().getId());
45
46
			int runState = getRunState();
47
			while (!getRunningJob().isComplete()) {
48
				Thread.sleep(monitorSleepTimeSecs * 1000);
49
50
				// if the run state has changed we update the last activity
51 34703 claudio.at
				final int currentState = getRunState();
52 26600 sandro.lab
				if (currentState != runState) {
53
					runState = currentState;
54
					lastActivity = new Date(System.currentTimeMillis());
55
				}
56
			}
57
58
			if (getRunningJob().isSuccessful()) {
59
				log.info("job successful: " + getRunningJob().getID().getId());
60
				getCallback().done(asMap(getRunningJob().getCounters()));
61
			} else {
62 34703 claudio.at
				final String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus();
63 60355 claudio.at
				getCallback().failed(Maps.newHashMap(), msg, new HadoopServiceException(msg));
64 26600 sandro.lab
			}
65 34703 claudio.at
		} catch (final Throwable e) {
66 60355 claudio.at
			getCallback().failed(Maps.newHashMap(), getHadoopId(), e);
67 49795 claudio.at
		} finally {
68
			try {
69
				if (jobClient != null) {
70
					jobClient.close();
71
				}
72
			} catch (IOException e) {
73
				throw new RuntimeException("unable to close jobClient", e);
74
			}
75 26600 sandro.lab
		}
76
	}
77
78
	@Override
79
	public String getHadoopId() {
80
		return String.valueOf(getRunningJob().getID().getId());
81
	}
82
83
	// TODO: many hadoop classes are available under the newer package org.apache.hadoop.mapreduce.Counters,
84
	// but the JobClient and its returned objects are still using the old and deprecated org.apache.hadoop.mapred
85 34703 claudio.at
	protected Map<String, String> asMap(final Counters counters) {
86 26600 sandro.lab
		final Map<String, String> res = Maps.newHashMap();
87
		if (counters != null) {
88 34703 claudio.at
			for (final Group group : counters) {
89
				for (final Counter counter : group) {
90 26600 sandro.lab
					res.put(group.getDisplayName() + "." + counter.getDisplayName(), String.valueOf(counter.getValue()));
91
				}
92
			}
93
		}
94
		return res;
95
	}
96
97
	public RunningJob getRunningJob() {
98
		return runningJob;
99
	}
100
101
	@Override
102
	public String getStatus() {
103
		try {
104
			return JobStatus.getJobRunState(getRunState());
105 34703 claudio.at
		} catch (final IOException e) {
106 26600 sandro.lab
			log.error("error accessing job status", e);
107
			return "UNKNOWN";
108
		}
109
	}
110
111
	private int getRunState() throws IOException {
112
		return getRunningJob().getJobStatus().getRunState();
113
	}
114
115
	@Override
116
	public Date getLastActivity() {
117
		return lastActivity;
118
	}
119
120
	@Override
121
	public Date getStartTime() throws HadoopServiceException {
122
		try {
123
			return new Date(getRunningJob().getJobStatus().getStartTime());
124 34703 claudio.at
		} catch (final IOException e) {
125 26600 sandro.lab
			throw new HadoopServiceException("unable to read job start time", e);
126
		}
127
	}
128
129
	@Override
130
	public String getTrackerUrl() {
131
		return getRunningJob().getTrackingURL();
132
	}
133
134
	@Override
135
	public void kill() {
136
		try {
137
			log.info("killing job: " + getHadoopId());
138
			getRunningJob().killJob();
139 34703 claudio.at
		} catch (final IOException e) {
140 26600 sandro.lab
			log.error("unable to kill job: " + getHadoopId(), e);
141
		}
142
	}
143
144 49795 claudio.at
	public JobClient getJobClient() {
145
		return jobClient;
146
	}
147 26600 sandro.lab
}