Project

General

Profile

1 50420 claudio.at
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2
3
import java.io.StringReader;
4
import java.util.Set;
5
import javax.xml.ws.wsaddressing.W3CEndpointReference;
6
7
import com.googlecode.sarasvati.Arc;
8
import com.googlecode.sarasvati.NodeToken;
9
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
10
import eu.dnetlib.enabling.resultset.MappedResultSetFactory;
11
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.dom4j.Document;
16
import org.dom4j.DocumentException;
17
import org.dom4j.Element;
18
import org.dom4j.io.SAXReader;
19
import org.springframework.beans.factory.annotation.Autowired;
20
21
public class FilterManagedDatasourcesJobNode extends SimpleJobNode {
22
23
	private static final Log log = LogFactory.getLog(FilterManagedDatasourcesJobNode.class);
24
25
	private String inputEprParam;
26
	private String outputEprParam;
27
28
	@Autowired
29
	private LocalOpenaireDatasourceManager dsManager;
30
31
	@Autowired
32
	private MappedResultSetFactory mappedResultSetFactory;
33
34
	@Override
35
	protected String execute(final NodeToken token) throws Exception {
36
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(getInputEprParam()));
37
38
		final Set<String> managedDatasources = dsManager.listManagedDatasourceIds();
39
40
		log.info(String.format("found %s managed datasources", managedDatasources.size()));
41
42
		final W3CEndpointReference outputEpr = mappedResultSetFactory.createMappedResultSet(inputEpr, s -> filterManaged(s, managedDatasources));
43
44
		token.getEnv().setAttribute(getOutputEprParam(), outputEpr.toString());
45
46
		return Arc.DEFAULT_ARC;
47
	}
48
49
	/**
50
	 * Extracts the datasource id from the input record and checks its existence in the given set.
51
	 * @param data
52
	 * @param filter
53
	 * @return The
54
	 * @throws IllegalStateException
55
	 */
56
	private String filterManaged(final String data, Set<String> filter) throws IllegalStateException {
57
		try {
58
			final Document doc = new SAXReader().read(new StringReader(data));
59
60
			final String dsId = doc.valueOf("/record/metadata/ROWS/ROW[@table = 'dsm_datasources']/FIELD[@name = 'id']/text()");
61
			if (filter.contains(dsId)) {
62
				doc.selectSingleNode("/record/metadata/ROWS").detach();
63
				((Element) doc.selectSingleNode("/record/metadata")).addElement("ROWS");
64
				return doc.asXML();
65
			} else return data;
66
		} catch (DocumentException e) {
67
			throw new IllegalStateException(e);
68
		}
69
	}
70
71
	public String getInputEprParam() {
72
		return inputEprParam;
73
	}
74
75
	public void setInputEprParam(final String inputEprParam) {
76
		this.inputEprParam = inputEprParam;
77
	}
78
79
	public String getOutputEprParam() {
80
		return outputEprParam;
81
	}
82
83
	public void setOutputEprParam(final String outputEprParam) {
84
		this.outputEprParam = outputEprParam;
85
	}
86
87
}