Project

General

Profile

« Previous | Next » 

Revision 60355

catch output params from the different oozie workflow actions

View differences:

OozieJobMonitor.java
53 53
			log.info("waiting for oozie job completion: " + getHadoopId());
54 54

  
55 55
			Status status = Status.PREP;
56
			while (status.equals(Status.PREP) || status.equals(Status.RUNNING)) {
56
			while (status.equals(Status.PREP) || status.equals(Status.RUNNING) || status.equals(Status.SUSPENDED)) {
57 57
				Thread.sleep(monitorSleepTimeSecs * 1000);
58 58

  
59 59
				try {
60 60
					final Status currentStatus = doGetStatus();
61 61

  
62 62
					if (!currentStatus.equals(status)) {
63
						log.debug(format("jobId %s status changed from %s to %s", jobId, status.toString(), currentStatus.toString()));
63 64
						status = currentStatus;
64 65
						lastActivity = new Date();
65 66
					}
......
68 69
				}
69 70
			}
70 71

  
71
			log.debug(format("job %s finihsed with status: %s", jobId, status));
72
			log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
73
			Map<String, String> report = getReport(getOozieClient(), getHadoopId(), workflowActions);
74

  
75
			log.debug(format("job %s finished with status: %s", jobId, status));
72 76
			if (Status.SUCCEEDED.equals(status)) {
73
				// TODO set some content to return to the blackboard msg.
77
				getCallback().done(report);
74 78

  
75
				log.info(format("looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
76
				final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions);
77
				if (report != null) {
78
					final Map<String, String> map = Maps.newHashMap();
79
					report.forEach((k, v) -> map.put(k.toString(), v.toString()));
80
					log.info("found oozie job report, size: " + map.size());
81
					map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue()));
82

  
83
					getCallback().done(map);
84
				} else {
85
					log.warn("cannot find oozie job report!");
86
					getCallback().done(new HashMap<>());
87
				}
88 79
            } else {
89 80
				// TODO retrieve some failure information from the oozie client.
90 81
				String msg = format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId()));
91
				getCallback().failed(msg, new HadoopServiceException(msg));
82
				getCallback().failed(report, msg, new HadoopServiceException(msg));
92 83
            }
93 84
		} catch (Throwable e) {
94
			getCallback().failed(getHadoopId(), e);
85
			getCallback().failed(Maps.newHashMap(), getHadoopId(), e);
95 86
		}
96 87
	}
97 88

  
98 89
	/**
99 90
	 * Provides report entries when found for given oozie job identifier. Returns null when report not found.
100 91
	 */
101
	private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
102
		Properties properties = new Properties();
92
	private static Map<String, String> getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
93
		Map<String, String> map = Maps.newHashMap();
103 94
		WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId);
104 95
		for (WorkflowAction currentAction : oozieJob.getActions()) {
105 96
			log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName()));
......
107 98
				log.info(String.format("found workflow action %s", currentAction.getName()));
108 99
				if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) {
109 100
					log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId()));
110
					Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
101
					Map<String, String> subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
111 102
					if (subworkflowProperties != null) {
112 103
						return subworkflowProperties;
113 104
					}
114 105
				} else if (StringUtils.isNotBlank(currentAction.getData())) {
106
					Properties properties = new Properties();
115 107
					properties.load(IOUtils.toInputStream(currentAction.getData()));
108
					properties.forEach((k, v) -> map.put(currentAction.getName() + ":" + k.toString(), v.toString()));
109
					map.entrySet().forEach(e -> log.info(e.getKey() + " - " + e.getValue()));
116 110
					log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size()));
117 111
				}
118 112
			} else {
119 113
				log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName()));
120 114
			}
121 115
		}
122
		log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), properties.values().size()));
123
		return properties;
116
		log.info(String.format("found workflow (%s) properties size %s", oozieJob.getAppName(), map.values().size()));
117
		return map;
124 118
	}
125 119

  
126 120
	@Override

Also available in: Unified diff