Project

General

Profile

« Previous | Next » 

Revision 45602

View differences:

modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/WorkerConfiguration.java
1
package eu.dnetlib.msro;
2

  
3
import org.springframework.context.annotation.Configuration;
4

  
5
@Configuration
6
public class WorkerConfiguration {
7

  
8
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/exceptions/MSROException.java
1
package eu.dnetlib.msro.exceptions;
2

  
3
import eu.dnetlib.exceptions.DnetGenericException;
4

  
5
public class MSROException extends DnetGenericException {
6

  
7
	/**
8
	 *
9
	 */
10
	private static final long serialVersionUID = 3522182470263128085L;
11

  
12
	public MSROException(final String message, final Throwable cause) {
13
		super(message, cause);
14
	}
15

  
16
	public MSROException(final String message) {
17
		super(message);
18
	}
19

  
20
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/exceptions/CollectorServiceException.java
1
package eu.dnetlib.msro.exceptions;
2

  
3
import eu.dnetlib.exceptions.DnetGenericException;
4

  
5
public class CollectorServiceException extends DnetGenericException {
6

  
7
	/**
8
	 *
9
	 */
10
	private static final long serialVersionUID = -2286150868535911426L;
11

  
12
	public CollectorServiceException(final String message, final Throwable cause) {
13
		super(message, cause);
14
	}
15

  
16
	public CollectorServiceException(final String message) {
17
		super(message);
18
	}
19
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/exceptions/CollectorServiceRuntimeException.java
1
package eu.dnetlib.msro.exceptions;
2

  
3
import eu.dnetlib.exceptions.DnetGenericRuntimeException;
4

  
5
public class CollectorServiceRuntimeException extends DnetGenericRuntimeException {
6

  
7
	/**
8
	 *
9
	 */
10
	private static final long serialVersionUID = -2286150868535911426L;
11

  
12
	public CollectorServiceRuntimeException(final String message, final Throwable cause) {
13
		super(message, cause);
14
	}
15

  
16
	public CollectorServiceRuntimeException(final String message) {
17
		super(message);
18
	}
19
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/WorkerProperties.java
1
package eu.dnetlib.msro;
2

  
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.List;
6
import java.util.Map;
7

  
8
import javax.validation.constraints.Max;
9
import javax.validation.constraints.Min;
10
import javax.validation.constraints.NotNull;
11

  
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.springframework.boot.context.properties.ConfigurationProperties;
15
import org.springframework.stereotype.Component;
16

  
17
import com.fasterxml.jackson.databind.ObjectMapper;
18

  
19
@Component
20
@ConfigurationProperties(prefix = "msro.worker")
21
public class WorkerProperties {
22

  
23
	private static final Log log = LogFactory.getLog(WorkerProperties.class);
24

  
25
	private final ObjectMapper mapper = new ObjectMapper();
26

  
27
	@Min(1)
28
	@Max(1024)
29
	private int maxSize;
30

  
31
	@NotNull
32
	private String datasourceProtocolsJson;
33

  
34
	@NotNull
35
	private String datasourceTypologiesJson;
36

  
37
	public String getDatasourceProtocolsJson() {
38
		return datasourceProtocolsJson;
39
	}
40

  
41
	public void setDatasourceProtocolsJson(final String datasourceProtocolsJson) {
42
		this.datasourceProtocolsJson = datasourceProtocolsJson;
43
	}
44

  
45
	public String getDatasourceTypologiesJson() {
46
		return datasourceTypologiesJson;
47
	}
48

  
49
	public void setDatasourceTypologiesJson(final String datasourceTypologiesJson) {
50
		this.datasourceTypologiesJson = datasourceTypologiesJson;
51
	}
52

  
53
	public int getMaxSize() {
54
		return maxSize;
55
	}
56

  
57
	public void setMaxSize(final int maxSize) {
58
		this.maxSize = maxSize;
59
	}
60

  
61
	public List<Map<String, String>> getDatasourceTypologies() {
62
		try {
63
			return mapper.readValue(datasourceTypologiesJson, List.class);
64
		} catch (final IOException e) {
65
			log.warn("Invalid JSON property: " + datasourceTypologiesJson, e);
66
			return new ArrayList<>();
67
		}
68
	}
69

  
70
	public List<Map<String, String>> getDatasourceProtocols() {
71
		try {
72
			return mapper.readValue(datasourceProtocolsJson, List.class);
73
		} catch (final IOException e) {
74
			log.warn("Invalid JSON property: " + datasourceProtocolsJson, e);
75
			return new ArrayList<>();
76
		}
77
	}
78
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/DnetSimpleAggregationWorkerApplication.java
1
package eu.dnetlib.msro;
2

  
3
import org.springframework.boot.SpringApplication;
4
import org.springframework.boot.autoconfigure.SpringBootApplication;
5
import org.springframework.context.annotation.ComponentScan;
6
import org.springframework.scheduling.annotation.EnableScheduling;
7

  
8
@SpringBootApplication
9
@ComponentScan("eu.dnetlib")
10
@EnableScheduling
11
public class DnetSimpleAggregationWorkerApplication {
12

  
13
	public static void main(final String[] args) {
14
		SpringApplication.run(DnetSimpleAggregationWorkerApplication.class, args);
15
	}
16

  
17
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/controller/MsroWorkerController.java
1
package eu.dnetlib.msro.workers.controller;
2

  
3
import java.util.List;
4
import java.util.stream.Collectors;
5

  
6
import org.springframework.beans.BeansException;
7
import org.springframework.context.ApplicationContext;
8
import org.springframework.context.ApplicationContextAware;
9
import org.springframework.web.bind.annotation.RequestMapping;
10
import org.springframework.web.bind.annotation.RestController;
11

  
12
import eu.dnetlib.enabling.annotations.DnetService;
13
import eu.dnetlib.enabling.annotations.DnetServiceType;
14
import eu.dnetlib.msro.workflows.nodes.AbstractProcessNode;
15
import eu.dnetlib.msro.workflows.procs.ProcessNodeDetails;
16
import eu.dnetlib.services.BaseService;
17

  
18
@RestController
19
@RequestMapping("/worker")
20
@DnetService(DnetServiceType.msroWorker)
21
public class MsroWorkerController extends BaseService implements ApplicationContextAware {
22

  
23
	private ApplicationContext applicationContext;
24

  
25
	@RequestMapping("nodes")
26
	public List<ProcessNodeDetails> listNodes() {
27
		return applicationContext.getBeansOfType(AbstractProcessNode.class)
28
				.values()
29
				.stream()
30
				.map(Object::getClass)
31
				.map(ProcessNodeDetails::prepare)
32
				.collect(Collectors.toList());
33
	}
34

  
35
	@Override
36
	public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException {
37
		this.applicationContext = applicationContext;
38
	}
39
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/mongo/MongoDumpIterator.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.mongo;
2

  
3
import java.io.BufferedReader;
4
import java.io.FileReader;
5
import java.io.IOException;
6
import java.util.Iterator;
7

  
8
import com.google.gson.JsonElement;
9
import com.google.gson.JsonObject;
10
import com.google.gson.JsonParser;
11

  
12
public class MongoDumpIterator implements Iterator<String> {
13

  
14
	private final BufferedReader inputStream;
15
	private String currentLine = null;
16

  
17
	public MongoDumpIterator(final FileReader inputStream) {
18
		this.inputStream = new BufferedReader(inputStream);
19
		this.currentLine = getNextLine();
20
	}
21

  
22
	@Override
23
	public boolean hasNext() {
24
		return currentLine != null;
25

  
26
	}
27

  
28
	@Override
29
	public String next() {
30
		final String returnedString = this.currentLine;
31
		this.currentLine = getNextLine();
32
		return returnedString;
33
	}
34

  
35
	@Override
36
	public void remove() {
37
		// TODO Auto-generated method stub
38

  
39
	}
40

  
41
	private String getNextLine() {
42
		try {
43
			String input = inputStream.readLine();
44
			while (input != null) {
45
				JsonElement jElement = new JsonParser().parse(input);
46
				JsonObject jobject = jElement.getAsJsonObject();
47
				if (jobject.has("body")) { return jobject.get("body").getAsString(); }
48
				input = inputStream.readLine();
49
			}
50
			return null;
51

  
52
		} catch (IOException e) {
53
			return null;
54
		}
55
	}
56
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/mongo/MongoDumpPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.mongo;
2

  
3
import java.io.File;
4
import java.io.FileNotFoundException;
5
import java.io.FileReader;
6
import java.util.Iterator;
7
import java.util.stream.Stream;
8

  
9
import org.springframework.stereotype.Component;
10

  
11
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
12
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
13
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
14
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
15
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
16

  
17
@Component
18
@DnetCollectorPlugin("mongoDump")
19
public class MongoDumpPlugin implements CollectorPlugin {
20

  
21
	@Override
22
	public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
23
			throws CollectException {
24
		try {
25
			final String baseUrl = interfaceDescriptor.getBaseUrl();
26

  
27
			if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); }
28
			final File f = new File(baseUrl);
29
			if (f.exists() == false) { throw new CollectException("the file at url " + baseUrl + " does not exists"); }
30

  
31
			final FileReader reader = new FileReader(f);
32

  
33
			final Iterator<String> iter = new MongoDumpIterator(reader);
34

  
35
			return DnetStreamSupport.generateStreamFromIterator(iter);
36
		} catch (final FileNotFoundException e) {
37
			throw new CollectException("Error unable to open inputStream", e);
38
		}
39
	}
40

  
41
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ProtocolDescriptor.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins;
2

  
3
import java.util.ArrayList;
4
import java.util.List;
5
import javax.xml.bind.annotation.XmlRootElement;
6

  
7
import org.springframework.beans.factory.annotation.Required;
8

  
9
@XmlRootElement
10
public class ProtocolDescriptor {
11

  
12
	private String name;
13
	private List<ProtocolParameter> params = new ArrayList<ProtocolParameter>();
14

  
15
	public ProtocolDescriptor() {
16
	}
17

  
18
	public ProtocolDescriptor(final String name, final List<ProtocolParameter> params) {
19
		this.name = name;
20
		this.params = params;
21
	}
22

  
23
	public String getName() {
24
		return name;
25
	}
26

  
27
	@Required
28
	public void setName(final String name) {
29
		this.name = name;
30
	}
31

  
32
	public List<ProtocolParameter> getParams() {
33
		return params;
34
	}
35

  
36
	public void setParams(final List<ProtocolParameter> params) {
37
		this.params = params;
38
	}
39
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/zip/ZipCollectorPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.zip;
2

  
3
import java.io.File;
4
import java.net.MalformedURLException;
5
import java.net.URL;
6
import java.util.Iterator;
7
import java.util.stream.Stream;
8

  
9
import org.springframework.stereotype.Component;
10

  
11
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
12
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
13
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
14
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
15
import eu.dnetlib.msro.workers.aggregation.collect.plugins.oai.engine.XmlCleaner;
16
// import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
17
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
18

  
19
/**
20
 * Collector pluging for collecting a zipped folder of records
21
 *
22
 * @author Andrea
23
 */
24
@Component
25
@DnetCollectorPlugin("zip")
26
public class ZipCollectorPlugin implements CollectorPlugin {
27

  
28
	@Override
29
	public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
30
			throws CollectException {
31

  
32
		final String baseUrl = interfaceDescriptor.getBaseUrl();
33
		if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); }
34

  
35
		try {
36
			final String zipPath = interfaceDescriptor.getBaseUrl();
37
			final URL zipUrl = new URL(zipPath);
38
			final File zipFile = new File(zipUrl.getPath());
39
			if (!zipFile.exists()) { throw new CollectException(String.format("The base ULR %s, does not exist", zipFile.getPath())); }
40

  
41
			final Iterator<String> zipIterator = new ZipIterator(zipFile.getAbsolutePath());
42

  
43
			return DnetStreamSupport.generateStreamFromIterator(zipIterator)
44
					.map(s -> s.startsWith("\uFEFF") ? s.substring(1) : s)
45
					.map(XmlCleaner::cleanAllEntities);
46
		} catch (final MalformedURLException e) {
47
			throw new CollectException("Zip collector failed! ", e);
48
		}
49

  
50
	}
51

  
52
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/zip/ZipIterator.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.zip;
2

  
3
import java.io.File;
4
import java.io.IOException;
5
import java.io.InputStream;
6
import java.util.Enumeration;
7
import java.util.Iterator;
8
import java.util.zip.ZipEntry;
9
import java.util.zip.ZipFile;
10

  
11
import org.apache.commons.io.IOUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14

  
15
public class ZipIterator implements Iterator<String> {
16

  
17
	/**
18
	 * The Constant log.
19
	 */
20
	private static final Log log = LogFactory.getLog(ZipIterator.class);
21

  
22
	ZipFile zipFile;
23
	Enumeration<? extends ZipEntry> entries;
24
	private String current;
25

  
26
	public ZipIterator(final String zipPath) {
27
		try {
28
			this.zipFile = new ZipFile(zipPath);
29
			this.entries = zipFile.entries();
30
			this.current = findNext();
31
		} catch (IOException e) {
32
			log.error("Problems opening the .zip file " + zipPath, e);
33
		}
34
	}
35

  
36
	public ZipIterator(final File file) {
37
		try {
38
			this.zipFile = new ZipFile(file);
39
			this.entries = zipFile.entries();
40
			this.current = findNext();
41
		} catch (IOException e) {
42
			log.error("Problems opening the .zip file " + zipFile.getName(), e);
43
		}
44
	}
45

  
46
	@Override
47
	public boolean hasNext() {
48
		return current != null;
49
	}
50

  
51
	@Override
52
	public String next() {
53
		String ret = new String(current);
54
		current = findNext();
55
		return ret;
56
	}
57

  
58
	@Override
59
	public void remove() {
60
	}
61

  
62
	private synchronized String findNext() {
63
		ZipEntry entry = null;
64
		while (entries.hasMoreElements() && (entry = entries.nextElement()).isDirectory()) {
65
			log.debug("Skipping Zip entry " + entry.getName());
66
		}
67

  
68
		if (entry == null) {
69
			return null;
70
		} else {
71
			log.debug("Extracting " + entry.getName());
72
			try {
73
				InputStream stream = zipFile.getInputStream(entry);
74
				return IOUtils.toString(stream);
75
			} catch (IOException e) {
76
				log.error("Problems extracting entry " + entry.getName(), e);
77
				return null;
78
			}
79
		}
80

  
81
	}
82

  
83
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/targz/TarGzIterator.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.targz;
2

  
3
import java.io.*;
4
import java.util.Iterator;
5
import java.util.zip.GZIPInputStream;
6

  
7
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
8
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

  
12
public class TarGzIterator implements Iterator<String> {
13

  
14
	/**
15
	 * The Constant log.
16
	 */
17
	private static final Log log = LogFactory.getLog(TarGzIterator.class);
18

  
19
	private TarArchiveInputStream tarInputStream;
20
	private String current;
21

  
22
	public TarGzIterator(final String tarGzPath) {
23
		try {
24
			this.tarInputStream = new TarArchiveInputStream(new BufferedInputStream(new GZIPInputStream(new FileInputStream(tarGzPath))));
25
			this.current = findNext();
26
		} catch (FileNotFoundException e) {
27
			log.error("Tar.gz file not found: " + tarGzPath, e);
28
		} catch (IOException e) {
29
			log.error("Problem opening tar.gz file " + tarGzPath, e);
30
		}
31
	}
32

  
33
	public TarGzIterator(final File tarGzFile) {
34
		try {
35
			this.tarInputStream = new TarArchiveInputStream(new BufferedInputStream(new GZIPInputStream(new FileInputStream(tarGzFile))));
36
			this.current = findNext();
37
		} catch (FileNotFoundException e) {
38
			log.error("Tar.gz file not found: " + tarGzFile.getAbsolutePath(), e);
39
		} catch (IOException e) {
40
			log.error("Problem opening tar.gz file " + tarGzFile.getAbsolutePath(), e);
41
		}
42
	}
43

  
44
	@Override
45
	public boolean hasNext() {
46
		return current != null;
47
	}
48

  
49
	@Override
50
	public String next() {
51
		String ret = new String(current);
52
		current = findNext();
53
		return ret;
54
	}
55

  
56
	@Override
57
	public void remove() {
58
	}
59

  
60
	private synchronized String findNext() {
61
		TarArchiveEntry entry = null;
62
		try {
63
			while (null != (entry = tarInputStream.getNextTarEntry()) && !entry.isFile()) {
64
				log.debug("Skipping TAR entry " + entry.getName());
65
			}
66
		} catch (IOException e) {
67
			log.error("Error during tar.gz extraction", e);
68
		}
69

  
70
		if (entry == null) {
71
			return null;
72
		} else {
73
			log.debug("Extracting " + entry.getName());
74
			byte[] content = new byte[(int) entry.getSize()];
75
			try {
76
				tarInputStream.read(content, 0, content.length);
77
				return new String(content);
78
			} catch (IOException e) {
79
				log.error("Impossible to extract file " + entry.getName(), e);
80
				return null;
81
			}
82

  
83
		}
84
	}
85
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/targz/TarGzCollectorPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.targz;
2

  
3
import java.io.File;
4
import java.net.MalformedURLException;
5
import java.net.URL;
6
import java.util.stream.Stream;
7

  
8
import org.springframework.stereotype.Component;
9

  
10
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
11
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
12
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
13
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
14
import eu.dnetlib.msro.workers.aggregation.collect.plugins.oai.engine.XmlCleaner;
15
// import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
16
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
17

  
18
/**
19
 * Collector pluging for collecting a .tar.gz folder of records
20
 *
21
 * @author andrea
22
 */
23

  
24
@Component
25
@DnetCollectorPlugin("targz")
26
public class TarGzCollectorPlugin implements CollectorPlugin {
27

  
28
	@Override
29
	public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
30
			throws CollectException {
31
		try {
32
			final String baseUrl = interfaceDescriptor.getBaseUrl();
33
			if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); }
34

  
35
			final String tarGzPath = interfaceDescriptor.getBaseUrl();
36
			final URL tarGzUrl = new URL(tarGzPath);
37
			final File tarGzFile = new File(tarGzUrl.getPath());
38

  
39
			if (!tarGzFile.exists()) { throw new CollectException(String.format("The base ULR %s, does not exist", tarGzFile.getPath())); }
40

  
41
			final TarGzIterator tgzIterator = new TarGzIterator(tarGzFile.getAbsolutePath());
42

  
43
			return DnetStreamSupport.generateStreamFromIterator(tgzIterator)
44
					.map(s -> s.startsWith("\uFEFF") ? s.substring(1) : s)
45
					.map(XmlCleaner::cleanAllEntities);
46
		} catch (final MalformedURLException e) {
47
			throw new CollectException("TarGz collector failed! ", e);
48
		}
49
	}
50

  
51
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ProtocolParameter.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins;
2

  
3
import org.springframework.beans.factory.annotation.Required;
4

  
5
import eu.dnetlib.msro.workers.aggregation.collect.functions.NullFunction;
6
import eu.dnetlib.msro.workers.aggregation.collect.functions.ParamValuesFunction;
7

  
8
public class ProtocolParameter {
9

  
10
	private String name;
11
	private boolean optional = false;
12
	private ProtocolParameterType type = ProtocolParameterType.TEXT;
13
	private String regex = null;
14
	private Class<? extends ParamValuesFunction> populateFunction = NullFunction.class;
15
	private boolean functionPopulated = false;
16

  
17
	public ProtocolParameter() {}
18

  
19
	public ProtocolParameter(final String name, final boolean optional, final ProtocolParameterType type, final String regex) {
20
		this(name, optional, type, regex, null);
21
	}
22

  
23
	public ProtocolParameter(final String name, final boolean optional, final ProtocolParameterType type, final String regex,
24
			final Class<? extends ParamValuesFunction> populateFunction) {
25
		this.name = name;
26
		this.optional = optional;
27
		this.type = type;
28
		this.regex = regex;
29
		this.populateFunction = populateFunction;
30
		functionPopulated = this.populateFunction != null;
31
	}
32

  
33
	public String getName() {
34
		return name;
35
	}
36

  
37
	@Required
38
	public void setName(final String name) {
39
		this.name = name;
40
	}
41

  
42
	public boolean isOptional() {
43
		return optional;
44
	}
45

  
46
	public void setOptional(final boolean optional) {
47
		this.optional = optional;
48
	}
49

  
50
	public ProtocolParameterType getType() {
51
		return type;
52
	}
53

  
54
	public void setType(final ProtocolParameterType type) {
55
		this.type = type;
56
	}
57

  
58
	public String getRegex() {
59
		return regex;
60
	}
61

  
62
	public void setRegex(final String regex) {
63
		this.regex = regex;
64
	}
65

  
66
	public Class<? extends ParamValuesFunction> getPopulateFunction() {
67
		return populateFunction;
68
	}
69

  
70
	public void setPopulateFunction(final Class<? extends ParamValuesFunction> populateFunction) {
71
		this.populateFunction = populateFunction;
72
		functionPopulated = this.populateFunction != null;
73
	}
74

  
75
	public boolean isFunctionPopulated() {
76
		return functionPopulated;
77
	}
78

  
79
	public void setFunctionPopulated(final boolean functionPopulated) {
80
		this.functionPopulated = functionPopulated;
81
	}
82

  
83
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/filesystem/FilesystemCollectorPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.filesystem;
2

  
3
import java.io.File;
4
import java.io.FileInputStream;
5
import java.net.MalformedURLException;
6
import java.net.URL;
7
import java.util.stream.Stream;
8

  
9
import org.apache.commons.io.IOUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12
import org.springframework.stereotype.Component;
13

  
14
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
15
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
16
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
17
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam;
18
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
19
import eu.dnetlib.msro.workers.aggregation.collect.plugins.ProtocolParameterType;
20
import eu.dnetlib.msro.workers.aggregation.collect.plugins.oai.engine.XmlCleaner;
21
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
22

  
23
/**
24
 * @author andrea
25
 */
26
@Component
27
@DnetCollectorPlugin(value = "filesystem", parameters = {
28
		@DnetCollectorParam(value = "extensions", type = ProtocolParameterType.LIST)
29
})
30
public class FilesystemCollectorPlugin implements CollectorPlugin {
31

  
32
	private static final Log log = LogFactory.getLog(FilesystemCollectorPlugin.class);
33

  
34
	@Override
35
	public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
36
			throws CollectException {
37

  
38
		final String baseUrl = interfaceDescriptor.getBaseUrl();
39
		if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); }
40

  
41
		try {
42
			final URL basePath = new URL(baseUrl);
43
			final File baseDir = new File(basePath.getPath());
44
			if (!baseDir.exists()) { throw new CollectException(String.format("The base ULR %s, does not exist", basePath.getPath())); }
45
			final String extension = interfaceDescriptor.getParams().get("extensions");
46

  
47
			final FileSystemIterator fsi = new FileSystemIterator(baseDir.getAbsolutePath(), extension);
48

  
49
			return DnetStreamSupport.generateStreamFromIterator(fsi)
50
					.map(inputFileName -> {
51
						try (FileInputStream fileInputStream = new FileInputStream(inputFileName)) {
52
							final String s = IOUtils.toString(fileInputStream);
53
							return XmlCleaner.cleanAllEntities(s.startsWith("\uFEFF") ? s.substring(1) : s);
54
						} catch (final Exception e) {
55
							log.error("Unable to read " + inputFileName);
56
							return "";
57
						}
58
					});
59
		} catch (final MalformedURLException e) {
60
			throw new CollectException("Filesystem collector failed! ", e);
61
		}
62

  
63
	}
64

  
65
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/filesystem/FileSystemIterator.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.filesystem;
2

  
3
import java.io.IOException;
4
import java.nio.file.Files;
5
import java.nio.file.Path;
6
import java.nio.file.Paths;
7
import java.util.Iterator;
8
import java.util.Set;
9

  
10
import com.google.common.collect.Iterators;
11
import com.google.common.collect.Sets;
12
import org.apache.commons.io.FilenameUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15

  
16
/**
17
 * Class enabling lazy & recursive iteration of a filesystem tree. The iterator iterates over file paths.
18
 *
19
 * @author Andrea
20
 */
21
public class FileSystemIterator implements Iterator<String> {
22

  
23
	/**
24
	 * The logger
25
	 */
26
	private static final Log log = LogFactory.getLog(FileSystemIterator.class);
27

  
28
	private Set<String> extensions;
29
	private Iterator<Path> pathIterator;
30
	private String current;
31

  
32
	public FileSystemIterator(final String baseDir, final String extensions) {
33
		this.extensions = Sets.newHashSet(extensions.split(","));
34
		try {
35
			this.pathIterator = Files.newDirectoryStream(Paths.get(baseDir)).iterator();
36
			this.current = walkTillNext();
37
		} catch (IOException e) {
38
			log.error("Cannot initialize File System Iterator. Is this path correct? " + baseDir);
39
			throw new RuntimeException("Filesystem collection error.", e);
40
		}
41
	}
42

  
43
	@Override
44
	public boolean hasNext() {
45
		return current != null;
46
	}
47

  
48
	@Override
49
	public synchronized String next() {
50
		String pivot = new String(current);
51
		current = walkTillNext();
52
		log.debug("Returning: " + pivot);
53
		return pivot;
54
	}
55

  
56
	@Override
57
	public void remove() {
58
	}
59

  
60
	/**
61
	 * Walk the filesystem recursively until it finds a candidate. Strategies: a) For any directory found during the walk, an iterator is
62
	 * built and concat to the main one; b) Any file is checked against admitted extensions
63
	 *
64
	 * @return the next element to be returned by next call of this.next()
65
	 */
66
	private synchronized String walkTillNext() {
67
		while (pathIterator.hasNext()) {
68
			Path nextFilePath = pathIterator.next();
69
			if (Files.isDirectory(nextFilePath)) {
70
				// concat
71
				try {
72
					pathIterator = Iterators.concat(pathIterator, Files.newDirectoryStream(nextFilePath).iterator());
73
					log.debug("Adding folder iterator: " + nextFilePath.toString());
74
				} catch (IOException e) {
75
					log.error("Cannot create folder iterator! Is this path correct? " + nextFilePath.toString());
76
					return null;
77
				}
78
			} else {
79
				if (extensions.contains(FilenameUtils.getExtension(nextFilePath.toString()))) {
80
					log.debug("Returning: " + nextFilePath.toString());
81
					return nextFilePath.toString();
82
				}
83
			}
84
		}
85
		return null;
86
	}
87
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ProtocolParameterType.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins;
2

  
3
/**
 * Types that a protocol parameter can have.
 */
public enum ProtocolParameterType {
	TEXT,
	NUMBER,
	LIST,
	BOOLEAN
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/CollectorPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins;
2

  
3
import java.util.Arrays;
4
import java.util.List;
5
import java.util.stream.Collectors;
6
import java.util.stream.Stream;
7

  
8
import org.apache.commons.lang3.StringUtils;
9

  
10
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
11
import eu.dnetlib.msro.workers.aggregation.collect.functions.NullFunction;
12
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
13

  
14
public interface CollectorPlugin {
15

  
16
	Stream<String> collect(InterfaceDescriptor interfaceDescriptor, String fromDate, String untilDate) throws CollectException;
17

  
18
	default String getProtocol() {
19
		return getClass().getAnnotation(DnetCollectorPlugin.class).value();
20
	}
21

  
22
	default List<String> listNameParameters() {
23
		return Arrays.stream(getClass().getAnnotation(DnetCollectorPlugin.class).parameters())
24
				.map(DnetCollectorParam::value)
25
				.collect(Collectors.toList());
26
	}
27

  
28
	default ProtocolDescriptor getProtocolDescriptor() {
29
		final DnetCollectorPlugin ann = getClass().getAnnotation(DnetCollectorPlugin.class);
30

  
31
		final List<ProtocolParameter> params = Arrays.stream(ann.parameters())
32
				.map(ap -> {
33
					final ProtocolParameter p = new ProtocolParameter();
34
					p.setName(ap.value());
35
					p.setType(ap.type());
36
					p.setOptional(ap.optional());
37
					p.setPopulateFunction(ap.populateFunction());
38
					p.setFunctionPopulated(ap.populateFunction() != NullFunction.class);
39
					if (StringUtils.isNotBlank(ap.regex())) {
40
						p.setRegex(ap.regex());
41
					}
42
					return p;
43
				})
44
				.collect(Collectors.toList());
45

  
46
		return new ProtocolDescriptor(ann.value(), params);
47
	}
48
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/filesfrommetadata/FilesFromMetadataCollectorPlugin.java
1
/**
2
 *
3
 */
4
package eu.dnetlib.msro.workers.aggregation.collect.plugins.filesfrommetadata;
5

  
6
import java.util.stream.Stream;
7

  
8
import org.springframework.stereotype.Component;
9

  
10
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
11
import eu.dnetlib.msro.workers.aggregation.collect.functions.PopulateFileDownloadBasePath;
12
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
13
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam;
14
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
15
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
16

  
17
/**
18
 * @author sandro
19
 */
20
@Component
21
@DnetCollectorPlugin(value = "files_from_metadata", parameters = {
22
		@DnetCollectorParam(value = "basePath", populateFunction = PopulateFileDownloadBasePath.class)
23
})
24
public class FilesFromMetadataCollectorPlugin implements CollectorPlugin {
25

  
26
	/**
27
	 * {@inheritDoc}
28
	 */
29
	@Override
30
	public Stream<String> collect(final InterfaceDescriptor arg0, final String arg1, final String arg2) throws CollectException {
31
		// TODO Auto-generated method stub
32
		return null;
33
	}
34

  
35
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ftp/FtpCollectorPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.ftp;
2

  
3
import java.util.Iterator;
4
import java.util.Set;
5
import java.util.stream.Stream;
6

  
7
import org.springframework.stereotype.Component;
8

  
9
import com.google.common.base.Splitter;
10
import com.google.common.collect.Sets;
11

  
12
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
13
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
14
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
15
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam;
16
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
17
import eu.dnetlib.msro.workers.aggregation.collect.plugins.ProtocolParameterType;
18
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
19

  
20
/**
21
 * @author Author: Andrea Mannocci
22
 */
23
@Component
24
@DnetCollectorPlugin(value = "ftp", parameters = {
25
		@DnetCollectorParam("username"),
26
		@DnetCollectorParam("password"),
27
		@DnetCollectorParam(value = "recursive", type = ProtocolParameterType.BOOLEAN),
28
		@DnetCollectorParam(value = "extensions", type = ProtocolParameterType.LIST)
29
})
30
public class FtpCollectorPlugin implements CollectorPlugin {
31

  
32
	@Override
33
	public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
34
			throws CollectException {
35

  
36
		final String baseUrl = interfaceDescriptor.getBaseUrl();
37
		final String username = interfaceDescriptor.getParams().get("username");
38
		final String password = interfaceDescriptor.getParams().get("password");
39
		final String recursive = interfaceDescriptor.getParams().get("recursive");
40
		final String extensions = interfaceDescriptor.getParams().get("extensions");
41

  
42
		if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); }
43
		if ((username == null) || username.isEmpty()) { throw new CollectException("Param 'username' is null or empty"); }
44
		if ((password == null) || password.isEmpty()) { throw new CollectException("Param 'password' is null or empty"); }
45
		if ((recursive == null) || recursive.isEmpty()) { throw new CollectException("Param 'recursive' is null or empty"); }
46
		if ((extensions == null) || extensions.isEmpty()) { throw new CollectException("Param 'extensions' is null or empty"); }
47

  
48
		final boolean isRecursive = "true".equals(recursive);
49

  
50
		final Set<String> extensionsSet = parseSet(extensions);
51

  
52
		final Iterator<String> iter = new FtpIterator(baseUrl, username, password, isRecursive, extensionsSet);
53

  
54
		return DnetStreamSupport.generateStreamFromIterator(iter);
55
	}
56

  
57
	private Set<String> parseSet(final String extensions) {
58
		return Sets.newHashSet(Splitter.on(",").omitEmptyStrings().trimResults().split(extensions));
59
	}
60

  
61
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ftp/FtpIterator.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.ftp;
2

  
3
import java.io.IOException;
4
import java.io.OutputStream;
5
import java.net.MalformedURLException;
6
import java.net.URL;
7
import java.util.Iterator;
8
import java.util.LinkedList;
9
import java.util.Queue;
10
import java.util.Set;
11

  
12
import org.apache.commons.io.output.ByteArrayOutputStream;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.apache.commons.net.ftp.FTPClient;
16
import org.apache.commons.net.ftp.FTPFile;
17
import org.apache.commons.net.ftp.FTPReply;
18

  
19
import eu.dnetlib.msro.exceptions.CollectorServiceRuntimeException;
20

  
21
/**
22
 * @author Author: Andrea Mannocci
23
 */
24
public class FtpIterator implements Iterator<String> {
25

  
26
	private static final Log log = LogFactory.getLog(FtpIterator.class);
27

  
28
	private static final int MAX_RETRIES = 5;
29
	private static final int DEFAULT_TIMEOUT = 30000;
30
	private static final long BACKOFF_MILLIS = 10000;
31

  
32
	private FTPClient ftpClient;
33
	private String ftpServerAddress;
34
	private String remoteFtpBasePath;
35
	private String username;
36
	private String password;
37
	private boolean isRecursive;
38
	private Set<String> extensionsSet;
39

  
40
	private Queue<String> queue;
41

  
42
	public FtpIterator(final String baseUrl, final String username, final String password, final boolean isRecursive,
43
			final Set<String> extensionsSet) {
44
		this.username = username;
45
		this.password = password;
46
		this.isRecursive = isRecursive;
47
		this.extensionsSet = extensionsSet;
48
		try {
49
			final URL server = new URL(baseUrl);
50
			ftpServerAddress = server.getHost();
51
			remoteFtpBasePath = server.getPath();
52
		} catch (final MalformedURLException e1) {
53
			throw new CollectorServiceRuntimeException("Malformed URL exception " + baseUrl);
54
		}
55

  
56
		connectToFtpServer();
57
		initializeQueue();
58
	}
59

  
60
	private void connectToFtpServer() {
61
		ftpClient = new FTPClient();
62
		ftpClient.setDefaultTimeout(DEFAULT_TIMEOUT);
63
		ftpClient.setDataTimeout(DEFAULT_TIMEOUT);
64
		ftpClient.setConnectTimeout(DEFAULT_TIMEOUT);
65
		try {
66
			ftpClient.connect(ftpServerAddress);
67

  
68
			// try to login
69
			if (!ftpClient.login(username, password)) {
70
				ftpClient.logout();
71
				throw new CollectorServiceRuntimeException("Unable to login to FTP server " + ftpServerAddress);
72
			}
73
			final int reply = ftpClient.getReplyCode();
74
			if (!FTPReply.isPositiveCompletion(reply)) {
75
				ftpClient.disconnect();
76
				throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
77
			}
78

  
79
			ftpClient.enterLocalPassiveMode();
80
			log.info("Connected to FTP server " + ftpServerAddress);
81
			log.info(String.format("FTP collecting from %s with recursion = %s", remoteFtpBasePath, isRecursive));
82
		} catch (final IOException e) {
83
			throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
84
		}
85
	}
86

  
87
	private void disconnectFromFtpServer() {
88
		try {
89
			if (ftpClient.isConnected()) {
90
				ftpClient.logout();
91
				ftpClient.disconnect();
92
			}
93
		} catch (final IOException e) {
94
			log.error("Failed to logout & disconnect from the FTP server", e);
95
		}
96
	}
97

  
98
	private void initializeQueue() {
99
		queue = new LinkedList<String>();
100
		listDirectoryRecursive(remoteFtpBasePath, "");
101
	}
102

  
103
	private void listDirectoryRecursive(final String parentDir, final String currentDir) {
104
		String dirToList = parentDir;
105
		if (!currentDir.equals("")) {
106
			dirToList += "/" + currentDir;
107
		}
108
		FTPFile[] subFiles;
109
		try {
110
			subFiles = ftpClient.listFiles(dirToList);
111
			if ((subFiles != null) && (subFiles.length > 0)) {
112
				for (final FTPFile aFile : subFiles) {
113
					final String currentFileName = aFile.getName();
114
					if (currentFileName.equals(".") || currentFileName.equals("..")) {
115
						// skip parent directory and directory itself
116
						continue;
117
					}
118
					if (aFile.isDirectory()) {
119
						if (isRecursive) {
120
							listDirectoryRecursive(dirToList, currentFileName);
121
						}
122
					} else {
123
						// test the file for extensions compliance and, just in case, add it to the list.
124
						for (final String ext : extensionsSet) {
125
							if (currentFileName.endsWith(ext)) {
126
								queue.add(dirToList + "/" + currentFileName);
127
							}
128
						}
129
					}
130
				}
131
			}
132
		} catch (final IOException e) {
133
			throw new CollectorServiceRuntimeException("Unable to list FTP remote folder", e);
134
		}
135
	}
136

  
137
	@Override
138
	public boolean hasNext() {
139
		if (queue.isEmpty()) {
140
			disconnectFromFtpServer();
141
			return false;
142
		} else {
143
			return true;
144
		}
145
	}
146

  
147
	@Override
148
	public String next() {
149
		final String nextRemotePath = queue.remove();
150
		int nRepeat = 0;
151
		while (nRepeat < MAX_RETRIES) {
152
			try {
153
				final OutputStream baos = new ByteArrayOutputStream();
154
				if (!ftpClient.isConnected()) {
155
					connectToFtpServer();
156
				}
157
				ftpClient.retrieveFile(nextRemotePath, baos);
158

  
159
				log.debug(String.format("Collected file from FTP: %s%s", ftpServerAddress, nextRemotePath));
160
				return baos.toString();
161
			} catch (final IOException e) {
162
				nRepeat++;
163
				log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), ftpServerAddress, nextRemotePath,
164
						nRepeat));
165
				disconnectFromFtpServer();
166
				try {
167
					Thread.sleep(BACKOFF_MILLIS);
168
				} catch (final InterruptedException e1) {
169
					log.error(e1);
170
				}
171
			}
172
		}
173
		throw new CollectorServiceRuntimeException(
174
				String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", nextRemotePath, nRepeat));
175
	}
176

  
177
	@Override
178
	public void remove() {
179
		throw new UnsupportedOperationException();
180
	}
181
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/csv/FileCSVCollectorPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.csv;
2

  
3
import java.io.BufferedReader;
4
import java.io.File;
5
import java.io.FileReader;
6
import java.io.IOException;
7
import java.net.MalformedURLException;
8
import java.net.URL;
9
import java.util.Iterator;
10
import java.util.Spliterator;
11
import java.util.Spliterators;
12
import java.util.stream.Stream;
13
import java.util.stream.StreamSupport;
14

  
15
import org.apache.commons.lang3.StringEscapeUtils;
16
import org.apache.commons.logging.Log;
17
import org.apache.commons.logging.LogFactory;
18
import org.dom4j.Document;
19
import org.dom4j.DocumentHelper;
20
import org.dom4j.Element;
21
import org.springframework.stereotype.Component;
22

  
23
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
24
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
25
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam;
26
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
27
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
28

  
29
@Component
30
@DnetCollectorPlugin(value = "fileCSV", parameters = {
31
		@DnetCollectorParam("header"),
32
		@DnetCollectorParam("separator"),
33
		@DnetCollectorParam("identifier"),
34
		@DnetCollectorParam("quote")
35
})
36
public class FileCSVCollectorPlugin implements CollectorPlugin {
37

  
38
	private static final Log log = LogFactory.getLog(FileCSVCollectorPlugin.class);
39
	private String[] headers = null;
40
	private int identifierNumber;
41

  
42
	@Override
43
	public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
44
			throws CollectException {
45
		final String header = interfaceDescriptor.getParams().get("header");
46
		final String separator = StringEscapeUtils.unescapeJava(interfaceDescriptor.getParams().get("separator"));
47

  
48
		identifierNumber = Integer.parseInt(interfaceDescriptor.getParams().get("identifier"));
49
		URL u = null;
50
		try {
51
			u = new URL(interfaceDescriptor.getBaseUrl());
52
		} catch (final MalformedURLException e1) {
53
			throw new CollectException("Invalid URL: " + interfaceDescriptor.getBaseUrl(), e1);
54
		}
55
		final String baseUrl = u.getPath();
56

  
57
		log.info("base URL = " + baseUrl);
58

  
59
		try {
60
			final BufferedReader br = new BufferedReader(new FileReader(new File(baseUrl)));
61
			if ((header != null) && "true".equals(header.toLowerCase())) {
62
				headers = br.readLine().split(separator);
63
			}
64

  
65
			final Iterator<String> iter = new FileCSVIterator(br, separator);
66

  
67
			return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false);
68

  
69
		} catch (final Exception e) {
70
			throw new CollectException("Error iterating CSV", e);
71
		}
72
	}
73

  
74
	class FileCSVIterator implements Iterator<String> {
75

  
76
		private String next;
77

  
78
		private BufferedReader reader;
79

  
80
		private String separator;
81

  
82
		public FileCSVIterator(final BufferedReader reader, final String separator) {
83
			this.reader = reader;
84
			this.separator = separator;
85
			next = calculateNext();
86
		}
87

  
88
		@Override
89
		public boolean hasNext() {
90
			return next != null;
91
		}
92

  
93
		@Override
94
		public String next() {
95
			final String s = next;
96
			next = calculateNext();
97
			return s;
98
		}
99

  
100
		private String calculateNext() {
101
			try {
102
				final Document document = DocumentHelper.createDocument();
103
				final Element root = document.addElement("csvRecord");
104

  
105
				String newLine = reader.readLine();
106

  
107
				// FOR SOME FILES IT RETURN NULL ALSO IF THE FILE IS NOT READY DONE
108
				if (newLine == null) {
109
					newLine = reader.readLine();
110
				}
111
				if (newLine == null) {
112
					log.info("there is no line, closing RESULT SET");
113

  
114
					reader.close();
115
					return null;
116
				}
117
				final String[] currentRow = newLine.split(separator);
118

  
119
				if (currentRow != null) {
120

  
121
					for (int i = 0; i < currentRow.length; i++) {
122
						final String hAttribute = (headers != null) && (i < headers.length) ? headers[i] : "column" + i;
123

  
124
						final Element row = root.addElement("column");
125
						if (i == identifierNumber) {
126
							row.addAttribute("isID", "true");
127
						}
128
						row.addAttribute("name", hAttribute).addText(currentRow[i]);
129
					}
130
					return document.asXML();
131
				}
132
			} catch (final IOException e) {
133
				log.error("Error calculating next csv element", e);
134
			}
135
			return null;
136
		}
137

  
138
		@Override
139
		public void remove() {
140
			throw new UnsupportedOperationException();
141
		}
142

  
143
	}
144

  
145
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/csv/HttpCSVCollectorPlugin.java
1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.csv;
2

  
3
import java.io.InputStreamReader;
4
import java.io.Reader;
5
import java.net.URL;
6
import java.util.Iterator;
7
import java.util.Set;
8
import java.util.stream.Stream;
9

  
10
import org.apache.commons.csv.CSVFormat;
11
import org.apache.commons.csv.CSVParser;
12
import org.apache.commons.lang3.StringUtils;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.dom4j.Document;
16
import org.dom4j.DocumentHelper;
17
import org.dom4j.Element;
18
import org.springframework.stereotype.Component;
19

  
20
import com.google.common.collect.Iterators;
21

  
22
import eu.dnetlib.miscutils.streams.DnetStreamSupport;
23
import eu.dnetlib.msro.workers.aggregation.collect.CollectException;
24
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin;
25
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam;
26
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin;
27
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor;
28

  
29
/**
30
 * The Class HttpCSVCollectorPlugin.
31
 */
32
@Component
33
@DnetCollectorPlugin(value = "httpCSV", parameters = {
34
		@DnetCollectorParam("separator"),
35
		@DnetCollectorParam("identifier"),
36
		@DnetCollectorParam("quote"),
37
})
38
public class HttpCSVCollectorPlugin implements CollectorPlugin {
39

  
40
	private static final Log log = LogFactory.getLog(HttpCSVCollectorPlugin.class);
41

  
42
	/*
43
	 * (non-Javadoc)
44
	 *
45
	 * @see eu.dnetlib.msro.workers.aggregation.collect.plugin.CollectorPlugin#collect(eu.dnetlib.msro.workers.aggregation.collect.rmi.
46
	 * InterfaceDescriptor, java.lang.String, java.lang.String)
47
	 */
48
	@Override
49
	public Stream<String> collect(final InterfaceDescriptor descriptor, final String fromDate, final String untilDate) throws CollectException {
50
		return DnetStreamSupport.generateStreamFromIterator(getIterator(descriptor));
51
	}
52

  
53
	private Iterator<String> getIterator(final InterfaceDescriptor descriptor) {
54
		try {
55
			final URL url = new URL(descriptor.getBaseUrl());
56
			url.openConnection();
57

  
58
			final String separatorString = descriptor.getParams().get("separator");
59
			final String identifier = descriptor.getParams().get("identifier");
60
			final String quote = descriptor.getParams().get("quote");
61
			final char separator = separatorString.equals("\\t") || StringUtils.isBlank(separatorString) ? '\t' : separatorString.charAt(0);
62

  
63
			final CSVFormat format = StringUtils.isBlank(quote) ? CSVFormat.EXCEL.withHeader().withDelimiter(separator)
64
					: CSVFormat.EXCEL.withHeader().withDelimiter(separator).withQuote(quote.charAt(0));
... This diff was truncated because it exceeds the maximum size that can be displayed.

Also available in: Unified diff