Revision 47222
Added by Claudio Atzori about 6 years ago
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.hadoop.oozie; |
2 | 2 |
|
3 |
import java.util.Date;
|
|
4 |
import java.util.HashMap;
|
|
3 |
import java.io.IOException;
|
|
4 |
import java.util.*;
|
|
5 | 5 |
|
6 |
import com.google.common.collect.Maps; |
|
7 |
import com.google.common.collect.Sets; |
|
8 |
import org.apache.commons.io.IOUtils; |
|
9 |
import org.apache.commons.lang.StringUtils; |
|
6 | 10 |
import org.apache.commons.logging.Log; |
7 | 11 |
import org.apache.commons.logging.LogFactory; |
8 | 12 |
import org.apache.oozie.client.OozieClient; |
9 | 13 |
import org.apache.oozie.client.OozieClientException; |
14 |
import org.apache.oozie.client.WorkflowAction; |
|
15 |
import org.apache.oozie.client.WorkflowJob; |
|
10 | 16 |
import org.apache.oozie.client.WorkflowJob.Status; |
11 | 17 |
|
12 | 18 |
import eu.dnetlib.data.hadoop.action.JobCompletion; |
... | ... | |
21 | 27 |
|
22 | 28 |
private final String jobId; |
23 | 29 |
|
24 |
public OozieJobMonitor(OozieClient oozieClient, String jobId, JobCompletion callback) { |
|
30 |
public static final String ACTION_TYPE_SUBWORKFLOW = "sub-workflow"; |
|
31 |
|
|
32 |
private Set<String> workflowActions = Sets.newHashSet(); |
|
33 |
|
|
34 |
public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback) { |
|
25 | 35 |
super(callback); |
26 | 36 |
this.oozieClient = oozieClient; |
27 | 37 |
this.jobId = jobId; |
28 | 38 |
} |
29 | 39 |
|
40 |
public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback, final Set<String> workflowActions) { |
|
41 |
super(callback); |
|
42 |
this.oozieClient = oozieClient; |
|
43 |
this.jobId = jobId; |
|
44 |
this.workflowActions = workflowActions; |
|
45 |
} |
|
46 |
|
|
30 | 47 |
@Override |
31 | 48 |
public void run() { |
32 | 49 |
try { |
... | ... | |
46 | 63 |
log.debug("job " + jobId + " finihsed with status: " + status); |
47 | 64 |
if (status == Status.SUCCEEDED) { |
48 | 65 |
// TODO set some content to return to the blackboard msg. |
49 |
getCallback().done(new HashMap<String, String>()); |
|
66 |
|
|
67 |
final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions); |
|
68 |
if (report != null) { |
|
69 |
final Map<String, String> map = Maps.newHashMap(); |
|
70 |
report.forEach((k, v) -> map.put(k.toString(), v.toString())); |
|
71 |
getCallback().done(map); |
|
72 |
} else { |
|
73 |
getCallback().done(new HashMap<>()); |
|
74 |
} |
|
50 | 75 |
} else { |
51 | 76 |
// TODO retrieve some failure information from the oozie client. |
52 | 77 |
String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus() + ", oozie log:\n " |
... | ... | |
58 | 83 |
} |
59 | 84 |
} |
60 | 85 |
|
86 |
/** |
|
87 |
* Provides report entries when found for given oozie job identifier. Returns null when report not found. |
|
88 |
*/ |
|
89 |
private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException { |
|
90 |
WorkflowJob mainIisWfJob = oozieClient.getJobInfo(oozieJobId); |
|
91 |
for (WorkflowAction currentAction : mainIisWfJob.getActions()) { |
|
92 |
if (workflowActions.contains(currentAction.getName())) { |
|
93 |
if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) { |
|
94 |
Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions); |
|
95 |
if (subworkflowProperties != null) { |
|
96 |
return subworkflowProperties; |
|
97 |
} |
|
98 |
} else if (StringUtils.isNotBlank(currentAction.getData())) { |
|
99 |
Properties properties = new Properties(); |
|
100 |
properties.load(IOUtils.toInputStream(currentAction.getData())); |
|
101 |
return properties; |
|
102 |
} |
|
103 |
} |
|
104 |
} |
|
105 |
|
|
106 |
return null; |
|
107 |
} |
|
108 |
|
|
61 | 109 |
@Override |
62 | 110 |
public String getHadoopId() { |
63 | 111 |
return jobId; |
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/SubmitOozieJobAction.java | ||
---|---|---|
3 | 3 |
import java.util.Map; |
4 | 4 |
import java.util.Map.Entry; |
5 | 5 |
import java.util.Properties; |
6 |
import java.util.Set; |
|
6 | 7 |
|
8 |
import com.google.common.base.Splitter; |
|
9 |
import com.google.common.collect.Sets; |
|
10 |
import org.apache.commons.lang.StringUtils; |
|
7 | 11 |
import org.apache.commons.logging.Log; |
8 | 12 |
import org.apache.commons.logging.LogFactory; |
9 | 13 |
import org.apache.hadoop.conf.Configuration; |
... | ... | |
21 | 25 |
|
22 | 26 |
private static final Log log = LogFactory.getLog(SubmitOozieJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM |
23 | 27 |
|
28 |
public static final String OOZIE_REPORT_ACTIONS = "oozie.report.actions.csv"; |
|
29 |
|
|
24 | 30 |
@Override |
25 | 31 |
public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile) |
26 | 32 |
throws HadoopServiceException { |
... | ... | |
40 | 46 |
final String jobId = newJobId(clusterName, internalId); |
41 | 47 |
|
42 | 48 |
jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile, |
43 |
new OozieJobMonitor(oozieClient, internalId, callback))); |
|
49 |
new OozieJobMonitor(oozieClient, internalId, callback, getWorkflowActions(bbJob))));
|
|
44 | 50 |
|
45 | 51 |
} catch (final OozieClientException e) { |
46 | 52 |
throw new HadoopServiceException("error executing hadoop job: " + jobName, e); |
47 | 53 |
} |
48 | 54 |
} |
49 | 55 |
|
56 |
/** |
|
57 |
* Extracts from the blackboard job params the oozie action names as csv and returns them as a set. |
|
58 |
* @param bbJob |
|
59 |
* @return |
|
60 |
*/ |
|
61 |
private Set<String> getWorkflowActions(final BlackboardJob bbJob) { |
|
62 |
final String actions = bbJob.getParameters().get(OOZIE_REPORT_ACTIONS); |
|
63 |
if (StringUtils.isNotBlank(actions)) { |
|
64 |
return Sets.newHashSet(Splitter.on(",").split(actions)); |
|
65 |
} |
|
66 |
return Sets.newHashSet(); |
|
67 |
} |
|
68 |
|
|
50 | 69 |
private Properties prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters) { |
51 | 70 |
|
52 | 71 |
log.info("creating job: " + jobName); |
Also available in: Unified diff
added param declaring the oozie job action names to be reported back to the workflow logger