Project

General

Profile

« Previous | Next » 

Revision 44637

partial implementation of merge node

View differences:

modules/dnet-isti/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/merge/MergeAlgorithm.java
1
package eu.dnetlib.msro.workflows.nodes.merge;
2

  
3
public enum MergeAlgorithm {
4
	REPLACE, APPEND
5
}
modules/dnet-isti/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/merge/FieldRule.java
1
package eu.dnetlib.msro.workflows.nodes.merge;
2

  
3
public class FieldRule {
4

  
5
	private String name;
6
	private MergeAlgorithm algo;
7
	private String parentXpath;
8
	private String xpath;
9

  
10
	public FieldRule() {}
11

  
12
	public FieldRule(final String name, final MergeAlgorithm algo, final String parentXpath, final String xpath) {
13
		this.name = name;
14
		this.algo = algo;
15
		this.parentXpath = parentXpath;
16
		this.xpath = xpath;
17
	}
18

  
19
	public String getName() {
20
		return name;
21
	}
22

  
23
	public void setName(final String name) {
24
		this.name = name;
25
	}
26

  
27
	public MergeAlgorithm getAlgo() {
28
		return algo;
29
	}
30

  
31
	public void setAlgo(final MergeAlgorithm algo) {
32
		this.algo = algo;
33
	}
34

  
35
	public String getParentXpath() {
36
		return parentXpath;
37
	}
38

  
39
	public void setParentXpath(final String parentXpath) {
40
		this.parentXpath = parentXpath;
41
	}
42

  
43
	public String getXpath() {
44
		return xpath;
45
	}
46

  
47
	public void setXpath(final String xpath) {
48
		this.xpath = xpath;
49
	}
50

  
51
}
modules/dnet-isti/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/MergeRecordsJobNode.java
1
package eu.dnetlib.msro.workflows.nodes;
2

  
3
import java.io.StringReader;
4
import java.util.ArrayList;
5
import java.util.List;
6
import java.util.stream.Collectors;
7

  
8
import org.apache.commons.io.IOUtils;
9
import org.apache.commons.lang3.math.NumberUtils;
10
import org.apache.commons.lang3.tuple.Pair;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.dom4j.Document;
14
import org.dom4j.DocumentException;
15
import org.dom4j.Element;
16
import org.dom4j.Node;
17
import org.dom4j.io.SAXReader;
18
import org.springframework.beans.factory.annotation.Autowired;
19
import org.springframework.core.io.ClassPathResource;
20

  
21
import com.google.gson.Gson;
22
import com.google.gson.reflect.TypeToken;
23

  
24
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
25
import eu.dnetlib.msro.workflows.graph.Arc;
26
import eu.dnetlib.msro.workflows.nodes.merge.FieldRule;
27
import eu.dnetlib.msro.workflows.nodes.merge.MergeAlgorithm;
28
import eu.dnetlib.msro.workflows.procs.Env;
29
import eu.dnetlib.rmi.common.ResultSet;
30
import eu.dnetlib.rmi.manager.MSROException;
31

  
32
public class MergeRecordsJobNode extends SimpleJobNode {
33

  
34
	private String inputEprParam;
35
	private String outputEprParam;
36
	private String rule;
37

  
38
	private List<FieldRule> fields = new ArrayList<>();
39

  
40
	private static final Log log = LogFactory.getLog(MergeRecordsJobNode.class);
41

  
42
	@Autowired
43
	private ResultSetFactory resultSetFactory;
44

  
45
	@Override
46
	protected String execute(final Env env) throws Exception {
47
		final ResultSet<?> rsIn = env.getAttribute(inputEprParam, ResultSet.class);
48
		if ((rsIn == null)) { throw new MSROException("InputEprParam (" + inputEprParam + ") not found in ENV"); }
49

  
50
		fields = new Gson().fromJson(
51
				IOUtils.toString(new ClassPathResource(rule).getInputStream()),
52
				new TypeToken<List<FieldRule>>() {}.getType());
53

  
54
		final ResultSet<String> rsOut = resultSetFactory.map(rsIn, String.class, this::merge);
55

  
56
		env.setAttribute(outputEprParam, rsOut);
57

  
58
		return Arc.DEFAULT_ARC;
59
	}
60

  
61
	@SuppressWarnings("unchecked")
62
	private String merge(final String pkg) {
63
		try {
64
			final Document doc = new SAXReader().read(new StringReader(pkg));
65

  
66
			final Element pkgNode = (Element) doc.selectSingleNode("//package");
67

  
68
			final List<Element> records = ((List<Element>) pkgNode.elements())
69
					.stream()
70
					.filter(e -> e.getName() == "record")
71
					.map(e -> {
72
						final Element record = (Element) e.selectSingleNode("./*[local-name() = 'resource']");
73
						final float trust = NumberUtils.toFloat(e.valueOf("@trust"), 0.1f);
74
						return Pair.of(record, trust);
75
					})
76
					.sorted((x, y) -> Float.compare(y.getRight(), x.getRight()))
77
					.map(Pair::getLeft)
78
					.collect(Collectors.toList());
79

  
80
			if (records.isEmpty()) {
81
				log.error("Empty package");
82
				throw new RuntimeException("Empty package");
83
			}
84

  
85
			final Element master = records.get(0);
86
			for (final FieldRule rule : fields) {
87
				for (int i = 1; (i < records.size()) && isNotComplete(master, rule); i++) {
88
					merge(master, records.get(i), rule);
89
				}
90
			}
91

  
92
			for (final Object o : master.selectNodes(".//*[local-name()='project']")) {
93
				((Node) o).detach();
94
			}
95
			for (final Object o : pkgNode.selectNodes("//projects/*")) {
96
				master.add(((Node) o).detach());
97
			}
98

  
99
			pkgNode.getParent().add(master.detach());
100

  
101
			pkgNode.detach();
102

  
103
			return doc.asXML();
104

  
105
		} catch (final DocumentException e) {
106
			log.error("Error processing package: " + pkg, e);
107
			throw new RuntimeException("Error processing package: " + pkg, e);
108
		}
109
	}
110

  
111
	@SuppressWarnings("unchecked")
112
	private void merge(final Element master, final Element other, final FieldRule rule) {
113
		final Element nodeMaster = (Element) master.selectSingleNode(rule.getParentXpath());
114

  
115
		if (nodeMaster == null) {
116
			log.warn("Missing parent node in master record, xpath: " + rule.getParentXpath());
117
			return;
118
		}
119

  
120
		final Element nodeOther = (Element) other.selectSingleNode(rule.getParentXpath());
121

  
122
		if (nodeOther == null) {
123
			log.warn("Missing parent node in other record, xpath: " + rule.getParentXpath());
124
			return;
125
		}
126

  
127
		nodeOther.selectNodes(rule.getXpath()).forEach(o -> {
128
			nodeMaster.add(((Node) o).detach());
129
		});
130
	}
131

  
132
	private boolean isNotComplete(final Element master, final FieldRule rule) {
133
		return ((rule.getAlgo() == MergeAlgorithm.APPEND) || master.selectSingleNode(rule.getParentXpath()).selectNodes(rule.getXpath()).isEmpty());
134
	}
135

  
136
	public String getInputEprParam() {
137
		return inputEprParam;
138
	}
139

  
140
	public void setInputEprParam(final String inputEprParam) {
141
		this.inputEprParam = inputEprParam;
142
	}
143

  
144
	public String getOutputEprParam() {
145
		return outputEprParam;
146
	}
147

  
148
	public void setOutputEprParam(final String outputEprParam) {
149
		this.outputEprParam = outputEprParam;
150
	}
151

  
152
	public String getRule() {
153
		return rule;
154
	}
155

  
156
	public void setRule(final String rule) {
157
		this.rule = rule;
158
	}
159

  
160
}
modules/dnet-isti/trunk/src/main/java/eu/dnetlib/msro/workflows/nodes/FetchMultipleMDStoresJobNode.java
22 22
	private String interpretation;
23 23
	private String eprParam;
24 24

  
25
	private static final Log log = LogFactory.getLog(FetchMultipleMDStoresJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
25
	private static final Log log = LogFactory.getLog(FetchMultipleMDStoresJobNode.class);
26 26

  
27 27
	@Autowired
28 28
	private UniqueServiceLocator serviceLocator;
modules/dnet-isti/trunk/src/main/resources/eu/dnetlib/isti/applicationContext-msro-isti.xml
3 3
       xmlns="http://www.springframework.org/schema/beans"
4 4
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd">
5 5

  
6

  
7

  
8
	<!-- Wf Nodes -->
6 9
	<bean id="wfNodeObtainISTIDataSourceParams"
7 10
		class="eu.dnetlib.msro.workflows.nodes.ObtainISTIDataSourceParamsJobNode"
8 11
		scope="prototype" />
9 12

  
10

  
11 13
	<bean id="wfNodeFetchMultipleMDStores"
12 14
		class="eu.dnetlib.msro.workflows.nodes.FetchMultipleMDStoresJobNode"
13 15
		scope="prototype" />
14
		
16

  
17
	<bean id="wfNodeMergeRecords"
18
		class="eu.dnetlib.msro.workflows.nodes.MergeRecordsJobNode"
19
		scope="prototype" />
20

  
21
	<!-- MDStore plugins -->
15 22
	<bean id="dedupMDstorePlugin"
16 23
		class="eu.dnetlib.data.mdstore.plugins.DedupPublicationsPlugin" />
17 24

  
......
20 27

  
21 28
	<bean id="enrichProjectsMDstorePlugin"
22 29
		class="eu.dnetlib.data.mdstore.plugins.EnrichProjectsPlugin" />
30
		
23 31
	
24 32

  
25 33
</beans>
modules/dnet-isti/trunk/src/main/resources/eu/dnetlib/isti/workflows/merge/unpack.json
1
[ {
2
	"name"       : "title",
3
	"algo"       : "REPLACE",
4
	"parentXpath": "./*[local-name()='titles']",
5
	"xpath"      : "./*[local-name()='title']"
6
},{
7
	"name"       : "author",
8
	"algo"       : "REPLACE",
9
	"parentXpath": "./*[local-name()='creators']",
10
	"xpath"      : "./*[local-name()='creatorName']"
11
},{
12
	"name"       : "id",
13
	"algo"       : "APPEND",
14
	"parentXpath": ".",
15
	"xpath"      : "./*[local-name()='identifier']"
16
},{
17
	"name"       : "url",
18
	"algo"       : "APPEND",
19
	"parentXpath": "./*[local-name()='alternateIdentifiers']",
20
	"xpath"      : "./*[local-name()='alternateIdentifier' and @alternateIdentifierType='url']"
21
} ]
22

  
23

  
24

  
modules/dnet-isti/trunk/src/main/resources/eu/dnetlib/bootstrap/profiles/workflows/infospace/merge.xml
54 54
					</ARCS>
55 55
				</NODE>
56 56
				
57
				<NODE name="merge" type="ApplyXslt">
57
				<NODE name="merge" type="MergeRecords">
58 58
					<DESCRIPTION>Merge</DESCRIPTION>
59 59
					<PARAMETERS>
60 60
						<PARAM name="inputEprParam" value="source_epr" />
61 61
						<PARAM name="outputEprParam" value="dedup_epr"/>
62
						<PARAM name="xsltClasspath" value="/eu/dnetlib/isti/xslt/unpack.xslt" />
62
						<PARAM name="rule" value="/eu/dnetlib/isti/workflows/merge/unpack.json" />
63 63
					</PARAMETERS>
64 64
					<ARCS>
65 65
						<ARC to="storeRecords"/>

Also available in: Unified diff