Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.datacite;
2

    
3
import java.util.concurrent.Executor;
4
import java.util.concurrent.Executors;
5
import java.util.concurrent.LinkedBlockingQueue;
6

    
7
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
8
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
9
import eu.dnetlib.msro.workflows.graph.Arc;
10
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
11
import eu.dnetlib.msro.workflows.procs.Env;
12
import eu.dnetlib.rmi.common.ResultSet;
13
import org.springframework.beans.factory.annotation.Autowired;
14

    
15
/**
16
 * The Class SplitDatasetRecord.
17
 */
18
public class SplitDatasetRecord extends SimpleJobNode {
19

    
20
	/**
21
	 * The executor.
22
	 */
23
	private final Executor executor = Executors.newSingleThreadExecutor();
24
	/**
25
	 * The input epr parm.
26
	 */
27
	private String inputEprParm;
28
	/**
29
	 * The output epr parm.
30
	 */
31
	private String outputEprParm;
32
	/**
33
	 * The result set client factory.
34
	 */
35
	@Autowired
36
	private ResultSetClient resultSetClient;
37
	/**
38
	 * The result set factory.
39
	 */
40
	@Autowired
41
	private ResultSetFactory resultSetFactory;
42

    
43
	/*
44
	 * (non-Javadoc)
45
	 *
46
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
47
	 */
48
	@Override
49
	protected String execute(final Env env) throws Exception {
50
		final ResultSet<?> inputEpr = env.getAttribute(this.inputEprParm, ResultSet.class);
51
		final Iterable<String> input = this.resultSetClient.iter(inputEpr, String.class);
52
		final LinkedBlockingQueue<String> publicationsQueue = new LinkedBlockingQueue<String>();
53
		final SplitterDatasetsIterator splitterIterator = new SplitterDatasetsIterator(publicationsQueue, input, "publications");
54

    
55
		this.executor.execute(new Runnable() {
56

    
57
			@Override
58
			public void run() {
59
				splitterIterator.populateQueues();
60
			}
61
		});
62

    
63
		final ResultSet<String> eprUrls_publications =
64
				this.resultSetFactory.createResultSet(() -> new IteratorOnQueue<String>(publicationsQueue, SplitterDatasetsIterator.END_QUEUE));
65

    
66
		env.setAttribute(getOutputEprParm(), eprUrls_publications);
67

    
68
		return Arc.DEFAULT_ARC;
69
	}
70

    
71
	/**
72
	 * Gets the input epr parm.
73
	 *
74
	 * @return the inputEprParm
75
	 */
76
	public String getInputEprParm() {
77
		return this.inputEprParm;
78
	}
79

    
80
	/**
81
	 * Sets the input epr parm.
82
	 *
83
	 * @param inputEprParm the inputEprParm to set
84
	 */
85
	public void setInputEprParm(final String inputEprParm) {
86
		this.inputEprParm = inputEprParm;
87
	}
88

    
89
	/**
90
	 * Gets the output epr parm.
91
	 *
92
	 * @return the output epr parm
93
	 */
94
	public String getOutputEprParm() {
95
		return this.outputEprParm;
96
	}
97

    
98
	/**
99
	 * Sets the output epr parm.
100
	 *
101
	 * @param outputEprParm the new output epr parm
102
	 */
103
	public void setOutputEprParm(final String outputEprParm) {
104
		this.outputEprParm = outputEprParm;
105
	}
106

    
107
}
(3-3/4)