Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.datacite;
2

    
3
import java.util.Iterator;
4
import java.util.concurrent.Executor;
5
import java.util.concurrent.Executors;
6
import java.util.concurrent.LinkedBlockingQueue;
7
import javax.annotation.Resource;
8
import javax.xml.ws.wsaddressing.W3CEndpointReference;
9

    
10
import com.googlecode.sarasvati.Arc;
11
import com.googlecode.sarasvati.NodeToken;
12
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
13
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
14
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
15
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
16

    
17
/**
18
 * The Class SplitDatasetRecord.
19
 */
20
public class SplitDatasetRecord extends SimpleJobNode {
21

    
22
	/**
23
	 * The input epr parm.
24
	 */
25
	private String inputEprParm;
26

    
27
	/**
28
	 * The output epr parm.
29
	 */
30
	private String outputEprParm;
31

    
32
	/**
33
	 * The result set client factory.
34
	 */
35
	private ResultSetClientFactory resultSetClientFactory;
36

    
37
	/**
38
	 * The result set factory.
39
	 */
40
	@Resource(name = "iterableResultSetFactory")
41
	private IterableResultSetFactory resultSetFactory;
42

    
43
	/**
44
	 * The executor.
45
	 */
46
	private Executor executor = Executors.newSingleThreadExecutor();
47

    
48
	/*
49
	 * (non-Javadoc)
50
	 *
51
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
52
	 */
53
	@Override
54
	protected String execute(final NodeToken token) throws Exception {
55
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(inputEprParm));
56
		Iterable<String> input = resultSetClientFactory.getClient(inputEpr);
57
		final LinkedBlockingQueue<String> publicationsQueue = new LinkedBlockingQueue<String>();
58
		final SplitterDatasetsIterator splitterIterator = new SplitterDatasetsIterator(publicationsQueue, input, "publications");
59
		executor.execute(new Runnable() {
60

    
61
			@Override
62
			public void run() {
63
				splitterIterator.populateQueues();
64
			}
65
		});
66

    
67
		W3CEndpointReference eprUrls_publications = resultSetFactory.createIterableResultSet(new Iterable<String>() {
68

    
69
			@Override
70
			public Iterator<String> iterator() {
71
				return new IteratorOnQueue<String>(publicationsQueue, SplitterDatasetsIterator.END_QUEUE);
72
			}
73
		});
74

    
75
		token.getEnv().setAttribute(getOutputEprParm(), eprUrls_publications.toString());
76

    
77
		return Arc.DEFAULT_ARC;
78
	}
79

    
80
	/**
81
	 * Gets the input epr parm.
82
	 *
83
	 * @return the inputEprParm
84
	 */
85
	public String getInputEprParm() {
86
		return inputEprParm;
87
	}
88

    
89
	/**
90
	 * Sets the input epr parm.
91
	 *
92
	 * @param inputEprParm the inputEprParm to set
93
	 */
94
	public void setInputEprParm(final String inputEprParm) {
95
		this.inputEprParm = inputEprParm;
96
	}
97

    
98
	/**
99
	 * Gets the result set client factory.
100
	 *
101
	 * @return the resultSetClientFactory
102
	 */
103
	public ResultSetClientFactory getResultSetClientFactory() {
104
		return resultSetClientFactory;
105
	}
106

    
107
	/**
108
	 * Sets the result set client factory.
109
	 *
110
	 * @param resultSetClientFactory the resultSetClientFactory to set
111
	 */
112
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
113
		this.resultSetClientFactory = resultSetClientFactory;
114
	}
115

    
116
	/**
117
	 * Gets the output epr parm.
118
	 *
119
	 * @return the output epr parm
120
	 */
121
	public String getOutputEprParm() {
122
		return outputEprParm;
123
	}
124

    
125
	/**
126
	 * Sets the output epr parm.
127
	 *
128
	 * @param outputEprParm the new output epr parm
129
	 */
130
	public void setOutputEprParm(final String outputEprParm) {
131
		this.outputEprParm = outputEprParm;
132
	}
133

    
134
}
(3-3/4)