Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

    
3
import java.io.StringReader;
4
import java.util.Set;
5

    
6
import javax.xml.ws.wsaddressing.W3CEndpointReference;
7

    
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.dom4j.Document;
11
import org.dom4j.DocumentException;
12
import org.dom4j.Element;
13
import org.dom4j.io.SAXReader;
14
import org.springframework.beans.factory.annotation.Autowired;
15

    
16
import com.googlecode.sarasvati.Arc;
17
import com.googlecode.sarasvati.NodeToken;
18

    
19
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
20
import eu.dnetlib.enabling.resultset.MappedResultSetFactory;
21
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
22
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
23

    
24
public class FilterManagedDatasourcesJobNode extends SimpleJobNode {
25

    
26
	private static final Log log = LogFactory.getLog(FilterManagedDatasourcesJobNode.class);
27

    
28
	private String inputEprParam;
29
	private String outputEprParam;
30

    
31
	@Autowired
32
	private LocalOpenaireDatasourceManager dsManager;
33

    
34
	@Autowired
35
	private MappedResultSetFactory mappedResultSetFactory;
36

    
37
	@Override
38
	protected String execute(final NodeToken token) throws Exception {
39
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(getInputEprParam()));
40

    
41
		final Set<String> managedDatasources = dsManager.listManagedDatasourceIds();
42

    
43
		log.info(String.format("found %s managed datasources", managedDatasources.size()));
44

    
45
		final W3CEndpointReference outputEpr = mappedResultSetFactory.createMappedResultSet(inputEpr, s -> filterManaged(s, managedDatasources));
46

    
47
		token.getEnv().setAttribute(getOutputEprParam(), outputEpr.toString());
48

    
49
		return Arc.DEFAULT_ARC;
50
	}
51

    
52
	/**
53
	 * Extracts the datasource id from the input record and checks its existence in the given set.
54
	 *
55
	 * @param data
56
	 * @param filter
57
	 * @return The
58
	 * @throws IllegalStateException
59
	 */
60
	private String filterManaged(final String data, final Set<String> filter) throws IllegalStateException {
61
		try {
62
			final Document doc = new SAXReader().read(new StringReader(data));
63

    
64
			final String dsId = doc.valueOf("/record/metadata/ROWS/ROW[@table = 'dsm_services']/FIELD[@name = 'id']/text()");
65
			if (filter.contains(dsId)) {
66
				doc.selectSingleNode("/record/metadata/ROWS").detach();
67
				((Element) doc.selectSingleNode("/record/metadata")).addElement("ROWS");
68
				return doc.asXML();
69
			} else {
70
				return data;
71
			}
72
		} catch (final DocumentException e) {
73
			throw new IllegalStateException(e);
74
		}
75
	}
76

    
77
	public String getInputEprParam() {
78
		return inputEprParam;
79
	}
80

    
81
	public void setInputEprParam(final String inputEprParam) {
82
		this.inputEprParam = inputEprParam;
83
	}
84

    
85
	public String getOutputEprParam() {
86
		return outputEprParam;
87
	}
88

    
89
	public void setOutputEprParam(final String outputEprParam) {
90
		this.outputEprParam = outputEprParam;
91
	}
92

    
93
}
(3-3/24)