Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.functions;
2

    
3
import java.util.function.Function;
4
import java.util.stream.Stream;
5

    
6
import eu.dnetlib.msro.exceptions.MSROException;
7
import eu.dnetlib.msro.workflows.Arc;
8
import eu.dnetlib.msro.workflows.nodes.SimpleProcessNode;
9
import eu.dnetlib.msro.workflows.procs.Token;
10

    
11
public abstract class AbstractApplyFunctionJobNode<T, K> extends SimpleProcessNode {
12

    
13
	private Stream<T> inputStream;
14
	private Stream<K> outputStream;
15

    
16
	@Override
17
	protected final String execute(final Token token) throws Exception {
18
		if (inputStream == null) { throw new MSROException("inputStream not found in ENV"); }
19
		outputStream = inputStream.map(getFunction());
20
		return Arc.DEFAULT_ARC;
21
	}
22

    
23
	protected abstract Function<T, K> getFunction() throws MSROException;
24

    
25
	public Stream<T> getInputStream() {
26
		return inputStream;
27
	}
28

    
29
	public void setInputStream(final Stream<T> inputStream) {
30
		this.inputStream = inputStream;
31
	}
32

    
33
	public Stream<K> getOutputStream() {
34
		return outputStream;
35
	}
36

    
37
	public void setOutputStream(final Stream<K> outputStream) {
38
		this.outputStream = outputStream;
39
	}
40

    
41
}
(1-1/5)