Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.datacite;
2

    
3
import java.io.StringReader;
4
import java.util.concurrent.ArrayBlockingQueue;
5
import java.util.concurrent.BlockingQueue;
6
import java.util.concurrent.Executor;
7
import java.util.concurrent.Executors;
8
import javax.annotation.Resource;
9
import javax.xml.ws.wsaddressing.W3CEndpointReference;
10

    
11
import com.googlecode.sarasvati.Arc;
12
import com.googlecode.sarasvati.NodeToken;
13
import eu.dnetlib.data.collector.plugins.datasets.DatasetsByJournalIterator;
14
import eu.dnetlib.data.collector.plugins.datasets.PangaeaJournalInfo;
15
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
16
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
17
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
18
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
19
import org.apache.commons.logging.Log;
20
import org.apache.commons.logging.LogFactory;
21
import org.dom4j.Document;
22
import org.dom4j.io.SAXReader;
23

    
24
// TODO: Auto-generated Javadoc
25

    
26
/**
27
 * The Class CollectDatasetsByJournalJobNode.
28
 */
29
public class CollectDatasetsByJournalJobNode extends SimpleJobNode {
30

    
31
	private static final Log log = LogFactory.getLog(CollectDatasetsByJournalJobNode.class);
32

    
33
	public static PangaeaJournalInfo END_QUEUE = new PangaeaJournalInfo();
34

    
35
	/**
36
	 * The datasource id.
37
	 */
38
	private String datasourceId;
39

    
40
	/**
41
	 * The output epr param.
42
	 */
43
	private String outputEprParam;
44

    
45
	/**
46
	 * The input epr param.
47
	 */
48
	private String inputEprParam;
49

    
50
	/**
51
	 * The result set client factory.
52
	 */
53
	private ResultSetClientFactory resultSetClientFactory;
54

    
55
	/**
56
	 * The result set factory.
57
	 */
58
	@Resource(name = "iterableResultSetFactory")
59
	private IterableResultSetFactory resultSetFactory;
60

    
61
	/**
62
	 * The executor.
63
	 */
64
	private Executor executor = Executors.newSingleThreadExecutor();
65

    
66
	/*
67
	 * (non-Javadoc)
68
	 * 
69
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
70
	 */
71
	@Override
72
	protected String execute(final NodeToken token) throws Exception {
73
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(inputEprParam));
74
		final Iterable<String> input = resultSetClientFactory.getClient(inputEpr);
75
		final BlockingQueue<PangaeaJournalInfo> publicationsQueue = new ArrayBlockingQueue<PangaeaJournalInfo>(50);
76

    
77
		executor.execute(new Runnable() {
78

    
79
			@Override
80
			public void run() {
81
				final SAXReader reader = new SAXReader();
82

    
83
				for (String inputString : input) {
84
					try {
85
						Document doc = reader.read(new StringReader(inputString));
86
						final String jId = doc.valueOf("//FIELD[@name='id']");
87
						final String dsId = doc.valueOf("//FIELD[@name='datasource']");
88
						final String dsName = doc.valueOf("//FIELD[@name='name']");
89
						final String jISSN = doc.valueOf("//FIELD[@name='issn']");
90
						PangaeaJournalInfo info = new PangaeaJournalInfo();
91
						info.setDatasourceId(dsId);
92
						info.setJournalId(jId);
93
						info.setJournalName(dsName);
94
						info.setJournalISSN(jISSN);
95
						publicationsQueue.put(info);
96
					} catch (Exception e) {
97
						log.error(e);
98
					}
99
				}
100
				try {
101
					publicationsQueue.put(END_QUEUE);
102
				} catch (InterruptedException e) {
103
					log.error(e);
104
				}
105

    
106
			}
107
		});
108

    
109
		IteratorOnQueue<PangaeaJournalInfo> itOnQueue = new IteratorOnQueue<PangaeaJournalInfo>(publicationsQueue, CollectDatasetsByJournalJobNode.END_QUEUE);
110

    
111
		W3CEndpointReference eprOutput = resultSetFactory.createIterableResultSet(new DatasetsByJournalIterator(itOnQueue));
112
		token.getEnv().setAttribute(getOutputEprParam(), eprOutput.toString());
113
		return Arc.DEFAULT_ARC;
114
	}
115

    
116
	/**
117
	 * Gets the datasource id.
118
	 *
119
	 * @return the datasourceId
120
	 */
121
	public String getDatasourceId() {
122
		return datasourceId;
123
	}
124

    
125
	/**
126
	 * Sets the datasource id.
127
	 *
128
	 * @param datasourceId the datasourceId to set
129
	 */
130
	public void setDatasourceId(final String datasourceId) {
131
		this.datasourceId = datasourceId;
132
	}
133

    
134
	/**
135
	 * @return the outputEprParam
136
	 */
137
	public String getOutputEprParam() {
138
		return outputEprParam;
139
	}
140

    
141
	/**
142
	 * @param outputEprParam the outputEprParam to set
143
	 */
144
	public void setOutputEprParam(final String outputEprParam) {
145
		this.outputEprParam = outputEprParam;
146
	}
147

    
148
	/**
149
	 * @return the inputEprParam
150
	 */
151
	public String getInputEprParam() {
152
		return inputEprParam;
153
	}
154

    
155
	/**
156
	 * @param inputEprParam the inputEprParam to set
157
	 */
158
	public void setInputEprParam(final String inputEprParam) {
159
		this.inputEprParam = inputEprParam;
160
	}
161

    
162
	/**
163
	 * Gets the result set client factory.
164
	 *
165
	 * @return the resultSetClientFactory
166
	 */
167
	public ResultSetClientFactory getResultSetClientFactory() {
168
		return resultSetClientFactory;
169
	}
170

    
171
	/**
172
	 * Sets the result set client factory.
173
	 *
174
	 * @param resultSetClientFactory the resultSetClientFactory to set
175
	 */
176
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
177
		this.resultSetClientFactory = resultSetClientFactory;
178
	}
179

    
180
}
(1-1/4)