Project

General

Profile

1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.csv;
2

    
3
import java.io.InputStreamReader;
4
import java.io.Reader;
5
import java.net.URL;
6
import java.util.Iterator;
7
import java.util.Set;
8
import java.util.stream.Stream;
9

    
10
import org.apache.commons.csv.CSVFormat;
11
import org.apache.commons.csv.CSVParser;
12
import org.apache.commons.lang3.StringUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.dom4j.Document;
16
import org.dom4j.DocumentHelper;
17
import org.dom4j.Element;
18

    
19
import com.google.common.collect.Iterators;
20

    
21
import eu.dnetlib.enabling.tools.DnetStreamSupport;
22
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
23
import eu.dnetlib.rmi.data.InterfaceDescriptor;
24
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
25

    
26
/**
27
 * The Class HttpCSVCollectorPlugin.
28
 */
29
public class HttpCSVCollectorPlugin implements CollectorPlugin {
30

    
31
	private static final Log log = LogFactory.getLog(HttpCSVCollectorPlugin.class);
32

    
33
	/*
34
	 * (non-Javadoc)
35
	 *
36
	 * @see eu.dnetlib.msro.workers.aggregation.collect.plugin.CollectorPlugin#collect(eu.dnetlib.msro.workers.aggregation.collect.rmi.
37
	 * InterfaceDescriptor, java.lang.String, java.lang.String)
38
	 */
39
	@Override
40
	public Stream<String> collect(final InterfaceDescriptor descriptor, final String fromDate, final String untilDate) throws CollectException {
41
		return DnetStreamSupport.generateStreamFromIterator(getIterator(descriptor));
42
	}
43

    
44
	private Iterator<String> getIterator(final InterfaceDescriptor descriptor) {
45
		try {
46
			final URL url = new URL(descriptor.getBaseUrl());
47
			url.openConnection();
48

    
49
			final String separatorString = descriptor.getParams().get("separator");
50
			final String identifier = descriptor.getParams().get("identifier");
51
			final String quote = descriptor.getParams().get("quote");
52
			final char separator = separatorString.equals("\\t") || StringUtils.isBlank(separatorString) ? '\t' : separatorString.charAt(0);
53

    
54
			final CSVFormat format = StringUtils.isBlank(quote) ? CSVFormat.EXCEL.withHeader().withDelimiter(separator)
55
					: CSVFormat.EXCEL.withHeader().withDelimiter(separator).withQuote(quote.charAt(0));
56

    
57
			try (final Reader reader = new InputStreamReader(url.openStream());
58
					final CSVParser parser = new CSVParser(reader, format)) {
59

    
60
				final Set<String> headers = parser.getHeaderMap().keySet();
61

    
62
				return Iterators.transform(parser.iterator(), input -> {
63
					final Document document = DocumentHelper.createDocument();
64
					final Element root = document.addElement("csvRecord");
65
					for (final String key : headers) {
66
						final Element row = root.addElement("column");
67
						row.addAttribute("name", key).addText(input.get(key));
68
						if (key.equals(identifier)) {
69
							row.addAttribute("isID", "true");
70
						}
71
					}
72

    
73
					return document.asXML();
74
				});
75
			} catch (final Exception e) {
76
				log.error("Error iterating csv lines", e);
77
				return null;
78
			}
79
		} catch (final Exception e) {
80
			log.error("Error iterating csv lines", e);
81
			return null;
82
		}
83
	}
84
}
(2-2/2)