Revision 46885
Added by Michele Artini almost 7 years ago
NodeHelper.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.workflows.util; |
2 | 2 |
|
3 | 3 |
import java.util.HashMap; |
4 |
import java.util.HashSet; |
|
5 | 4 |
import java.util.List; |
6 | 5 |
import java.util.Map; |
7 |
import java.util.Set;
|
|
6 |
import java.util.Optional;
|
|
8 | 7 |
import java.util.stream.Collectors; |
9 | 8 |
|
9 |
import javax.annotation.PostConstruct; |
|
10 |
|
|
10 | 11 |
import org.apache.commons.lang3.StringUtils; |
11 | 12 |
import org.apache.commons.logging.Log; |
12 | 13 |
import org.apache.commons.logging.LogFactory; |
... | ... | |
24 | 25 |
import eu.dnetlib.msro.exceptions.MSROException; |
25 | 26 |
import eu.dnetlib.msro.workflows.GraphNode; |
26 | 27 |
import eu.dnetlib.msro.workflows.GraphNodeParameter; |
27 |
import eu.dnetlib.msro.workflows.nodes.AbstractParallelProcessNode;
|
|
28 |
import eu.dnetlib.msro.workflows.nodes.AbstractProcessNode; |
|
28 | 29 |
import eu.dnetlib.msro.workflows.nodes.DefaultJobNode; |
29 | 30 |
import eu.dnetlib.msro.workflows.nodes.SuccessNode; |
30 | 31 |
import eu.dnetlib.msro.workflows.procs.Env; |
... | ... | |
41 | 42 |
|
42 | 43 |
private ApplicationContext applicationContext; |
43 | 44 |
|
44 |
private Set<String> validTypes = new HashSet<>();
|
|
45 |
private Map<String, Class<?>> validTypes = new HashMap<>();
|
|
45 | 46 |
|
46 |
public AbstractParallelProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process, final Env env) throws MSROException { |
|
47 |
@PostConstruct |
|
48 |
public void init() { |
|
49 |
validTypes = applicationContext.getBeansWithAnnotation(ProcessNode.class) |
|
50 |
.values() |
|
51 |
.stream() |
|
52 |
.filter(bean -> bean instanceof AbstractProcessNode) |
|
53 |
.map(bean -> (AbstractProcessNode) bean) |
|
54 |
.collect(Collectors.toMap( |
|
55 |
bean -> bean.getClass().getAnnotation(ProcessNode.class).value(), |
|
56 |
Object::getClass)); |
|
57 |
|
|
58 |
} |
|
59 |
|
|
60 |
public AbstractProcessNode newProcessNode(final GraphNode node, final WorkflowProcess process, final Env env) throws MSROException { |
|
47 | 61 |
if (node.isSucessNode()) { |
48 | 62 |
return new SuccessNode(); |
49 | 63 |
} else if (StringUtils.isBlank(node.getType())) { |
50 | 64 |
return new DefaultJobNode(node.getName()); |
51 | 65 |
} else { |
52 |
final AbstractParallelProcessNode pnode = applicationContext.getBeansWithAnnotation(ProcessNode.class)
|
|
66 |
final Optional<AbstractProcessNode> optional = applicationContext.getBeansWithAnnotation(ProcessNode.class)
|
|
53 | 67 |
.values() |
54 | 68 |
.stream() |
55 | 69 |
.filter(bean -> bean.getClass().getAnnotation(ProcessNode.class).value().equals(node.getType())) |
56 |
.filter(bean -> bean instanceof AbstractParallelProcessNode) |
|
57 |
.map(bean -> (AbstractParallelProcessNode) bean) |
|
58 |
.findFirst() |
|
59 |
.get(); |
|
70 |
.filter(bean -> bean instanceof AbstractProcessNode) |
|
71 |
.map(bean -> (AbstractProcessNode) bean) |
|
72 |
.findFirst(); |
|
60 | 73 |
|
61 |
if (pnode != null) { |
|
74 |
if (optional.isPresent()) { |
|
75 |
final AbstractProcessNode pnode = optional.get(); |
|
76 |
|
|
62 | 77 |
pnode.setNodeName(node.getName()); |
63 | 78 |
// I invoke the setter methods using the static params of the graph node |
64 | 79 |
try { |
... | ... | |
71 | 86 |
} |
72 | 87 |
return pnode; |
73 | 88 |
} else { |
74 |
log.error("cannot find node " + node.getType()); |
|
89 |
log.error("cannot find node " + node.getType() + ", valid types are: " + validTypes);
|
|
75 | 90 |
throw new MSROException("cannot find node " + node.getType()); |
76 | 91 |
} |
77 | 92 |
} |
78 | 93 |
} |
79 | 94 |
|
80 |
public List<AbstractParallelProcessNode> availableNodes() { |
|
81 |
return applicationContext.getBeansOfType(AbstractParallelProcessNode.class) |
|
82 |
.values() |
|
83 |
.stream() |
|
84 |
.filter(cl -> cl.getClass().isAnnotationPresent(ProcessNode.class)) |
|
85 |
.collect(Collectors.toList()); |
|
86 |
} |
|
87 |
|
|
88 | 95 |
public Map<String, Object> resolveParamsWithNoEnv(final GraphNode node) { |
89 | 96 |
return resolveParams(node, null); |
90 | 97 |
} |
... | ... | |
153 | 160 |
} |
154 | 161 |
|
155 | 162 |
public synchronized boolean isValidType(final String type) { |
156 |
if (validTypes.isEmpty()) { |
|
157 |
validTypes = applicationContext.getBeansWithAnnotation(ProcessNode.class) |
|
158 |
.values() |
|
159 |
.stream() |
|
160 |
.filter(bean -> bean instanceof AbstractParallelProcessNode) |
|
161 |
.map(bean -> (AbstractParallelProcessNode) bean) |
|
162 |
.map(bean -> bean.getClass().getAnnotation(ProcessNode.class).value()) |
|
163 |
.collect(Collectors.toSet()); |
|
164 |
} |
|
163 |
return validTypes.containsKey(type); |
|
164 |
} |
|
165 | 165 |
|
166 |
return validTypes.contains(type); |
|
166 |
public Map<String, Class<?>> getValidTypes() { |
|
167 |
return validTypes; |
|
167 | 168 |
} |
168 | 169 |
|
169 | 170 |
} |
Also available in: Unified diff
Partial response of async methods