53 |
53 |
log.info("waiting for oozie job completion: " + getHadoopId());
|
54 |
54 |
|
55 |
55 |
Status status = Status.PREP;
|
56 |
|
while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) {
|
|
56 |
while (status.equals(Status.PREP) || status.equals(Status.RUNNING) || status.equals(Status.SUSPENDED)) {
|
57 |
57 |
Thread.sleep(monitorSleepTimeSecs * 1000);
|
58 |
58 |
|
59 |
59 |
try {
|
60 |
60 |
final Status currentStatus = doGetStatus();
|
61 |
61 |
|
62 |
62 |
if (!currentStatus.equals(status)) {
|
|
63 |
log.debug(format("jobId %s status changed from %s to %s", jobId, status.toString(), currentStatus.toString()));
|
63 |
64 |
status = currentStatus;
|
64 |
65 |
lastActivity = new Date();
|
65 |
66 |
}
|
... | ... | |
68 |
69 |
}
|
69 |
70 |
}
|
70 |
71 |
|
71 |
|
log.debug(format("job %s finihsed with status: %s", jobId, status));
|
|
72 |
log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
|
|
73 |
Map<String, String> report = getReport(getOozieClient(), getHadoopId(), workflowActions);
|
|
74 |
|
|
75 |
log.debug(format("job %s finished with status: %s", jobId, status));
|
72 |
76 |
if (Status.SUCCEEDED.equals(status)) {
|
73 |
|
// TODO set some content to return to the blackboard msg.
|
|
77 |
getCallback().done(report);
|
74 |
78 |
|
75 |
|
log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
|
76 |
|
final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions);
|
77 |
|
if (report != null) {
|
78 |
|
final Map<String, String> map = Maps.newHashMap();
|
79 |
|
report.forEach((k, v) -> map.put(k.toString(), v.toString()));
|
80 |
|
log.info("found oozie job report, size: " + map.size());
|
81 |
|
map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue()));
|
82 |
|
|
83 |
|
getCallback().done(map);
|
84 |
|
} else {
|
85 |
|
log.warn("cannot find oozie job report!");
|
86 |
|
getCallback().done(new HashMap<>());
|
87 |
|
}
|
88 |
79 |
} else {
|
89 |
80 |
// TODO retrieve some failure information from the oozie client.
|
90 |
81 |
String msg = format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId()));
|
91 |
|
getCallback().failed(msg, new HadoopServiceException(msg));
|
|
82 |
getCallback().failed(report, msg, new HadoopServiceException(msg));
|
92 |
83 |
}
|
93 |
84 |
} catch (Throwable e) {
|
94 |
|
getCallback().failed(getHadoopId(), e);
|
|
85 |
getCallback().failed(Maps.newHashMap(), getHadoopId(), e);
|
95 |
86 |
}
|
96 |
87 |
}
|
97 |
88 |
|
98 |
89 |
/**
|
99 |
90 |
* Provides report entries when found for given oozie job identifier. Returns null when report not found.
|
100 |
91 |
*/
|
101 |
|
private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
|
102 |
|
Properties properties = new Properties();
|
|
92 |
private static Map<String, String> getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
|
|
93 |
Map<String, String> map = Maps.newHashMap();
|
103 |
94 |
WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId);
|
104 |
95 |
for (WorkflowAction currentAction : oozieJob.getActions()) {
|
105 |
96 |
log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName()));
|
... | ... | |
107 |
98 |
log.info(String.format("found workflow action %s", currentAction.getName()));
|
108 |
99 |
if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) {
|
109 |
100 |
log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId()));
|
110 |
|
Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
|
|
101 |
Map<String, String> subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
|
111 |
102 |
if (subworkflowProperties != null) {
|
112 |
103 |
return subworkflowProperties;
|
113 |
104 |
}
|
114 |
105 |
} else if (StringUtils.isNotBlank(currentAction.getData())) {
|
|
106 |
Properties properties = new Properties();
|
115 |
107 |
properties.load(IOUtils.toInputStream(currentAction.getData()));
|
|
108 |
properties.forEach((k, v) -> map.put(currentAction.getName() + ":" + k.toString(), v.toString()));
|
|
109 |
map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue()));
|
116 |
110 |
log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size()));
|
117 |
111 |
}
|
118 |
112 |
} else {
|
119 |
113 |
log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName()));
|
120 |
114 |
}
|
121 |
115 |
}
|
122 |
|
log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), properties.values().size()));
|
123 |
|
return properties;
|
|
116 |
log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), map.values().size()));
|
|
117 |
return map;
|
124 |
118 |
}
|
125 |
119 |
|
126 |
120 |
@Override
|
catch output params from the different oozie workflow actions