package eu.dnetlib.data.hadoop.mapred;

import java.io.IOException;
import java.util.Date;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.Counters;
import org.apache.hadoop.mapred.Counters.Counter;
import org.apache.hadoop.mapred.Counters.Group;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobStatus;
import org.apache.hadoop.mapred.RunningJob;

import com.google.common.collect.Maps;

import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;

public class MapreduceJobMonitor extends JobMonitor {
|
23
|
|
24
|
private static final Log log = LogFactory.getLog(MapreduceJobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM
|
25
|
|
26
|
private final RunningJob runningJob;
|
27
|
|
28
|
private final JobClient jobClient;
|
29
|
|
30
|
public MapreduceJobMonitor(final RunningJob runningJob, final JobCompletion callback) {
|
31
|
this(null, runningJob, callback);
|
32
|
}
|
33
|
|
34
|
public MapreduceJobMonitor(final JobClient jobClient, final RunningJob runningJob, final JobCompletion callback) {
|
35
|
super(callback);
|
36
|
this.runningJob = runningJob;
|
37
|
this.jobClient = jobClient;
|
38
|
}
|
39
|
|
40
|
@Override
|
41
|
public void run() {
|
42
|
try {
|
43
|
log.info("waiting for job completion: " + getRunningJob().getID().getId());
|
44
|
|
45
|
int runState = getRunState();
|
46
|
while (!getRunningJob().isComplete()) {
|
47
|
Thread.sleep(monitorSleepTimeSecs * 1000);
|
48
|
|
49
|
// if the run state has changed we update the last activity
|
50
|
final int currentState = getRunState();
|
51
|
if (currentState != runState) {
|
52
|
runState = currentState;
|
53
|
lastActivity = new Date(System.currentTimeMillis());
|
54
|
}
|
55
|
}
|
56
|
|
57
|
if (getRunningJob().isSuccessful()) {
|
58
|
log.info("job successful: " + getRunningJob().getID().getId());
|
59
|
getCallback().done(asMap(getRunningJob().getCounters()));
|
60
|
} else {
|
61
|
final String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus();
|
62
|
getCallback().failed(msg, new HadoopServiceException(msg));
|
63
|
}
|
64
|
} catch (final Throwable e) {
|
65
|
getCallback().failed(getHadoopId(), e);
|
66
|
} finally {
|
67
|
try {
|
68
|
if (jobClient != null) {
|
69
|
jobClient.close();
|
70
|
}
|
71
|
} catch (IOException e) {
|
72
|
throw new RuntimeException("unable to close jobClient", e);
|
73
|
}
|
74
|
}
|
75
|
}
|
76
|
|
77
|
@Override
|
78
|
public String getHadoopId() {
|
79
|
return String.valueOf(getRunningJob().getID().getId());
|
80
|
}
|
81
|
|
82
|
// TODO: many hadoop classes are available under the newer package org.apache.hadoop.mapreduce.Counters,
|
83
|
// but the JobClient and its returned objects are still using the old and deprecated org.apache.hadoop.mapred
|
84
|
protected Map<String, String> asMap(final Counters counters) {
|
85
|
final Map<String, String> res = Maps.newHashMap();
|
86
|
if (counters != null) {
|
87
|
for (final Group group : counters) {
|
88
|
for (final Counter counter : group) {
|
89
|
res.put(group.getDisplayName() + "." + counter.getDisplayName(), String.valueOf(counter.getValue()));
|
90
|
}
|
91
|
}
|
92
|
}
|
93
|
return res;
|
94
|
}
|
95
|
|
96
|
public RunningJob getRunningJob() {
|
97
|
return runningJob;
|
98
|
}
|
99
|
|
100
|
@Override
|
101
|
public String getStatus() {
|
102
|
try {
|
103
|
return JobStatus.getJobRunState(getRunState());
|
104
|
} catch (final IOException e) {
|
105
|
log.error("error accessing job status", e);
|
106
|
return "UNKNOWN";
|
107
|
}
|
108
|
}
|
109
|
|
110
|
private int getRunState() throws IOException {
|
111
|
return getRunningJob().getJobStatus().getRunState();
|
112
|
}
|
113
|
|
114
|
@Override
|
115
|
public Date getLastActivity() {
|
116
|
return lastActivity;
|
117
|
}
|
118
|
|
119
|
@Override
|
120
|
public Date getStartTime() throws HadoopServiceException {
|
121
|
try {
|
122
|
return new Date(getRunningJob().getJobStatus().getStartTime());
|
123
|
} catch (final IOException e) {
|
124
|
throw new HadoopServiceException("unable to read job start time", e);
|
125
|
}
|
126
|
}
|
127
|
|
128
|
@Override
|
129
|
public String getTrackerUrl() {
|
130
|
return getRunningJob().getTrackingURL();
|
131
|
}
|
132
|
|
133
|
@Override
|
134
|
public void kill() {
|
135
|
try {
|
136
|
log.info("killing job: " + getHadoopId());
|
137
|
getRunningJob().killJob();
|
138
|
} catch (final IOException e) {
|
139
|
log.error("unable to kill job: " + getHadoopId(), e);
|
140
|
}
|
141
|
}
|
142
|
|
143
|
public JobClient getJobClient() {
|
144
|
return jobClient;
|
145
|
}
|
146
|
}
|