Project

General

Profile

« Previous | Next » 

Revision 60355

catch output params from the different oozie workflow actions

View differences:

modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java
53 53
			log.info("waiting for oozie job completion: " + getHadoopId());
54 54

  
55 55
			Status status = Status.PREP;
56
			while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) {
56
			while (status.equals(Status.PREP) || status.equals(Status.RUNNING) || status.equals(Status.SUSPENDED)) {
57 57
				Thread.sleep(monitorSleepTimeSecs * 1000);
58 58

  
59 59
				try {
60 60
					final Status currentStatus = doGetStatus();
61 61

  
62 62
					if (!currentStatus.equals(status)) {
63
						log.debug(format("jobId %s status changed from %s to %s", jobId, status.toString(), currentStatus.toString()));
63 64
						status = currentStatus;
64 65
						lastActivity = new Date();
65 66
					}
......
68 69
				}
69 70
			}
70 71

  
71
			log.debug(format("job %s finihsed with status: %s", jobId, status));
72
			log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
73
			Map<String, String> report = getReport(getOozieClient(), getHadoopId(), workflowActions);
74

  
75
			log.debug(format("job %s finished with status: %s", jobId, status));
72 76
			if (Status.SUCCEEDED.equals(status)) {
73
				// TODO set some content to return to the blackboard msg.
77
				getCallback().done(report);
74 78

  
75
				log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
76
				final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions);
77
				if (report != null) {
78
					final Map<String, String> map = Maps.newHashMap();
79
					report.forEach((k, v) -> map.put(k.toString(), v.toString()));
80
					log.info("found oozie job report, size: " + map.size());
81
					map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue()));
82

  
83
					getCallback().done(map);
84
				} else {
85
					log.warn("cannot find oozie job report!");
86
					getCallback().done(new HashMap<>());
87
				}
88 79
            } else {
89 80
				// TODO retrieve some failure information from the oozie client.
90 81
				String msg = format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId()));
91
				getCallback().failed(msg, new HadoopServiceException(msg));
82
				getCallback().failed(report, msg, new HadoopServiceException(msg));
92 83
            }
93 84
		} catch (Throwable e) {
94
			getCallback().failed(getHadoopId(), e);
85
			getCallback().failed(Maps.newHashMap(), getHadoopId(), e);
95 86
		}
96 87
	}
97 88

  
98 89
	/**
99 90
	 * Provides report entries when found for given oozie job identifier. Returns null when report not found.
100 91
	 */
101
	private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
102
		Properties properties = new Properties();
92
	private static Map<String, String> getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
93
		Map<String, String> map = Maps.newHashMap();
103 94
		WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId);
104 95
		for (WorkflowAction currentAction : oozieJob.getActions()) {
105 96
			log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName()));
......
107 98
				log.info(String.format("found workflow action %s", currentAction.getName()));
108 99
				if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) {
109 100
					log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId()));
110
					Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
101
					Map<String, String> subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
111 102
					if (subworkflowProperties != null) {
112 103
						return subworkflowProperties;
113 104
					}
114 105
				} else if (StringUtils.isNotBlank(currentAction.getData())) {
106
					Properties properties = new Properties();
115 107
					properties.load(IOUtils.toInputStream(currentAction.getData()));
108
					properties.forEach((k, v) -> map.put(currentAction.getName() + ":" + k.toString(), v.toString()));
109
					map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue()));
116 110
					log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size()));
117 111
				}
118 112
			} else {
119 113
				log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName()));
120 114
			}
121 115
		}
122
		log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), properties.values().size()));
123
		return properties;
116
		log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), map.values().size()));
117
		return map;
124 118
	}
125 119

  
126 120
	@Override
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/mapred/MapreduceJobMonitor.java
19 19
import eu.dnetlib.data.hadoop.action.JobMonitor;
20 20
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
21 21

  
22
@Deprecated
22 23
public class MapreduceJobMonitor extends JobMonitor {
23 24

  
24 25
	private static final Log log = LogFactory.getLog(MapreduceJobMonitor.class); // NOPMD by marko on 11/24/08 5:02 PM
......
59 60
				getCallback().done(asMap(getRunningJob().getCounters()));
60 61
			} else {
61 62
				final String msg = "hadoop job: " + getHadoopId() + " failed with status: " + getStatus();
62
				getCallback().failed(msg, new HadoopServiceException(msg));
63
				getCallback().failed(Maps.newHashMap(), msg, new HadoopServiceException(msg));
63 64
			}
64 65
		} catch (final Throwable e) {
65
			getCallback().failed(getHadoopId(), e);
66
			getCallback().failed(Maps.newHashMap(), getHadoopId(), e);
66 67
		} finally {
67 68
			try {
68 69
				if (jobClient != null) {
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/JobCompletion.java
12 12

  
13 13
	void done(Map<String, String> properties);
14 14

  
15
	void failed(String jobId, Throwable e);
15
	void failed(Map<String, String> properties, String jobId, Throwable e);
16 16
}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/AbstractSubmitAction.java
135 135
		return new JobCompletion() {
136 136

  
137 137
			@Override
138
			public void done(final Map<String, String> properties) {
139
				bbJob.getParameters().putAll(properties);
140
				log.info(String.format("%s completed successfully, returning %s output params", jobName, properties.size()));
138
			public void done(final Map<String, String> report) {
139
				bbJob.getParameters().putAll(report);
140
				log.info(String.format("%s completed successfully, returning %s output params", jobName, report.size()));
141 141
				handler.done(bbJob);
142 142
				decrementRunningJobs(jobName);
143 143
			}
144 144

  
145 145
			@Override
146
			public void failed(final String msg, final Throwable e) {
147
				log.debug(msg);
146
			public void failed(final Map<String, String> report, final String msg, final Throwable e) {
147
				bbJob.getParameters().putAll(report);
148
				log.error(String.format("%s failed, returning %s output params", jobName, report.size()));
148 149
				handler.failed(bbJob, e);
149 150
				decrementRunningJobs(jobName);
150 151
			}

Also available in: Unified diff