Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes.datacite;
2

    
3
import java.io.StringReader;
4
import java.util.concurrent.ArrayBlockingQueue;
5
import java.util.concurrent.BlockingQueue;
6
import java.util.concurrent.Executor;
7
import java.util.concurrent.Executors;
8

    
9
import javax.annotation.Resource;
10
import javax.xml.ws.wsaddressing.W3CEndpointReference;
11

    
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.dom4j.Document;
15
import org.dom4j.io.SAXReader;
16

    
17
import com.googlecode.sarasvati.Arc;
18
import com.googlecode.sarasvati.NodeToken;
19

    
20
import eu.dnetlib.data.collector.plugins.datasets.DatasetsByJournalIterator;
21
import eu.dnetlib.data.collector.plugins.datasets.PangaeaJournalInfo;
22
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
23
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
24
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
25
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
26

    
27
// TODO: Auto-generated Javadoc
28
/**
29
 * The Class CollectDatasetsByJournalJobNode.
30
 */
31
public class CollectDatasetsByJournalJobNode extends SimpleJobNode {
32

    
33
	private static final Log log = LogFactory.getLog(CollectDatasetsByJournalJobNode.class);
34

    
35
	public static PangaeaJournalInfo END_QUEUE = new PangaeaJournalInfo();
36

    
37
	/** The datasource id. */
38
	private String datasourceId;
39

    
40
	/** The output epr param. */
41
	private String outputEprParam;
42

    
43
	/** The input epr param. */
44
	private String inputEprParam;
45

    
46
	/** The result set client factory. */
47
	private ResultSetClientFactory resultSetClientFactory;
48

    
49
	/** The result set factory. */
50
	@Resource(name = "iterableResultSetFactory")
51
	private IterableResultSetFactory resultSetFactory;
52

    
53
	/** The executor. */
54
	private Executor executor = Executors.newSingleThreadExecutor();
55

    
56
	/*
57
	 * (non-Javadoc)
58
	 * 
59
	 * @see eu.dnetlib.msro.workflows.nodes.SimpleJobNode#execute(com.googlecode.sarasvati.NodeToken)
60
	 */
61
	@Override
62
	protected String execute(final NodeToken token) throws Exception {
63
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(inputEprParam));
64
		final Iterable<String> input = resultSetClientFactory.getClient(inputEpr);
65
		final BlockingQueue<PangaeaJournalInfo> publicationsQueue = new ArrayBlockingQueue<PangaeaJournalInfo>(50);
66

    
67
		executor.execute(new Runnable() {
68

    
69
			@Override
70
			public void run() {
71
				final SAXReader reader = new SAXReader();
72

    
73
				for (String inputString : input) {
74
					try {
75
						Document doc = reader.read(new StringReader(inputString));
76
						final String jId = doc.valueOf("//FIELD[@name='id']");
77
						final String dsId = doc.valueOf("//FIELD[@name='datasource']");
78
						final String dsName = doc.valueOf("//FIELD[@name='name']");
79
						final String jISSN = doc.valueOf("//FIELD[@name='issn']");
80
						PangaeaJournalInfo info = new PangaeaJournalInfo();
81
						info.setDatasourceId(dsId);
82
						info.setJournalId(jId);
83
						info.setJournalName(dsName);
84
						info.setJournalISSN(jISSN);
85
						publicationsQueue.put(info);
86
					} catch (Exception e) {
87
						log.error(e);
88
					}
89
				}
90
				try {
91
					publicationsQueue.put(END_QUEUE);
92
				} catch (InterruptedException e) {
93
					log.error(e);
94
				}
95

    
96
			}
97
		});
98

    
99
		IteratorOnQueue<PangaeaJournalInfo> itOnQueue = new IteratorOnQueue<PangaeaJournalInfo>(publicationsQueue, CollectDatasetsByJournalJobNode.END_QUEUE);
100

    
101
		W3CEndpointReference eprOutput = resultSetFactory.createIterableResultSet(new DatasetsByJournalIterator(itOnQueue));
102
		token.getEnv().setAttribute(getOutputEprParam(), eprOutput.toString());
103
		return Arc.DEFAULT_ARC;
104
	}
105

    
106
	/**
107
	 * Gets the datasource id.
108
	 *
109
	 * @return the datasourceId
110
	 */
111
	public String getDatasourceId() {
112
		return datasourceId;
113
	}
114

    
115
	/**
116
	 * Sets the datasource id.
117
	 *
118
	 * @param datasourceId
119
	 *            the datasourceId to set
120
	 */
121
	public void setDatasourceId(final String datasourceId) {
122
		this.datasourceId = datasourceId;
123
	}
124

    
125
	/**
126
	 * @return the outputEprParam
127
	 */
128
	public String getOutputEprParam() {
129
		return outputEprParam;
130
	}
131

    
132
	/**
133
	 * @param outputEprParam
134
	 *            the outputEprParam to set
135
	 */
136
	public void setOutputEprParam(final String outputEprParam) {
137
		this.outputEprParam = outputEprParam;
138
	}
139

    
140
	/**
141
	 * @return the inputEprParam
142
	 */
143
	public String getInputEprParam() {
144
		return inputEprParam;
145
	}
146

    
147
	/**
148
	 * @param inputEprParam
149
	 *            the inputEprParam to set
150
	 */
151
	public void setInputEprParam(final String inputEprParam) {
152
		this.inputEprParam = inputEprParam;
153
	}
154

    
155
	/**
156
	 * Gets the result set client factory.
157
	 *
158
	 * @return the resultSetClientFactory
159
	 */
160
	public ResultSetClientFactory getResultSetClientFactory() {
161
		return resultSetClientFactory;
162
	}
163

    
164
	/**
165
	 * Sets the result set client factory.
166
	 *
167
	 * @param resultSetClientFactory
168
	 *            the resultSetClientFactory to set
169
	 */
170
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
171
		this.resultSetClientFactory = resultSetClientFactory;
172
	}
173

    
174
}
(1-1/4)