Project

General

Profile

« Previous | Next » 

Revision 50420

updated aggregation workflow used for datasource entity registries: introduced node used to rule out the information associated to the managed data sources

View differences:

modules/dnet-openaireplus-workflows/branches/dsm/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/FilterManagedDatasourcesJobNode.java
1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

  
3
import java.io.StringReader;
4
import java.util.Set;
5
import javax.xml.ws.wsaddressing.W3CEndpointReference;
6

  
7
import com.googlecode.sarasvati.Arc;
8
import com.googlecode.sarasvati.NodeToken;
9
import eu.dnetlib.enabling.datasources.LocalOpenaireDatasourceManager;
10
import eu.dnetlib.enabling.resultset.MappedResultSetFactory;
11
import eu.dnetlib.enabling.resultset.client.utils.EPRUtils;
12
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.dom4j.Document;
16
import org.dom4j.DocumentException;
17
import org.dom4j.Element;
18
import org.dom4j.io.SAXReader;
19
import org.springframework.beans.factory.annotation.Autowired;
20

  
21
public class FilterManagedDatasourcesJobNode extends SimpleJobNode {
22

  
23
	private static final Log log = LogFactory.getLog(FilterManagedDatasourcesJobNode.class);
24

  
25
	private String inputEprParam;
26
	private String outputEprParam;
27

  
28
	@Autowired
29
	private LocalOpenaireDatasourceManager dsManager;
30

  
31
	@Autowired
32
	private MappedResultSetFactory mappedResultSetFactory;
33

  
34
	@Override
35
	protected String execute(final NodeToken token) throws Exception {
36
		final W3CEndpointReference inputEpr = new EPRUtils().getEpr(token.getEnv().getAttribute(getInputEprParam()));
37

  
38
		final Set<String> managedDatasources = dsManager.listManagedDatasourceIds();
39

  
40
		log.info(String.format("found %s managed datasources", managedDatasources.size()));
41

  
42
		final W3CEndpointReference outputEpr = mappedResultSetFactory.createMappedResultSet(inputEpr, s -> filterManaged(s, managedDatasources));
43

  
44
		token.getEnv().setAttribute(getOutputEprParam(), outputEpr.toString());
45

  
46
		return Arc.DEFAULT_ARC;
47
	}
48

  
49
	/**
50
	 * Extracts the datasource id from the input record and checks its existence in the given set.
51
	 * @param data
52
	 * @param filter
53
	 * @return The
54
	 * @throws IllegalStateException
55
	 */
56
	private String filterManaged(final String data, Set<String> filter) throws IllegalStateException {
57
		try {
58
			final Document doc = new SAXReader().read(new StringReader(data));
59

  
60
			final String dsId = doc.valueOf("/record/metadata/ROWS/ROW[@table = 'dsm_datasources']/FIELD[@name = 'id']/text()");
61
			if (filter.contains(dsId)) {
62
				doc.selectSingleNode("/record/metadata/ROWS").detach();
63
				((Element) doc.selectSingleNode("/record/metadata")).addElement("ROWS");
64
				return doc.asXML();
65
			} else return data;
66
		} catch (DocumentException e) {
67
			throw new IllegalStateException(e);
68
		}
69
	}
70

  
71
	public String getInputEprParam() {
72
		return inputEprParam;
73
	}
74

  
75
	public void setInputEprParam(final String inputEprParam) {
76
		this.inputEprParam = inputEprParam;
77
	}
78

  
79
	public String getOutputEprParam() {
80
		return outputEprParam;
81
	}
82

  
83
	public void setOutputEprParam(final String outputEprParam) {
84
		this.outputEprParam = outputEprParam;
85
	}
86

  
87
}
modules/dnet-openaireplus-workflows/branches/dsm/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/repo-hi/Aggregate_Metadata_from_EntityRegistry_Native.xml
9 9
		<DATE_OF_CREATION value="2006-05-04T18:13:51.0Z"/>
10 10
	</HEADER>
11 11
	<BODY>
12
		<WORKFLOW_NAME>Aggregate Metadata (datasource, organization and person entities) [Ingestion]</WORKFLOW_NAME>
12
		<WORKFLOW_NAME>Aggregate Metadata (datasource and organization entities) [Ingestion]</WORKFLOW_NAME>
13 13
		<WORKFLOW_INFO>
14 14
			<FIELD name="Action">Aggregate Metadata</FIELD>
15 15
			<FIELD name="Consequence IS">Ingestion</FIELD>
16 16
			<FIELD name="Datasource class">EntityRegistry, Aggregator</FIELD>
17
			<FIELD name="Content">datasource, organization and person entities</FIELD>
17
			<FIELD name="Content">datasource and organization entities</FIELD>
18 18
		</WORKFLOW_INFO>
19 19
		<WORKFLOW_TYPE>REPO_HI</WORKFLOW_TYPE>
20 20
		<WORKFLOW_PRIORITY>20</WORKFLOW_PRIORITY>
......
42 42
			<NODE name="createMetaWf" type="RegisterMetaWf">
43 43
				<DESCRIPTION>Create MetaWorkflow</DESCRIPTION>
44 44
				<PARAMETERS>
45
					<PARAM name="wfName" managedBy="system" required="true" type="string">Aggregate Metadata (datasource, organization and person entities) [Ingestion]</PARAM>
45
					<PARAM name="wfName" managedBy="system" required="true" type="string">Aggregate Metadata (datasource and organization entities) [Ingestion]</PARAM>
46 46
				</PARAMETERS>
47 47
				<ARCS>
48 48
					<ARC to="createGMF"/>
modules/dnet-openaireplus-workflows/branches/dsm/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/repo-hi/entityreg/transform.wf.st
64 64
		<PARAM required="true" type="string" name="outputEprParam" managedBy="system">cleaned_row_epr</PARAM>
65 65
	</PARAMETERS>
66 66
	<ARCS>
67
		<ARC to="storeRowRecords"/>
67
		<ARC to="filterManaged"/>
68 68
	</ARCS>
69 69
</NODE>
70 70

  
71
<NODE name="filterManaged" type="FilterManagedDatasources">
72
    <DESCRIPTION>Filter DB rows according to the managed flag state</DESCRIPTION>
73
    <PARAMETERS>
74
        <PARAM managedBy="system" name="inputEprParam" required="true" type="string">cleaned_row_epr</PARAM>
75
        <PARAM managedBy="system" name="outputEprParam" required="true" type="string">filtered_row_epr</PARAM>
76
    </PARAMETERS>
77
    <ARCS>
78
        <ARC to="storeRowRecords"/>
79
    </ARCS>
80
</NODE>
81

  
71 82
<NODE name="storeRowRecords" type="StoreMDStoreRecords">
72 83
	<DESCRIPTION>Store mdtore records</DESCRIPTION>
73 84
	<PARAMETERS>
modules/dnet-openaireplus-workflows/branches/dsm/src/main/resources/eu/dnetlib/msro/openaireplus/workflows/applicationContext-msro-openaire-nodes.xml
295 295
	      class="eu.dnetlib.msro.openaireplus.workflows.nodes.claims.ApplyClaimUpdatesJobNode"
296 296
	      scope="prototype"/>
297 297

  
298
	<bean id="wfNodeFilterManagedDatasources"
299
	      class="eu.dnetlib.msro.openaireplus.workflows.nodes.FilterManagedDatasourcesJobNode"
300
	      scope="prototype"/>
301

  
298 302
</beans>

Also available in: Unified diff