Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

    
3
import java.io.StringReader;
4
import java.util.Set;
5
import javax.xml.ws.wsaddressing.W3CEndpointReference;
6

    
7
import com.googlecode.sarasvati.Arc;
8
import com.googlecode.sarasvati.NodeToken;
9
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
10
import eu.dnetlib.enabling.resultset.MappedResultSetFactory;
11
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.dom4j.Document;
16
import org.dom4j.DocumentException;
17
import org.dom4j.Element;
18
import org.dom4j.io.SAXReader;
19
import org.springframework.beans.factory.annotation.Autowired;
20

    
21
public class FilterManagedDatasourcesJobNode extends SimpleJobNode {
22

    
23
	private static final Log log = LogFactory.getLog(FilterManagedDatasourcesJobNode.class);
24

    
25
	private String inputEprParam;
26
	private String outputEprParam;
27

    
28
	@Autowired
29
	private LocalOpenaireDatasourceManager dsManager;
30

    
31
	@Autowired
32
	private MappedResultSetFactory mappedResultSetFactory;
33

    
34
	@Override
35
	protected String execute(final NodeToken token) throws Exception {
36
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(getInputEprParam()));
37

    
38
		final Set<String> managedDatasources = dsManager.listManagedDatasourceIds();
39

    
40
		log.info(String.format("found %s managed datasources", managedDatasources.size()));
41

    
42
		final W3CEndpointReference outputEpr = mappedResultSetFactory.createMappedResultSet(inputEpr, s -> filterManaged(s, managedDatasources));
43

    
44
		token.getEnv().setAttribute(getOutputEprParam(), outputEpr.toString());
45

    
46
		return Arc.DEFAULT_ARC;
47
	}
48

    
49
	/**
50
	 * Extracts the datasource id from the input record and checks its existence in the given set.
51
	 * @param data
52
	 * @param filter
53
	 * @return The
54
	 * @throws IllegalStateException
55
	 */
56
	private String filterManaged(final String data, Set<String> filter) throws IllegalStateException {
57
		try {
58
			final Document doc = new SAXReader().read(new StringReader(data));
59

    
60
			final String dsId = doc.valueOf("/record/metadata/ROWS/ROW[@table = 'dsm_datasources']/FIELD[@name = 'id']/text()");
61
			if (filter.contains(dsId)) {
62
				doc.selectSingleNode("/record/metadata/ROWS").detach();
63
				((Element) doc.selectSingleNode("/record/metadata")).addElement("ROWS");
64
				return doc.asXML();
65
			} else return data;
66
		} catch (DocumentException e) {
67
			throw new IllegalStateException(e);
68
		}
69
	}
70

    
71
	public String getInputEprParam() {
72
		return inputEprParam;
73
	}
74

    
75
	public void setInputEprParam(final String inputEprParam) {
76
		this.inputEprParam = inputEprParam;
77
	}
78

    
79
	public String getOutputEprParam() {
80
		return outputEprParam;
81
	}
82

    
83
	public void setOutputEprParam(final String outputEprParam) {
84
		this.outputEprParam = outputEprParam;
85
	}
86

    
87
}
(3-3/24)