Revision 47222
Added by Claudio Atzori almost 7 years ago
OozieJobMonitor.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.hadoop.oozie; |
2 | 2 |
|
3 |
import java.util.Date;
|
|
4 |
import java.util.HashMap;
|
|
3 |
import java.io.IOException;
|
|
4 |
import java.util.*;
|
|
5 | 5 |
|
6 |
import com.google.common.collect.Maps; |
|
7 |
import com.google.common.collect.Sets; |
|
8 |
import org.apache.commons.io.IOUtils; |
|
9 |
import org.apache.commons.lang.StringUtils; |
|
6 | 10 |
import org.apache.commons.logging.Log; |
7 | 11 |
import org.apache.commons.logging.LogFactory; |
8 | 12 |
import org.apache.oozie.client.OozieClient; |
9 | 13 |
import org.apache.oozie.client.OozieClientException; |
14 |
import org.apache.oozie.client.WorkflowAction; |
|
15 |
import org.apache.oozie.client.WorkflowJob; |
|
10 | 16 |
import org.apache.oozie.client.WorkflowJob.Status; |
11 | 17 |
|
12 | 18 |
import eu.dnetlib.data.hadoop.action.JobCompletion; |
... | ... | |
21 | 27 |
|
22 | 28 |
private final String jobId; |
23 | 29 |
|
24 |
public OozieJobMonitor(OozieClient oozieClient, String jobId, JobCompletion callback) { |
|
30 |
public static final String ACTION_TYPE_SUBWORKFLOW = "sub-workflow"; |
|
31 |
|
|
32 |
private Set<String> workflowActions = Sets.newHashSet(); |
|
33 |
|
|
34 |
public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback) { |
|
25 | 35 |
super(callback); |
26 | 36 |
this.oozieClient = oozieClient; |
27 | 37 |
this.jobId = jobId; |
28 | 38 |
} |
29 | 39 |
|
40 |
public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback, final Set<String> workflowActions) { |
|
41 |
super(callback); |
|
42 |
this.oozieClient = oozieClient; |
|
43 |
this.jobId = jobId; |
|
44 |
this.workflowActions = workflowActions; |
|
45 |
} |
|
46 |
|
|
30 | 47 |
@Override |
31 | 48 |
public void run() { |
32 | 49 |
try { |
... | ... | |
46 | 63 |
log.debug("job " + jobId + " finihsed with status: " + status); |
47 | 64 |
if (status == Status.SUCCEEDED) { |
48 | 65 |
// TODO set some content to return to the blackboard msg. |
49 |
getCallback().done(new HashMap<String, String>()); |
|
66 |
|
|
67 |
final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions); |
|
68 |
if (report != null) { |
|
69 |
final Map<String, String> map = Maps.newHashMap(); |
|
70 |
report.forEach((k, v) -> map.put(k.toString(), v.toString())); |
|
71 |
getCallback().done(map); |
|
72 |
} else { |
|
73 |
getCallback().done(new HashMap<>()); |
|
74 |
} |
|
50 | 75 |
} else { |
51 | 76 |
// TODO retrieve some failure information from the oozie client. |
52 | 77 |
String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus() + ", oozie log:\n " |
... | ... | |
58 | 83 |
} |
59 | 84 |
} |
60 | 85 |
|
86 |
/** |
|
87 |
* Provides report entries when found for given oozie job identifier. Returns null when report not found. |
|
88 |
*/ |
|
89 |
private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException { |
|
90 |
WorkflowJob mainIisWfJob = oozieClient.getJobInfo(oozieJobId); |
|
91 |
for (WorkflowAction currentAction : mainIisWfJob.getActions()) { |
|
92 |
if (workflowActions.contains(currentAction.getName())) { |
|
93 |
if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) { |
|
94 |
Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions); |
|
95 |
if (subworkflowProperties != null) { |
|
96 |
return subworkflowProperties; |
|
97 |
} |
|
98 |
} else if (StringUtils.isNotBlank(currentAction.getData())) { |
|
99 |
Properties properties = new Properties(); |
|
100 |
properties.load(IOUtils.toInputStream(currentAction.getData())); |
|
101 |
return properties; |
|
102 |
} |
|
103 |
} |
|
104 |
} |
|
105 |
|
|
106 |
return null; |
|
107 |
} |
|
108 |
|
|
61 | 109 |
@Override |
62 | 110 |
public String getHadoopId() { |
63 | 111 |
return jobId; |
Also available in: Unified diff
added param declaring the oozie job action names to be reported back to the workflow logger