Project

General

Profile

1
package eu.dnetlib.data.mdstore.plugins;
2

    
3
import java.io.StringReader;
4
import java.util.Collection;
5
import java.util.Map;
6
import java.util.Objects;
7
import java.util.Set;
8
import java.util.stream.Collectors;
9
import java.util.stream.StreamSupport;
10

    
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.lang3.tuple.ImmutablePair;
13
import org.apache.commons.lang3.tuple.Pair;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.dom4j.Document;
17
import org.dom4j.DocumentException;
18
import org.dom4j.DocumentHelper;
19
import org.dom4j.Element;
20
import org.dom4j.Namespace;
21
import org.dom4j.QName;
22
import org.dom4j.io.SAXReader;
23

    
24
import com.google.common.base.Joiner;
25
import com.mongodb.DBObject;
26
import com.mongodb.client.MongoCollection;
27

    
28
import eu.dnetlib.data.mdstore.modular.mongodb.MongoMDStore;
29
import eu.dnetlib.miscutils.functional.hash.Hashing;
30
import eu.dnetlib.rmi.data.MDStoreServiceException;
31

    
32
public class DedupPublicationsPlugin extends AbstractIstiMDStorePlugin {
33

    
34
	private static final Log log = LogFactory.getLog(DedupPublicationsPlugin.class);
35

    
36
	private static final Namespace OAI_NS = new Namespace("oai", "http://www.openarchives.org/OAI/2.0/");
37
	private static final Namespace DRI_NS = new Namespace("dri", "http://www.driver-repository.eu/namespace/dri");
38

    
39
	@Override
40
	public void process(final Map<String, String> params) throws MDStoreServiceException {
41
		// TODO Auto-generated method stub
42
		log.info("*****************************************");
43
		log.info("* EXECUTING DEDUP...");
44
		params.entrySet().forEach(e -> log.info("*   PARAM " + e.getKey() + " = " + e.getValue()));
45
		log.info("*****************************************");
46

    
47
		final MongoMDStore sourceStore = resolveStore(params, "sourceMdId");
48
		final MongoMDStore targetStore = resolveStore(params, "targetMdId");
49

    
50
		final MongoCollection<DBObject> coll = sourceStore.getCollection();
51

    
52
		final Collection<Set<String>> groups = StreamSupport.stream(coll.find().spliterator(), false)
53
				.map(this::createDedupEntry)
54
				.filter(Objects::nonNull)
55
				.collect(Collectors.groupingBy(Pair::getRight, Collectors.mapping(Pair::getLeft, Collectors.toSet())))
56
				.values();
57

    
58
		targetStore.truncate();
59

    
60
		targetStore.feed(() -> groups.stream()
61
				.map(ids -> pack(ids, sourceStore))
62
				.iterator(), false);
63

    
64
		touch(targetStore);
65
	}
66

    
67
	private Pair<String, String> createDedupEntry(final DBObject obj) {
68

    
69
		try {
70
			final Document doc = (new SAXReader()).read(new StringReader(obj.get("body").toString()));
71
			final String id = (obj.get("id").toString());
72
			final String title = doc.valueOf("//*[local-name() = 'title' and position() = 1]/text()");
73
			final String type = doc.valueOf("//*[local-name() = 'resourceType']/text()");
74

    
75
			if (StringUtils.isNotBlank(id) && StringUtils.isNotBlank(title)) { return ImmutablePair.of(id, clean(title) + "@@@" + clean(type)); }
76
		} catch (final DocumentException e) {
77
			log.warn("Problem parsing a mdstore record");
78
		}
79
		return null;
80
	}
81

    
82
	private String clean(final String s) {
83
		return s == null ? "" : s.replaceAll("\\W", "").toLowerCase();
84
	}
85

    
86
	private String pack(final Set<String> ids, final MongoMDStore store) {
87

    
88
		final String newId = "dedup_______::" + Hashing.md5(Joiner.on(",").join(ids));
89

    
90
		final Document doc = DocumentHelper.createDocument();
91
		final Element root = doc.addElement(new QName("record", OAI_NS));
92

    
93
		root.addElement(new QName("header", OAI_NS)).addElement(new QName("objIdentifier", DRI_NS)).setText(newId);
94
		final Element pkg = root.addElement("package");
95

    
96
		ids.forEach(id -> {
97
			try {
98
				final Document innerDoc = (new SAXReader()).read(new StringReader(store.getRecord(id)));
99
				final Element record = pkg.addElement("record");
100
				record.addAttribute("provenance", id.startsWith("people") ? "people" : id.startsWith("puma") ? "puma" : "unknown");
101
				record.addAttribute("trust", id.startsWith("people") ? "0.7" : id.startsWith("puma") ? "0.8" : "0.6");
102
				record.add(innerDoc.selectSingleNode("//*[local-name() = 'metadata']/*[local-name() = 'resource']").detach());
103
			} catch (final Exception e) {
104
				log.error("Error generating package: " + ids, e);
105
			}
106
		});
107

    
108
		return doc.asXML();
109

    
110
	}
111

    
112
}
(2-2/5)