Revision 48800

More logging in the Oozie job monitor procedure that tries to fetch the job report; more Java 8 refactorings.

View differences:

modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/utils/HadoopUtils.java
@@ -1,86 +1,24 @@
 package eu.dnetlib.data.hadoop.utils;
 
-import java.util.Map.Entry;
-import java.util.Set;
-
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HTableDescriptor;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-
 import eu.dnetlib.data.hadoop.HadoopJob;
-import eu.dnetlib.data.hadoop.config.ClusterName;
 import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
 import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 public class HadoopUtils {
 
-	public static Function<HTableDescriptor, String> tableName() {
-		return new Function<HTableDescriptor, String>() {
+	private static final Log log = LogFactory.getLog(HadoopUtils.class);
 
-			@Override
-			public String apply(final HTableDescriptor d) {
-				return d.getNameAsString();
+	public static java.util.function.Function<HadoopJob, HadoopJobDescriptor> asDescriptor() {
+		return d -> {
+			try {
+				return d.asDescriptor();
+			} catch (HadoopServiceException e) {
+				log.warn(e);
+				return null;
 			}
 		};
 	}
 
-	public static Function<HColumnDescriptor, String> columnName() {
-		return new Function<HColumnDescriptor, String>() {
-
-			@Override
-			public String apply(final HColumnDescriptor d) {
-				return d.getNameAsString();
-			}
-		};
-	}
-
-	public static Predicate<String> columnPredicate(final Set<String> cols) {
-		return new HadoopUtils().getSetPredicate(cols);
-	}
-
-	public SetPredicate getSetPredicate(final Set<String> set) {
-		return new SetPredicate(set);
-	}
-
-	class SetPredicate implements Predicate<String> {
-
-		private final Set<String> set;
-
-		public SetPredicate(final Set<String> set) {
-			this.set = set;
-		}
-
-		@Override
-		public boolean apply(final String s) {
-			return !set.contains(s);
-		}
-	}
-
-	public static Function<Entry<String, HadoopJob>, HadoopJobDescriptor> hadoopJobDescriptor() {
-		return new Function<Entry<String, HadoopJob>, HadoopJobDescriptor>() {
-
-			@Override
-			public HadoopJobDescriptor apply(final Entry<String, HadoopJob> e) {
-				try {
-					return e.getValue().asDescriptor();
-				} catch (HadoopServiceException e1) {
-					return null;
-				}
-			}
-		};
-	}
-
-	public static Predicate<HadoopJob> filterByCluster(final ClusterName clusterName) {
-		return new Predicate<HadoopJob>() {
-
-			@Override
-			public boolean apply(final HadoopJob job) {
-				return job.getClusterName().equals(clusterName);
-			}
-
-		};
-	}
-
 }
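
Note: the HadoopUtils rewrite collapses the Guava Function/Predicate helpers into a single java.util.function.Function factory. The checked HadoopServiceException is logged and mapped to null instead of being propagated, so stream pipelines can drop failed conversions with filter(Objects::nonNull). A minimal sketch of the same pattern, with hypothetical Job/JobDescriptor/DescribeException types standing in for the dnet classes:

import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Stream;

public class DescriptorSketch {

	static class DescribeException extends Exception {}

	static class JobDescriptor {}

	static class Job {
		JobDescriptor asDescriptor() throws DescribeException {
			return new JobDescriptor();
		}
	}

	// checked exception handled inside the lambda: log-and-null, not rethrow
	static Function<Job, JobDescriptor> asDescriptor() {
		return j -> {
			try {
				return j.asDescriptor();
			} catch (DescribeException e) {
				// a real implementation would log here, as the commit does
				return null;
			}
		};
	}

	public static void main(String[] args) {
		long built = Stream.of(new Job(), new Job())
				.map(asDescriptor())
				.filter(Objects::nonNull)
				.count();
		System.out.println(built + " descriptors built");
	}
}

The trade-off is that failures surface only in the log; downstream code sees a shorter list rather than an exception.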
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/JobRegistry.java
@@ -2,24 +2,20 @@
 
 import java.util.Date;
 import java.util.List;
-import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Objects;
+import java.util.stream.Collectors;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.springframework.beans.factory.annotation.Required;
-
 import com.google.common.collect.BiMap;
 import com.google.common.collect.HashBiMap;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-
 import eu.dnetlib.data.hadoop.HadoopJob.Status;
 import eu.dnetlib.data.hadoop.config.ClusterName;
 import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor;
 import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
 import eu.dnetlib.data.hadoop.utils.HadoopUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.springframework.beans.factory.annotation.Required;
 
 public class JobRegistry {
 
@@ -84,8 +80,11 @@
 	}
 
 	public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) {
-		Map<String, HadoopJob> filtered = Maps.filterValues(jobs, HadoopUtils.filterByCluster(clusterName));
-		return Lists.newArrayList(Iterables.transform(filtered.entrySet(), HadoopUtils.hadoopJobDescriptor()));
+		return jobs.values().stream()
+				.filter(j -> clusterName.equals(j.getClusterName()))
+				.map(HadoopUtils.asDescriptor())
+				.filter(Objects::nonNull)
+				.collect(Collectors.toList());
 	}
 
 	@Required
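
Note: the new listJobs() streams the map values directly instead of materializing a filtered map view and transforming its entry set; it also drops the null descriptors that the old Iterables.transform could leak into the result list. A runnable sketch of the pipeline shape, with a hypothetical Job class in place of HadoopJob:

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ListJobsSketch {

	static class Job {
		final String cluster;
		final String name;

		Job(final String cluster, final String name) {
			this.cluster = cluster;
			this.name = name;
		}
	}

	public static void main(String[] args) {
		final Map<String, Job> jobs = new HashMap<>();
		jobs.put("1", new Job("DM", "wf-a"));
		jobs.put("2", new Job("IIS", "wf-b"));

		// filter by cluster, map to a "descriptor" (here just the name),
		// drop failed (null) conversions, collect to a list
		final List<String> descriptors = jobs.values().stream()
				.filter(j -> "DM".equals(j.cluster))
				.map(j -> j.name)
				.filter(Objects::nonNull)
				.collect(Collectors.toList());

		System.out.println(descriptors); // [wf-a]
	}
}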
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java
@@ -2,14 +2,10 @@
 
 import java.io.IOException;
 import java.net.URI;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.Map.Entry;
-import java.util.NavigableMap;
-import java.util.Set;
+import java.util.stream.Collectors;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
@@ -20,7 +16,6 @@
 import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor;
 import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor;
 import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo;
-import eu.dnetlib.data.hadoop.utils.HadoopUtils;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -46,7 +41,10 @@
 
 	public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException {
 		final HBaseAdmin admin = getHBaseAdmin(clusterName);
-		return Lists.newArrayList(Iterables.transform(Lists.newArrayList(admin.listTables()), HadoopUtils.tableName()));
+		return Arrays.asList(admin.listTables())
+				.stream()
+				.map(HTableDescriptor::getNameAsString)
+				.collect(Collectors.toList());
 	}
 
 	private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException {
@@ -70,7 +68,7 @@
 
 		final Set<String> columns = Sets.newHashSet();
 
-		for (HColumnDescriptor hColDesc : Lists.newArrayList(desc.getColumnFamilies())) {
+		for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
 			columns.add(hColDesc.getNameAsString());
 		}
 
@@ -94,14 +92,9 @@
 	public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
 		final HBaseAdmin admin = getHBaseAdmin(clusterName);
 		final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes());
-
-		return Lists.newArrayList(Iterables.transform(desc.getFamilies(), new Function<HColumnDescriptor, String>() {
-
-			@Override
-			public String apply(final HColumnDescriptor desc) {
-				return desc.getNameAsString();
-			}
-		}));
+		return desc.getFamilies().stream()
+				.map(d -> d.getNameAsString())
+				.collect(Collectors.toList());
 	}
 
 	public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException {
@@ -201,10 +194,13 @@
 			createTable(clusterName, table, columns);
 		} else {
 			final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table));
-			final Set<String> foundColumns = Sets.newHashSet(Iterables.transform(Lists.newArrayList(desc.getColumnFamilies()), HadoopUtils.columnName()));
 
+			final Set<String> foundColumns = desc.getFamilies().stream()
+					.map(d -> d.getNameAsString())
+					.collect(Collectors.toCollection(HashSet::new));
+
 			log.info("ensuring columns on table " + table + ": " + columns);
-			final List<String> missingColumns = Lists.newArrayList(Iterables.filter(columns, HadoopUtils.columnPredicate(foundColumns)));
+			final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
 			if (!missingColumns.isEmpty()) {
 
 				if (admin.isTableEnabled(table)) {
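
Note: computing the missing columns now uses Guava's Sets.difference instead of the deleted SetPredicate helper. Sets.difference returns an unmodifiable, lazily evaluated view of the elements of the first set that are not in the second, which is all the isEmpty() check above needs. A small sketch with made-up column names:

import java.util.Set;

import com.google.common.collect.Sets;

public class MissingColumnsSketch {

	public static void main(String[] args) {
		final Set<String> wanted = Sets.newHashSet("result", "person", "project");
		final Set<String> found = Sets.newHashSet("result", "person");

		// view of elements in 'wanted' but not in 'found'; no copying happens
		final Set<String> missing = Sets.difference(wanted, found);

		System.out.println(missing);           // [project]
		System.out.println(missing.isEmpty()); // false
	}
}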
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java
@@ -64,12 +64,15 @@
 			if (status == Status.SUCCEEDED) {
 				// TODO set some content to return to the blackboard msg.
 
+				log.info(String.format("start looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions));
 				final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions);
 				if (report != null) {
 					final Map<String, String> map = Maps.newHashMap();
 					report.forEach((k, v) -> map.put(k.toString(), v.toString()));
+					log.info("found oozie job report, size: " + map.size());
 					getCallback().done(map);
 				} else {
+					log.warn("cannot find oozie job report!");
 					getCallback().done(new HashMap<>());
 				}
 			} else {
@@ -89,9 +92,11 @@
 	private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException {
 		WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId);
 		for (WorkflowAction currentAction : oozieJob.getActions()) {
-			log.debug(String.format("looking for workflo actions to report, current: '%s'", currentAction.getName()));
+			log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName()));
 			if (workflowActions.contains(currentAction.getName())) {
+				log.info(String.format("found workflow action %s", currentAction.getName()));
 				if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) {
+					log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId()));
 					Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions);
 					if (subworkflowProperties != null) {
 						return subworkflowProperties;
@@ -99,11 +104,13 @@
 				} else if (StringUtils.isNotBlank(currentAction.getData())) {
 					Properties properties = new Properties();
 					properties.load(IOUtils.toInputStream(currentAction.getData()));
+					log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size()));
 					return properties;
 				}
+			} else {
+				log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName()));
 			}
 		}
-
 		return null;
 	}
 
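Note: getReport() walks the actions of an Oozie workflow, recurses into sub-workflows through their external id, and parses each action's captured output with Properties.load; the log statements added here trace every step of that search. A minimal sketch of the parsing step only, assuming commons-io on the classpath and an invented actionData payload:

import java.io.IOException;
import java.util.Properties;

import org.apache.commons.io.IOUtils;

public class ActionDataSketch {

	public static void main(String[] args) throws IOException {
		// Oozie capture-output data is a java.util.Properties-style string;
		// this mirrors how getReport() handles currentAction.getData()
		final String actionData = "outputPath=/tmp/out\nrecords=42\n";

		final Properties p = new Properties();
		p.load(IOUtils.toInputStream(actionData));

		System.out.println(p.getProperty("records")); // 42
	}
}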

  
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/AbstractSubmitAction.java
@@ -137,7 +137,7 @@
 			@Override
 			public void done(final Map<String, String> properties) {
 				bbJob.getParameters().putAll(properties);
-				log.info(jobName + " completed successfully");
+				log.info(String.format("%s completed successfully, returning %s output params", jobName, properties.size()));
 				handler.done(bbJob);
 				decrementRunningJobs(jobName);
 			}
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/AbstractHadoopAction.java
@@ -57,17 +57,13 @@
 
 	@Override
 	public void execute(final BlackboardServerHandler handler, final BlackboardJob job) {
-		executor.execute(new Runnable() {
-
-			@Override
-			public void run() {
-				try {
-					handler.ongoing(job);
-					executeAsync(handler, job);
-				} catch (final Throwable e) {
-					log.error(ExceptionUtils.getFullStackTrace(e));
-					handler.failed(job, e);
-				}
+		executor.execute(() -> {
+			try {
+				handler.ongoing(job);
+				executeAsync(handler, job);
+			} catch (final Throwable e) {
+				log.error(ExceptionUtils.getFullStackTrace(e));
+				handler.failed(job, e);
 			}
 		});
 	}
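
Note: Runnable has a single abstract method, so the anonymous class collapses to a lambda with identical semantics, including the broad catch of Throwable that routes failures back to the blackboard handler. A self-contained sketch of the same shape, with placeholder work and error handling:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class LambdaExecuteSketch {

	public static void main(String[] args) {
		final ExecutorService executor = Executors.newSingleThreadExecutor();

		// the lambda replaces `new Runnable() { @Override public void run() { ... } }`
		executor.execute(() -> {
			try {
				System.out.println("async work running");
			} catch (final Throwable e) {
				System.err.println("failed: " + e);
			}
		});

		executor.shutdown();
	}
}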
