Project

General

Profile

« Previous | Next » 

Revision 47222

added param declaring the oozie job action names to be reported back to the workflow logger

View differences:

modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java
1 1
package eu.dnetlib.data.hadoop.oozie;
2 2

  
3
import java.util.Date;
4
import java.util.HashMap;
3
import java.io.IOException;
4
import java.util.*;
5 5

  
6
import com.google.common.collect.Maps;
7
import com.google.common.collect.Sets;
8
import org.apache.commons.io.IOUtils;
9
import org.apache.commons.lang.StringUtils;
6 10
import org.apache.commons.logging.Log;
7 11
import org.apache.commons.logging.LogFactory;
8 12
import org.apache.oozie.client.OozieClient;
9 13
import org.apache.oozie.client.OozieClientException;
14
import org.apache.oozie.client.WorkflowAction;
15
import org.apache.oozie.client.WorkflowJob;
10 16
import org.apache.oozie.client.WorkflowJob.Status;
11 17

  
12 18
import eu.dnetlib.data.hadoop.action.JobCompletion;
......
21 27

  
22 28
	private final String jobId;
23 29

  
24
	public OozieJobMonitor(OozieClient oozieClient, String jobId, JobCompletion callback) {
30
	public static final String ACTION_TYPE_SUBWORKFLOW = "sub-workflow";
31

  
32
	private Set<String> workflowActions = Sets.newHashSet();
33

  
34
	public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback) {
25 35
		super(callback);
26 36
		this.oozieClient = oozieClient;
27 37
		this.jobId = jobId;
28 38
	}
29 39

  
40
	public OozieJobMonitor(final OozieClient oozieClient, String jobId, final JobCompletion callback, final Set<String> workflowActions) {
41
		super(callback);
42
		this.oozieClient = oozieClient;
43
		this.jobId = jobId;
44
		this.workflowActions = workflowActions;
45
	}
46

  
30 47
	@Override
31 48
	public void run() {
32 49
		try {
......
46 63
			log.debug("job " + jobId + " finihsed with status: " + status);
47 64
			if (status == Status.SUCCEEDED) {
48 65
				// TODO set some content to return to the blackboard msg.
49
				getCallback().done(new HashMap<String, String>());
66

  
67
				final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions);
68
				if (report != null) {
69
					final Map<String, String> map = Maps.newHashMap();
70
					report.forEach((k, v) -> map.put(k.toString(), v.toString()));
71
					getCallback().done(map);
72
				} else {
73
					getCallback().done(new HashMap<>());
74
				}
50 75
            } else {
51 76
				// TODO retrieve some failure information from the oozie client.
52 77
				String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus() + ", oozie log:\n "
......
58 83
		}
59 84
	}
60 85

  
86
	/**
87
	 * Provides report entries when found for given oozie job identifier. Returns null when report not found.
88
	 */
89
	private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
90
		WorkflowJob mainIisWfJob = oozieClient.getJobInfo(oozieJobId);
91
		for (WorkflowAction currentAction : mainIisWfJob.getActions()) {
92
			if (workflowActions.contains(currentAction.getName())) {
93
				if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) {
94
					Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
95
					if (subworkflowProperties != null) {
96
						return subworkflowProperties;
97
					}
98
				} else if (StringUtils.isNotBlank(currentAction.getData())) {
99
					Properties properties = new Properties();
100
					properties.load(IOUtils.toInputStream(currentAction.getData()));
101
					return properties;
102
				}
103
			}
104
		}
105

  
106
		return null;
107
	}
108

  
61 109
	@Override
62 110
	public String getHadoopId() {
63 111
		return jobId;
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/SubmitOozieJobAction.java
3 3
import java.util.Map;
4 4
import java.util.Map.Entry;
5 5
import java.util.Properties;
6
import java.util.Set;
6 7

  
8
import com.google.common.base.Splitter;
9
import com.google.common.collect.Sets;
10
import org.apache.commons.lang.StringUtils;
7 11
import org.apache.commons.logging.Log;
8 12
import org.apache.commons.logging.LogFactory;
9 13
import org.apache.hadoop.conf.Configuration;
......
21 25

  
22 26
	private static final Log log = LogFactory.getLog(SubmitOozieJobAction.class); // NOPMD by marko on 11/24/08 5:02 PM
23 27

  
28
	public static final String OOZIE_REPORT_ACTIONS = "oozie.report.actions.csv";
29

  
24 30
	@Override
25 31
	public void submit(final JobCompletion callback, final BlackboardJob bbJob, final String jobName, final JobProfile jobProfile)
26 32
			throws HadoopServiceException {
......
40 46
			final String jobId = newJobId(clusterName, internalId);
41 47

  
42 48
			jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile,
43
					new OozieJobMonitor(oozieClient, internalId, callback)));
49
					new OozieJobMonitor(oozieClient, internalId, callback, getWorkflowActions(bbJob))));
44 50

  
45 51
		} catch (final OozieClientException e) {
46 52
			throw new HadoopServiceException("error executing hadoop job: " + jobName, e);
47 53
		}
48 54
	}
49 55

  
56
	/**
57
	 * Extracts from the blackboard job params the oozie action names as csv and returns them as a set.
58
	 * @param bbJob
59
	 * @return
60
	 */
61
	private Set<String> getWorkflowActions(final BlackboardJob bbJob) {
62
		final String actions = bbJob.getParameters().get(OOZIE_REPORT_ACTIONS);
63
		if (StringUtils.isNotBlank(actions)) {
64
			return Sets.newHashSet(Splitter.on(",").split(actions));
65
		}
66
		return Sets.newHashSet();
67
	}
68

  
50 69
	private Properties prepareJob(final Configuration configuration, final String jobName, final JobProfile jobProfile, final Map<String, String> parameters) {
51 70

  
52 71
		log.info("creating job: " + jobName);

Also available in: Unified diff