Project

General

Profile

« Previous | Next » 

Revision 51722

Reverted filesystem plugin to the first implementation that does not use FileDirectoryStream: files were left open causing Too many open files errors

View differences:

FilesystemIterable.java
2 2

  
3 3
import java.io.File;
4 4
import java.io.FileInputStream;
5
import java.io.IOException;
5 6
import java.net.MalformedURLException;
6 7
import java.net.URL;
7 8
import java.util.Iterator;
8
import java.util.stream.Stream;
9 9

  
10
import com.google.common.collect.Iterators;
10 11
import eu.dnetlib.data.collector.plugins.oai.engine.XmlCleaner;
11
import eu.dnetlib.enabling.tools.DnetStreamSupport;
12 12
import eu.dnetlib.rmi.data.CollectorServiceException;
13 13
import eu.dnetlib.rmi.data.InterfaceDescriptor;
14 14
import org.apache.commons.io.IOUtils;
......
22 22
 */
23 23
public class FilesystemIterable implements Iterable<String> {
24 24

  
25
	/**
26
	 * The Constant log.
27
	 */
25
	/** The Constant log. */
28 26
	private static final Log log = LogFactory.getLog(FilesystemIterable.class);
29 27

  
30
	/**
31
	 * The base dir.
32
	 */
28
	/** The base dir. */
33 29
	private File baseDir;
34 30

  
35
	/**
36
	 * The extensions.
37
	 */
31
	/** The extensions. */
38 32
	private String extension;
39 33

  
40 34
	/**
41 35
	 * Instantiates a new filesystem iterable.
42 36
	 *
43
	 * @param descriptor the descriptor
44
	 * @throws CollectorServiceException the collector service exception
37
	 * @param descriptor
38
	 *            the descriptor
39
	 * @throws CollectorServiceException
40
	 *             the collector service exception
45 41
	 */
46 42
	public FilesystemIterable(final InterfaceDescriptor descriptor) throws CollectorServiceException {
47 43
		try {
48 44
			final String baseUrl = descriptor.getBaseUrl();
49 45
			URL basePath = new URL(baseUrl);
50 46
			this.baseDir = new File(basePath.getPath());
51
			if (!baseDir.exists()) { throw new CollectorServiceException(String.format("The base URL %s, does not exist", basePath.getPath())); }
47
			if (!baseDir.exists()) { throw new CollectorServiceException(String.format("The base ULR %s, does not exist", basePath.getPath())); }
52 48
			this.extension = descriptor.getParams().get("extensions");
53 49
		} catch (MalformedURLException e) {
54 50
			throw new CollectorServiceException("Filesystem collector failed! ", e);
......
58 54
	/**
59 55
	 * {@inheritDoc}
60 56
	 *
61
	 * @see Iterable#iterator()
57
	 * @see java.lang.Iterable#iterator()
62 58
	 */
63 59
	@Override
64 60
	public Iterator<String> iterator() {
65 61
		final FileSystemIterator fsi = new FileSystemIterator(baseDir.getAbsolutePath(), extension);
66
		final Stream<String> stringStream = DnetStreamSupport.generateStreamFromIterator(fsi);
67

  
68
		return stringStream.map(inputFileName -> {
69

  
70
			try (FileInputStream fileInputStream = new FileInputStream(inputFileName)) {
62
		return Iterators.transform(fsi, inputFileName -> {
63
			FileInputStream fileInputStream = null;
64
			try {
65
				fileInputStream = new FileInputStream(inputFileName);
71 66
				final String s = IOUtils.toString(fileInputStream);
72 67
				return XmlCleaner.cleanAllEntities(s.startsWith("\uFEFF") ? s.substring(1) : s);
73 68
			} catch (Exception e) {
74 69
				log.error("Unable to read " + inputFileName);
75 70
				return "";
71
			} finally {
72
				if (fileInputStream != null) {
73
					try {
74
						fileInputStream.close();
75
					} catch (IOException e) {
76
						log.error("Unable to close inputstream for  " + inputFileName);
77
					}
78
				}
76 79
			}
77
		}).iterator();
80
		});
78 81
	}
79
}
82
}

Also available in: Unified diff