Revision 60355
Added by Claudio Atzori over 3 years ago
OozieJobMonitor.java | ||
---|---|---|
53 | 53 |
log.info("waiting for oozie job completion: " + getHadoopId()); |
54 | 54 |
|
55 | 55 |
Status status = Status.PREP; |
56 |
while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) { |
|
56 |
while (status.equals(Status.PREP) || status.equals(Status.RUNNING) || status.equals(Status.SUSPENDED)) {
|
|
57 | 57 |
Thread.sleep(monitorSleepTimeSecs * 1000); |
58 | 58 |
|
59 | 59 |
try { |
60 | 60 |
final Status currentStatus = doGetStatus(); |
61 | 61 |
|
62 | 62 |
if (!currentStatus.equals(status)) { |
63 |
log.debug(format("jobId %s status changed from %s to %s", jobId, status.toString(), currentStatus.toString())); |
|
63 | 64 |
status = currentStatus; |
64 | 65 |
lastActivity = new Date(); |
65 | 66 |
} |
... | ... | |
68 | 69 |
} |
69 | 70 |
} |
70 | 71 |
|
71 |
log.debug(format("job %s finihsed with status: %s", jobId, status)); |
|
72 |
log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions)); |
|
73 |
Map<String, String> report = getReport(getOozieClient(), getHadoopId(), workflowActions); |
|
74 |
|
|
75 |
log.debug(format("job %s finished with status: %s", jobId, status)); |
|
72 | 76 |
if (Status.SUCCEEDED.equals(status)) { |
73 |
// TODO set some content to return to the blackboard msg.
|
|
77 |
getCallback().done(report);
|
|
74 | 78 |
|
75 |
log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions)); |
|
76 |
final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions); |
|
77 |
if (report != null) { |
|
78 |
final Map<String, String> map = Maps.newHashMap(); |
|
79 |
report.forEach((k, v) -> map.put(k.toString(), v.toString())); |
|
80 |
log.info("found oozie job report, size: " + map.size()); |
|
81 |
map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue())); |
|
82 |
|
|
83 |
getCallback().done(map); |
|
84 |
} else { |
|
85 |
log.warn("cannot find oozie job report!"); |
|
86 |
getCallback().done(new HashMap<>()); |
|
87 |
} |
|
88 | 79 |
} else { |
89 | 80 |
// TODO retrieve some failure information from the oozie client. |
90 | 81 |
String msg = format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId())); |
91 |
getCallback().failed(msg, new HadoopServiceException(msg)); |
|
82 |
getCallback().failed(report, msg, new HadoopServiceException(msg));
|
|
92 | 83 |
} |
93 | 84 |
} catch (Throwable e) { |
94 |
getCallback().failed(getHadoopId(), e); |
|
85 |
getCallback().failed(Maps.newHashMap(), getHadoopId(), e);
|
|
95 | 86 |
} |
96 | 87 |
} |
97 | 88 |
|
98 | 89 |
/** |
99 | 90 |
* Provides report entries when found for given oozie job identifier. Returns null when report not found. |
100 | 91 |
*/ |
101 |
private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
|
|
102 |
Properties properties = new Properties();
|
|
92 |
private static Map<String, String> getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
|
|
93 |
Map<String, String> map = Maps.newHashMap();
|
|
103 | 94 |
WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId); |
104 | 95 |
for (WorkflowAction currentAction : oozieJob.getActions()) { |
105 | 96 |
log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName())); |
... | ... | |
107 | 98 |
log.info(String.format("found workflow action %s", currentAction.getName())); |
108 | 99 |
if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) { |
109 | 100 |
log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId())); |
110 |
Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
|
|
101 |
Map<String, String> subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
|
|
111 | 102 |
if (subworkflowProperties != null) { |
112 | 103 |
return subworkflowProperties; |
113 | 104 |
} |
114 | 105 |
} else if (StringUtils.isNotBlank(currentAction.getData())) { |
106 |
Properties properties = new Properties(); |
|
115 | 107 |
properties.load(IOUtils.toInputStream(currentAction.getData())); |
108 |
properties.forEach((k, v) -> map.put(currentAction.getName() + ":" + k.toString(), v.toString())); |
|
109 |
map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue())); |
|
116 | 110 |
log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size())); |
117 | 111 |
} |
118 | 112 |
} else { |
119 | 113 |
log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName())); |
120 | 114 |
} |
121 | 115 |
} |
122 |
log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), properties.values().size()));
|
|
123 |
return properties;
|
|
116 |
log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), map.values().size()));
|
|
117 |
return map;
|
|
124 | 118 |
} |
125 | 119 |
|
126 | 120 |
@Override |
Also available in: Unified diff
catch output params from the different oozie workflow actions