Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes.unpack;
2

    
3
import java.io.StringReader;
4
import java.util.Iterator;
5
import java.util.Queue;
6
import java.util.concurrent.PriorityBlockingQueue;
7

    
8
import javax.xml.ws.wsaddressing.W3CEndpointReference;
9

    
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.dom4j.Document;
13
import org.dom4j.Node;
14
import org.dom4j.io.SAXReader;
15
import org.springframework.beans.factory.annotation.Required;
16

    
17
import com.googlecode.sarasvati.Arc;
18
import com.googlecode.sarasvati.NodeToken;
19

    
20
import eu.dnetlib.enabling.resultset.IterableResultSetFactory;
21
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
22
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
23

    
24
public class UnpackJobNode extends SimpleJobNode {
25

    
26
	/**
27
	 * logger.
28
	 */
29
	private static final Log log = LogFactory.getLog(UnpackJobNode.class);
30

    
31
	private String inputEprParam;
32
	private String outputEprParam;
33
	private String xpath;
34

    
35
	private IterableResultSetFactory iterableResultSetFactory;
36
	private ResultSetClientFactory resultSetClientFactory;
37

    
38
	@Override
39
	protected String execute(final NodeToken token) throws Exception {
40
		final Iterator<String> client = resultSetClientFactory.getClient(token.getEnv().getAttribute(inputEprParam)).iterator();
41
		final Queue<String> queue = new PriorityBlockingQueue<String>();
42

    
43
		while (queue.isEmpty() && client.hasNext()) {
44
			populateQueue(queue, client.next(), xpath);
45
		}
46

    
47
		final W3CEndpointReference epr = iterableResultSetFactory.createIterableResultSet(new Iterable<String>() {
48

    
49
			@Override
50
			public Iterator<String> iterator() {
51
				return new Iterator<String>() {
52

    
53
					@Override
54
					public boolean hasNext() {
55
						synchronized (queue) {
56
							return !queue.isEmpty();
57
						}
58
					}
59

    
60
					@Override
61
					public String next() {
62
						synchronized (queue) {
63
							final String res = queue.poll();
64
							while (queue.isEmpty() && client.hasNext()) {
65
								populateQueue(queue, client.next(), xpath);
66
							}
67
							return res;
68
						}
69
					}
70

    
71
					@Override
72
					public void remove() {}
73
				};
74
			}
75
		});
76

    
77
		token.getEnv().setAttribute(outputEprParam, epr.toString());
78

    
79
		return Arc.DEFAULT_ARC;
80
	}
81

    
82
	private void populateQueue(final Queue<String> queue, final String record, final String xpath) {
83
		try {
84
			final SAXReader reader = new SAXReader();
85
			final Document doc = reader.read(new StringReader(record));
86
			for (Object o : doc.selectNodes(xpath)) {
87
				queue.add(((Node) o).asXML());
88
			}
89
		} catch (Exception e) {
90
			log.error("Error unpacking record: \n" + record, e);
91
			throw new RuntimeException(e);
92
		}
93
	}
94

    
95
	public IterableResultSetFactory getIterableResultSetFactory() {
96
		return iterableResultSetFactory;
97
	}
98

    
99
	@Required
100
	public void setIterableResultSetFactory(final IterableResultSetFactory iterableResultSetFactory) {
101
		this.iterableResultSetFactory = iterableResultSetFactory;
102
	}
103

    
104
	public ResultSetClientFactory getResultSetClientFactory() {
105
		return resultSetClientFactory;
106
	}
107

    
108
	@Required
109
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
110
		this.resultSetClientFactory = resultSetClientFactory;
111
	}
112

    
113
	public String getInputEprParam() {
114
		return inputEprParam;
115
	}
116

    
117
	public void setInputEprParam(final String inputEprParam) {
118
		this.inputEprParam = inputEprParam;
119
	}
120

    
121
	public String getOutputEprParam() {
122
		return outputEprParam;
123
	}
124

    
125
	public void setOutputEprParam(final String outputEprParam) {
126
		this.outputEprParam = outputEprParam;
127
	}
128

    
129
	public String getXpath() {
130
		return xpath;
131
	}
132

    
133
	public void setXpath(final String xpath) {
134
		this.xpath = xpath;
135
	}
136

    
137
}
    (1-1/1)