Project

General

Profile

1
package eu.dnetlib.msro.workflows.nodes;
2

    
3
import java.io.StringReader;
4
import java.util.ArrayList;
5
import java.util.List;
6
import java.util.stream.Collectors;
7

    
8
import org.apache.commons.io.IOUtils;
9
import org.apache.commons.lang3.math.NumberUtils;
10
import org.apache.commons.lang3.tuple.Pair;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.dom4j.Document;
14
import org.dom4j.DocumentException;
15
import org.dom4j.Element;
16
import org.dom4j.Namespace;
17
import org.dom4j.Node;
18
import org.dom4j.QName;
19
import org.dom4j.io.SAXReader;
20
import org.springframework.beans.factory.annotation.Autowired;
21
import org.springframework.core.io.ClassPathResource;
22

    
23
import com.google.gson.Gson;
24
import com.google.gson.reflect.TypeToken;
25

    
26
import eu.dnetlib.enabling.resultset.factory.ResultSetFactory;
27
import eu.dnetlib.msro.workflows.graph.Arc;
28
import eu.dnetlib.msro.workflows.nodes.merge.FieldRule;
29
import eu.dnetlib.msro.workflows.nodes.merge.MergeAlgorithm;
30
import eu.dnetlib.msro.workflows.procs.Env;
31
import eu.dnetlib.rmi.common.ResultSet;
32
import eu.dnetlib.rmi.manager.MSROException;
33

    
34
public class MergeRecordsJobNode extends SimpleJobNode {
35

    
36
	private String inputEprParam;
37
	private String outputEprParam;
38
	private String rule;
39

    
40
	private List<FieldRule> fields = new ArrayList<>();
41

    
42
	private static final Log log = LogFactory.getLog(MergeRecordsJobNode.class);
43

    
44
	@Autowired
45
	private ResultSetFactory resultSetFactory;
46

    
47
	@Override
48
	protected String execute(final Env env) throws Exception {
49
		final ResultSet<?> rsIn = env.getAttribute(inputEprParam, ResultSet.class);
50
		if ((rsIn == null)) { throw new MSROException("InputEprParam (" + inputEprParam + ") not found in ENV"); }
51

    
52
		fields = new Gson().fromJson(
53
				IOUtils.toString(new ClassPathResource(rule).getInputStream()),
54
				new TypeToken<List<FieldRule>>() {}.getType());
55

    
56
		final ResultSet<String> rsOut = resultSetFactory.map(rsIn, String.class, this::merge);
57

    
58
		env.setAttribute(outputEprParam, rsOut);
59

    
60
		return Arc.DEFAULT_ARC;
61
	}
62

    
63
	@SuppressWarnings("unchecked")
64
	private String merge(final String pkg) {
65
		try {
66
			final Document doc = new SAXReader().read(new StringReader(pkg));
67

    
68
			final Element pkgNode = (Element) doc.selectSingleNode("//package");
69

    
70
			final List<Element> records = ((List<Element>) pkgNode.elements())
71
					.stream()
72
					.filter(e -> e.getName() == "record")
73
					.map(e -> {
74
						final Element record = (Element) e.selectSingleNode("./*[local-name() = 'resource']");
75
						final float trust = NumberUtils.toFloat(e.valueOf("@trust"), 0.1f);
76
						return Pair.of(record, trust);
77
					})
78
					.sorted((x, y) -> Float.compare(y.getRight(), x.getRight()))
79
					.map(Pair::getLeft)
80
					.collect(Collectors.toList());
81

    
82
			if (records.isEmpty()) {
83
				log.error("Empty package");
84
				throw new RuntimeException("Empty package");
85
			}
86

    
87
			final Element master = records.get(0);
88
			for (final FieldRule rule : fields) {
89
				for (int i = 1; (i < records.size()) && isNotComplete(master, rule); i++) {
90
					merge(master, records.get(i), rule);
91
				}
92
			}
93

    
94
			final Element projectsNode = (Element) master.selectSingleNode("./*[local-name()='projects']");
95
			for (final Object o : projectsNode.selectNodes("./*")) {
96
				((Node) o).detach();
97
			}
98
			for (final Object o : pkgNode.selectNodes("./*[local-name()='projects']/*")) {
99
				projectsNode.add(((Node) o).detach());
100
			}
101

    
102
			pkgNode.getParent()
103
					.addElement(new QName("metadata", new Namespace("oai", "http://www.openarchives.org/OAI/2.0/")))
104
					.add(master.detach());
105

    
106
			pkgNode.detach();
107

    
108
			return doc.asXML();
109

    
110
		} catch (final DocumentException e) {
111
			log.error("Error processing package: " + pkg, e);
112
			throw new RuntimeException("Error processing package: " + pkg, e);
113
		}
114
	}
115

    
116
	@SuppressWarnings("unchecked")
117
	private void merge(final Element master, final Element other, final FieldRule rule) {
118
		final Element nodeMaster = (Element) master.selectSingleNode(rule.getParentXpath());
119

    
120
		if (nodeMaster == null) {
121
			log.warn("Missing parent node in MASTER record, xpath: " + rule.getParentXpath());
122
			return;
123
		}
124

    
125
		final Element nodeOther = (Element) other.selectSingleNode(rule.getParentXpath());
126

    
127
		if (nodeOther == null) {
128
			log.warn("Missing parent node in SECOND record, xpath: " + rule.getParentXpath());
129
			return;
130
		}
131

    
132
		nodeOther.selectNodes(rule.getXpath()).forEach(o -> {
133
			nodeMaster.add(((Node) o).detach());
134
		});
135
	}
136

    
137
	private boolean isNotComplete(final Element master, final FieldRule rule) {
138
		return ((rule.getAlgo() == MergeAlgorithm.APPEND) || master.selectSingleNode(rule.getParentXpath()).selectNodes(rule.getXpath()).isEmpty());
139
	}
140

    
141
	public String getInputEprParam() {
142
		return inputEprParam;
143
	}
144

    
145
	public void setInputEprParam(final String inputEprParam) {
146
		this.inputEprParam = inputEprParam;
147
	}
148

    
149
	public String getOutputEprParam() {
150
		return outputEprParam;
151
	}
152

    
153
	public void setOutputEprParam(final String outputEprParam) {
154
		this.outputEprParam = outputEprParam;
155
	}
156

    
157
	public String getRule() {
158
		return rule;
159
	}
160

    
161
	public void setRule(final String rule) {
162
		this.rule = rule;
163
	}
164

    
165
}
(2-2/3)