Revision 48800
Added by Claudio Atzori over 6 years ago
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/utils/HadoopUtils.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.hadoop.utils; |
2 | 2 |
|
3 |
import java.util.Map.Entry; |
|
4 |
import java.util.Set; |
|
5 |
|
|
6 |
import org.apache.hadoop.hbase.HColumnDescriptor; |
|
7 |
import org.apache.hadoop.hbase.HTableDescriptor; |
|
8 |
|
|
9 |
import com.google.common.base.Function; |
|
10 |
import com.google.common.base.Predicate; |
|
11 |
|
|
12 | 3 |
import eu.dnetlib.data.hadoop.HadoopJob; |
13 |
import eu.dnetlib.data.hadoop.config.ClusterName; |
|
14 | 4 |
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor; |
15 | 5 |
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException; |
6 |
import org.apache.commons.logging.Log; |
|
7 |
import org.apache.commons.logging.LogFactory; |
|
16 | 8 |
|
17 | 9 |
public class HadoopUtils { |
18 | 10 |
|
19 |
public static Function<HTableDescriptor, String> tableName() { |
|
20 |
return new Function<HTableDescriptor, String>() { |
|
11 |
private static final Log log = LogFactory.getLog(HadoopUtils.class); |
|
21 | 12 |
|
22 |
@Override |
|
23 |
public String apply(final HTableDescriptor d) { |
|
24 |
return d.getNameAsString(); |
|
13 |
public static java.util.function.Function<HadoopJob, HadoopJobDescriptor> asDescriptor() { |
|
14 |
return d -> { |
|
15 |
try { |
|
16 |
return d.asDescriptor(); |
|
17 |
} catch (HadoopServiceException e) { |
|
18 |
log.warn(e); |
|
19 |
return null; |
|
25 | 20 |
} |
26 | 21 |
}; |
27 | 22 |
} |
28 | 23 |
|
29 |
public static Function<HColumnDescriptor, String> columnName() { |
|
30 |
return new Function<HColumnDescriptor, String>() { |
|
31 |
|
|
32 |
@Override |
|
33 |
public String apply(final HColumnDescriptor d) { |
|
34 |
return d.getNameAsString(); |
|
35 |
} |
|
36 |
}; |
|
37 |
} |
|
38 |
|
|
39 |
public static Predicate<String> columnPredicate(final Set<String> cols) { |
|
40 |
return new HadoopUtils().getSetPredicate(cols); |
|
41 |
} |
|
42 |
|
|
43 |
public SetPredicate getSetPredicate(final Set<String> set) { |
|
44 |
return new SetPredicate(set); |
|
45 |
} |
|
46 |
|
|
47 |
class SetPredicate implements Predicate<String> { |
|
48 |
|
|
49 |
private final Set<String> set; |
|
50 |
|
|
51 |
public SetPredicate(final Set<String> set) { |
|
52 |
this.set = set; |
|
53 |
} |
|
54 |
|
|
55 |
@Override |
|
56 |
public boolean apply(final String s) { |
|
57 |
return !set.contains(s); |
|
58 |
} |
|
59 |
} |
|
60 |
|
|
61 |
public static Function<Entry<String, HadoopJob>, HadoopJobDescriptor> hadoopJobDescriptor() { |
|
62 |
return new Function<Entry<String, HadoopJob>, HadoopJobDescriptor>() { |
|
63 |
|
|
64 |
@Override |
|
65 |
public HadoopJobDescriptor apply(final Entry<String, HadoopJob> e) { |
|
66 |
try { |
|
67 |
return e.getValue().asDescriptor(); |
|
68 |
} catch (HadoopServiceException e1) { |
|
69 |
return null; |
|
70 |
} |
|
71 |
} |
|
72 |
}; |
|
73 |
} |
|
74 |
|
|
75 |
public static Predicate<HadoopJob> filterByCluster(final ClusterName clusterName) { |
|
76 |
return new Predicate<HadoopJob>() { |
|
77 |
|
|
78 |
@Override |
|
79 |
public boolean apply(final HadoopJob job) { |
|
80 |
return job.getClusterName().equals(clusterName); |
|
81 |
} |
|
82 |
|
|
83 |
}; |
|
84 |
} |
|
85 |
|
|
86 | 24 |
} |
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/JobRegistry.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.util.Date; |
4 | 4 |
import java.util.List; |
5 |
import java.util.Map; |
|
6 | 5 |
import java.util.Map.Entry; |
6 |
import java.util.Objects; |
|
7 |
import java.util.stream.Collectors; |
|
7 | 8 |
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
import org.springframework.beans.factory.annotation.Required; |
|
11 |
|
|
12 | 9 |
import com.google.common.collect.BiMap; |
13 | 10 |
import com.google.common.collect.HashBiMap; |
14 |
import com.google.common.collect.Iterables; |
|
15 |
import com.google.common.collect.Lists; |
|
16 |
import com.google.common.collect.Maps; |
|
17 |
|
|
18 | 11 |
import eu.dnetlib.data.hadoop.HadoopJob.Status; |
19 | 12 |
import eu.dnetlib.data.hadoop.config.ClusterName; |
20 | 13 |
import eu.dnetlib.data.hadoop.rmi.HadoopJobDescriptor; |
21 | 14 |
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException; |
22 | 15 |
import eu.dnetlib.data.hadoop.utils.HadoopUtils; |
16 |
import org.apache.commons.logging.Log; |
|
17 |
import org.apache.commons.logging.LogFactory; |
|
18 |
import org.springframework.beans.factory.annotation.Required; |
|
23 | 19 |
|
24 | 20 |
public class JobRegistry { |
25 | 21 |
|
... | ... | |
84 | 80 |
} |
85 | 81 |
|
86 | 82 |
public List<HadoopJobDescriptor> listJobs(ClusterName clusterName) { |
87 |
Map<String, HadoopJob> filtered = Maps.filterValues(jobs, HadoopUtils.filterByCluster(clusterName)); |
|
88 |
return Lists.newArrayList(Iterables.transform(filtered.entrySet(), HadoopUtils.hadoopJobDescriptor())); |
|
83 |
return jobs.values().stream() |
|
84 |
.filter(j -> clusterName.equals(j.getClusterName())) |
|
85 |
.map(HadoopUtils.asDescriptor()) |
|
86 |
.filter(Objects::nonNull) |
|
87 |
.collect(Collectors.toList()); |
|
89 | 88 |
} |
90 | 89 |
|
91 | 90 |
@Required |
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/HadoopServiceCore.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 | 4 |
import java.net.URI; |
5 |
import java.util.List; |
|
6 |
import java.util.Map; |
|
5 |
import java.util.*; |
|
7 | 6 |
import java.util.Map.Entry; |
8 |
import java.util.NavigableMap; |
|
9 |
import java.util.Set; |
|
7 |
import java.util.stream.Collectors; |
|
10 | 8 |
|
11 |
import com.google.common.base.Function; |
|
12 |
import com.google.common.collect.Iterables; |
|
13 | 9 |
import com.google.common.collect.Lists; |
14 | 10 |
import com.google.common.collect.Maps; |
15 | 11 |
import com.google.common.collect.Sets; |
... | ... | |
20 | 16 |
import eu.dnetlib.data.hadoop.rmi.hbase.HBaseRowDescriptor; |
21 | 17 |
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableDescriptor; |
22 | 18 |
import eu.dnetlib.data.hadoop.rmi.hbase.schema.HBaseTableRegionInfo; |
23 |
import eu.dnetlib.data.hadoop.utils.HadoopUtils; |
|
24 | 19 |
import org.apache.commons.lang.StringUtils; |
25 | 20 |
import org.apache.commons.logging.Log; |
26 | 21 |
import org.apache.commons.logging.LogFactory; |
... | ... | |
46 | 41 |
|
47 | 42 |
public List<String> listTables(final ClusterName clusterName) throws IOException, HadoopServiceException { |
48 | 43 |
final HBaseAdmin admin = getHBaseAdmin(clusterName); |
49 |
return Lists.newArrayList(Iterables.transform(Lists.newArrayList(admin.listTables()), HadoopUtils.tableName())); |
|
44 |
return Arrays.asList(admin.listTables()) |
|
45 |
.stream() |
|
46 |
.map(HTableDescriptor::getNameAsString) |
|
47 |
.collect(Collectors.toList()); |
|
50 | 48 |
} |
51 | 49 |
|
52 | 50 |
private HBaseAdmin getHBaseAdmin(final ClusterName clusterName) throws HadoopServiceException { |
... | ... | |
70 | 68 |
|
71 | 69 |
final Set<String> columns = Sets.newHashSet(); |
72 | 70 |
|
73 |
for (HColumnDescriptor hColDesc : Lists.newArrayList(desc.getColumnFamilies())) {
|
|
71 |
for (HColumnDescriptor hColDesc : Arrays.asList(desc.getColumnFamilies())) {
|
|
74 | 72 |
columns.add(hColDesc.getNameAsString()); |
75 | 73 |
} |
76 | 74 |
|
... | ... | |
94 | 92 |
public List<String> describeTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException { |
95 | 93 |
final HBaseAdmin admin = getHBaseAdmin(clusterName); |
96 | 94 |
final HTableDescriptor desc = admin.getTableDescriptor(table.getBytes()); |
97 |
|
|
98 |
return Lists.newArrayList(Iterables.transform(desc.getFamilies(), new Function<HColumnDescriptor, String>() { |
|
99 |
|
|
100 |
@Override |
|
101 |
public String apply(final HColumnDescriptor desc) { |
|
102 |
return desc.getNameAsString(); |
|
103 |
} |
|
104 |
})); |
|
95 |
return desc.getFamilies().stream() |
|
96 |
.map(d -> d.getNameAsString()) |
|
97 |
.collect(Collectors.toList()); |
|
105 | 98 |
} |
106 | 99 |
|
107 | 100 |
public void truncateTable(final ClusterName clusterName, final String table) throws IOException, HadoopServiceException { |
... | ... | |
201 | 194 |
createTable(clusterName, table, columns); |
202 | 195 |
} else { |
203 | 196 |
final HTableDescriptor desc = admin.getTableDescriptor(Bytes.toBytes(table)); |
204 |
final Set<String> foundColumns = Sets.newHashSet(Iterables.transform(Lists.newArrayList(desc.getColumnFamilies()), HadoopUtils.columnName())); |
|
205 | 197 |
|
198 |
final Set<String> foundColumns = desc.getFamilies().stream() |
|
199 |
.map(d -> d.getNameAsString()) |
|
200 |
.collect(Collectors.toCollection(HashSet::new)); |
|
201 |
|
|
206 | 202 |
log.info("ensuring columns on table " + table + ": " + columns); |
207 |
final List<String> missingColumns = Lists.newArrayList(Iterables.filter(columns, HadoopUtils.columnPredicate(foundColumns)));
|
|
203 |
final Collection<String> missingColumns = Sets.difference(columns, foundColumns);
|
|
208 | 204 |
if (!missingColumns.isEmpty()) { |
209 | 205 |
|
210 | 206 |
if (admin.isTableEnabled(table)) { |
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.java | ||
---|---|---|
64 | 64 |
if (status == Status.SUCCEEDED) { |
65 | 65 |
// TODO set some content to return to the blackboard msg. |
66 | 66 |
|
67 |
log.info(String.format("start looking for oozie job(%s) output values: %s", getHadoopId(), workflowActions)); |
|
67 | 68 |
final Properties report = getReport(getOozieClient(), getHadoopId(), workflowActions); |
68 | 69 |
if (report != null) { |
69 | 70 |
final Map<String, String> map = Maps.newHashMap(); |
70 | 71 |
report.forEach((k, v) -> map.put(k.toString(), v.toString())); |
72 |
log.info("found oozie job report, size: " + map.size()); |
|
71 | 73 |
getCallback().done(map); |
72 | 74 |
} else { |
75 |
log.warn("cannot find oozie job report!"); |
|
73 | 76 |
getCallback().done(new HashMap<>()); |
74 | 77 |
} |
75 | 78 |
} else { |
... | ... | |
89 | 92 |
private static Properties getReport(final OozieClient oozieClient, final String oozieJobId, final Set<String> workflowActions) throws OozieClientException, IOException { |
90 | 93 |
WorkflowJob oozieJob = oozieClient.getJobInfo(oozieJobId); |
91 | 94 |
for (WorkflowAction currentAction : oozieJob.getActions()) { |
92 |
log.debug(String.format("looking for workflo actions to report, current: '%s'", currentAction.getName()));
|
|
95 |
log.info(String.format("looking for workflow actions to report, current: '%s'", currentAction.getName()));
|
|
93 | 96 |
if (workflowActions.contains(currentAction.getName())) { |
97 |
log.info(String.format("found workflow action %s", currentAction.getName())); |
|
94 | 98 |
if (ACTION_TYPE_SUBWORKFLOW.equals(currentAction.getType())) { |
99 |
log.info(String.format("looking for sub-workflow actions external id: %s", currentAction.getExternalId())); |
|
95 | 100 |
Properties subworkflowProperties = getReport(oozieClient, currentAction.getExternalId(), workflowActions); |
96 | 101 |
if (subworkflowProperties != null) { |
97 | 102 |
return subworkflowProperties; |
... | ... | |
99 | 104 |
} else if (StringUtils.isNotBlank(currentAction.getData())) { |
100 | 105 |
Properties properties = new Properties(); |
101 | 106 |
properties.load(IOUtils.toInputStream(currentAction.getData())); |
107 |
log.info(String.format("found workflow action(%s) properties size %s", currentAction.getName(), properties.values().size())); |
|
102 | 108 |
return properties; |
103 | 109 |
} |
110 |
} else { |
|
111 |
log.info(String.format("cannot find workflow action(%s) properties", currentAction.getName())); |
|
104 | 112 |
} |
105 | 113 |
} |
106 |
|
|
107 | 114 |
return null; |
108 | 115 |
} |
109 | 116 |
|
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/AbstractSubmitAction.java | ||
---|---|---|
137 | 137 |
@Override |
138 | 138 |
public void done(final Map<String, String> properties) { |
139 | 139 |
bbJob.getParameters().putAll(properties); |
140 |
log.info(jobName + " completed successfully");
|
|
140 |
log.info(String.format("%s completed successfully, returning %s output params", jobName, properties.size()));
|
|
141 | 141 |
handler.done(bbJob); |
142 | 142 |
decrementRunningJobs(jobName); |
143 | 143 |
} |
modules/dnet-hadoop-service/trunk/src/main/java/eu/dnetlib/data/hadoop/action/AbstractHadoopAction.java | ||
---|---|---|
57 | 57 |
|
58 | 58 |
@Override |
59 | 59 |
public void execute(final BlackboardServerHandler handler, final BlackboardJob job) { |
60 |
executor.execute(new Runnable() { |
|
61 |
|
|
62 |
@Override |
|
63 |
public void run() { |
|
64 |
try { |
|
65 |
handler.ongoing(job); |
|
66 |
executeAsync(handler, job); |
|
67 |
} catch (final Throwable e) { |
|
68 |
log.error(ExceptionUtils.getFullStackTrace(e)); |
|
69 |
handler.failed(job, e); |
|
70 |
} |
|
60 |
executor.execute(() -> { |
|
61 |
try { |
|
62 |
handler.ongoing(job); |
|
63 |
executeAsync(handler, job); |
|
64 |
} catch (final Throwable e) { |
|
65 |
log.error(ExceptionUtils.getFullStackTrace(e)); |
|
66 |
handler.failed(job, e); |
|
71 | 67 |
} |
72 | 68 |
}); |
73 | 69 |
} |
Also available in: Unified diff
More logging in the Oozie job monitor procedure that tries to fetch the job report; more Java 8 refactorings.