Revision 52122
Added by Claudio Atzori almost 6 years ago
OozieJobMonitor.java | ||
---|---|---|
19 | 19 |
import eu.dnetlib.data.hadoop.action.JobMonitor; |
20 | 20 |
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException; |
21 | 21 |
|
22 |
import static java.lang.String.format; |
|
23 |
|
|
22 | 24 |
public class OozieJobMonitor extends JobMonitor { |
23 | 25 |
|
24 | 26 |
private static final Log log = LogFactory.getLog(JobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM |
... | ... | |
31 | 33 |
|
32 | 34 |
private Set<String> workflowActions = Sets.newHashSet(); |
33 | 35 |
|
36 |
@Deprecated |
|
34 | 37 |
public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback) { |
35 | 38 |
super(callback); |
36 | 39 |
this.oozieClient = oozieClient; |
... | ... | |
49 | 52 |
try { |
50 | 53 |
log.info("waiting for oozie job completion: " + getHadoopId()); |
51 | 54 |
|
52 |
Status status = doGetStatus();
|
|
53 |
while (status.equals(Status.RUNNING)) { |
|
55 |
Status status = Status.PREP;
|
|
56 |
while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) {
|
|
54 | 57 |
Thread.sleep(monitorSleepTimeSecs * 1000); |
55 | 58 |
|
56 |
Status currentStatus = doGetStatus(); |
|
57 |
if (!status.equals(currentStatus)) { |
|
58 |
status = currentStatus; |
|
59 |
lastActivity = new Date(System.currentTimeMillis()); |
|
59 |
try { |
|
60 |
final Status currentStatus = doGetStatus(); |
|
61 |
|
|
62 |
if (!currentStatus.equals(status)) { |
|
63 |
status = currentStatus; |
|
64 |
lastActivity = new Date(); |
|
65 |
} |
|
66 |
} catch (Throwable e) { |
|
67 |
log.warn(format("error polling status for job %s", jobId), e); |
|
60 | 68 |
} |
61 | 69 |
} |
62 | 70 |
|
63 |
log.debug("job " + jobId + " finihsed with status: " + status);
|
|
64 |
if (status == Status.SUCCEEDED) {
|
|
71 |
log.debug(format("job %s finihsed with status: %s", jobId, status));
|
|
72 |
if (Status.SUCCEEDED.equals(status)) {
|
|
65 | 73 |
// TODO set some content to return to the blackboard msg. |
66 | 74 |
|
67 |
log.info(String.format("start looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
|
|
75 |
log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
|
|
68 | 76 |
final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions); |
69 | 77 |
if (report != null) { |
70 | 78 |
final Map<String, String> map = Maps.newHashMap(); |
... | ... | |
77 | 85 |
} |
78 | 86 |
} else { |
79 | 87 |
// TODO retrieve some failure information from the oozie client. |
80 |
String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus() + ", oozie log:\n " |
|
81 |
+ getOozieClient().getJobLog(getHadoopId()) + "\n\n"; |
|
88 |
String msg = format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId())); |
|
82 | 89 |
getCallback().failed(msg, new HadoopServiceException(msg)); |
83 | 90 |
} |
84 | 91 |
} catch (Throwable e) { |
Also available in: Unified diff
oozie job monitor does not propagate failures form the job status polling