Project

General

Profile

« Previous | Next » 

Revision 28806

Implemented a new version of the collector plugin for the File system

View differences:

modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/StringFileWalker.java
1
package eu.dnetlib.data.collector.plugins.filesystem;
2

  
3
import java.io.File;
4
import java.io.FileInputStream;
5
import java.io.IOException;
6
import java.io.StringWriter;
7
import java.util.concurrent.BlockingQueue;
8

  
9
import org.apache.commons.io.IOUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12

  
13
import com.google.common.collect.Lists;
14

  
15
public class StringFileWalker extends FileWalker<String> {
16

  
17
	private static final Log log = LogFactory.getLog(StringFileWalker.class); // NOPMD by marko on 11/24/08 5:02 PM
18

  
19
	@Override
20
	protected void doWalk() throws IOException {
21
		log.info("starting to iterate " + getType().toString() + " files under " + getSource().getAbsolutePath());
22

  
23
		walk(getSource(), getQueue());
24
		enqueue(getQueue(), FilesystemIterable.done);
25
	}
26

  
27
	@Override
28
	protected String readFile(final File file) throws IOException {
29

  
30
		FileInputStream fis = new FileInputStream(file);
31
		String fileContent = null;
32
		switch (getType()) {
33
		case TEXT:
34
			final StringWriter sw = new StringWriter();
35
			IOUtils.copy(fis, sw);
36
			sw.flush();
37
			fileContent = sw.toString();
38
			// remove BOM if present
39
			if (fileContent.startsWith("\uFEFF")) {
40
				fileContent = fileContent.substring(1);
41
			}
42
			break;
43

  
44
		default:
45
			throw new UnsupportedOperationException("FileType should be one of: " + Lists.newArrayList(FileType.values()));
46
		}
47
		fis.close();
48
		return fileContent;
49
	}
50

  
51
	public StringFileWalker(final BlockingQueue<String> queue, final FileType type, final String basePath) {
52
		super(queue, type, basePath);
53
	}
54

  
55
}
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FilesystemBlockingStream.java
1
package eu.dnetlib.data.collector.plugins.filesystem;
2

  
3
import java.util.Iterator;
4
import java.util.NoSuchElementException;
5
import java.util.concurrent.BlockingQueue;
6
import java.util.concurrent.TimeUnit;
7

  
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

  
11
/**
12
 * FilesystemBlockingStream fetches elements from a shared queue, which must be populated by a producer.
13
 * 
14
 * @author andrea
15
 * 
16
 * @param <T>
17
 *            Type returned by the next() method
18
 */
19
public class FilesystemBlockingStream<T> implements Iterator<T> {
20

  
21
	/**
22
	 * Logger.
23
	 */
24
	private static final Log log = LogFactory.getLog(FilesystemBlockingStream.class);
25

  
26
	/**
27
	 * shared queue.
28
	 */
29
	private BlockingQueue<T> queue;
30

  
31
	/**
32
	 * next element to be returned.
33
	 */
34
	private T nextReturn;
35

  
36
	/**
37
	 * flag.
38
	 */
39
	private boolean hasNext = true;
40

  
41
	/**
42
	 * Builds a BlockingStream.
43
	 * 
44
	 * @param queue
45
	 *            the shared queue.
46
	 */
47
	public FilesystemBlockingStream(final BlockingQueue<T> queue) {
48
		super();
49
		this.queue = queue;
50
	}
51

  
52
	@Override
53
	synchronized public boolean hasNext() {
54

  
55
		while (nextReturn == null) {
56

  
57
			try {
58
				nextReturn = queue.poll(2, TimeUnit.SECONDS);
59
			} catch (InterruptedException e) {
60
				log.error("queue timed out ", e);
61
				return false;
62
			}
63

  
64
			if ((FilesystemIterable.done == nextReturn)) {
65
				hasNext = false;
66
			}
67
		}
68
		return hasNext;
69
	}
70

  
71
	@Override
72
	synchronized public T next() {
73
		if (!hasNext()) throw new NoSuchElementException();
74

  
75
		T retVal = nextReturn;
76
		nextReturn = null;
77
		return retVal;
78
	}
79

  
80
	@Override
81
	public void remove() {
82
		throw new UnsupportedOperationException("Not available.");
83
	}
84

  
85
}
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FileWalker.java
1
package eu.dnetlib.data.collector.plugins.filesystem;
2

  
3
import java.io.File;
4
import java.io.IOException;
5
import java.util.Collection;
6
import java.util.concurrent.BlockingQueue;
7

  
8
import org.apache.commons.io.DirectoryWalker;
9
import org.apache.commons.io.filefilter.FileFilterUtils;
10
import org.apache.commons.io.filefilter.HiddenFileFilter;
11
import org.apache.commons.io.filefilter.IOFileFilter;
12
import org.apache.commons.io.filefilter.SuffixFileFilter;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15

  
16
/**
17
 * FileWalker runs recursively under a directory structure starting from a given path and for each file reads it's content and puts it in a
18
 * shared queue.
19
 * 
20
 * Acts as a producer.
21
 * 
22
 * @author claudio
23
 * 
24
 * @param <T>
25
 *            Type of expected content to extract from files NB. Generally strings.
26
 */
27
public abstract class FileWalker<T> extends DirectoryWalker<T> {
28

  
29
	/**
30
	 * Logger.
31
	 */
32
	private static final Log log = LogFactory.getLog(FileWalker.class);
33

  
34
	/**
35
	 * Shared queue.
36
	 */
37
	private final BlockingQueue<T> queue;
38

  
39
	/**
40
	 * Reference to the starting directory
41
	 */
42
	private final File source;
43

  
44
	/**
45
	 * specifies the file type;
46
	 */
47
	private final FileType type;
48

  
49
	public enum FileType {
50
		TEXT, PDF, DOC
51
	}
52

  
53
	public static String IGNORE_PREFIX = ".";
54

  
55
	public static String IGNORE_SUFFIX = "~";
56

  
57
	static IOFileFilter fileFilter = FileFilterUtils.notFileFilter(FileFilterUtils.or(new SuffixFileFilter("~"), HiddenFileFilter.HIDDEN));
58

  
59
	/**
60
	 * Builds a FileWalker.
61
	 * 
62
	 * @param sourcePath
63
	 *            the
64
	 * @param queue
65
	 */
66
	public FileWalker(final BlockingQueue<T> queue, final FileType type, final String basePath) {
67
		super(fileFilter, -1);
68
		this.source = new File(basePath);
69
		this.type = type;
70
		this.queue = queue;
71
	}
72

  
73
	@Override
74
	protected void handleFile(final File file, final int depth, final Collection<T> results) throws IOException {
75
		enqueue((BlockingQueue<T>) results, readFile(file));
76
	}
77

  
78
	@Override
79
	protected boolean handleDirectory(final File directory, final int depth, final Collection<T> results) throws IOException {
80
		if (directory.getName().startsWith(IGNORE_PREFIX)) return false;
81
		return super.handleDirectory(directory, depth, results);
82
	}
83

  
84
	/* Helpers */
85
	/**
86
	 * given a file, return its content as a T
87
	 * 
88
	 * @param file
89
	 *            the source
90
	 * @return the file content as an object of type T
91
	 * @throws IOException
92
	 */
93
	protected abstract T readFile(final File file) throws IOException;
94

  
95
	/**
96
	 * Wrapper method, starts the walk and when it's done adds the flag to the queue.
97
	 */
98
	protected abstract void doWalk() throws IOException;
99

  
100
	/**
101
	 * Adds the element to the queue
102
	 */
103
	protected void enqueue(final BlockingQueue<T> queue, final T element) {
104
		try {
105
			queue.put(element);
106
		} catch (InterruptedException e) {
107
			log.warn("ops... ", e);
108
		}
109
	}
110

  
111
	public BlockingQueue<T> getQueue() {
112
		return queue;
113
	}
114

  
115
	public File getSource() {
116
		return source;
117
	}
118

  
119
	public FileType getType() {
120
		return type;
121
	}
122

  
123
}
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/test/java/eu/dnetlib/data/collector/plugins/oai/OaiCollectorPluginRealTest.java
9 9
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor;
10 10

  
11 11
public class OaiCollectorPluginRealTest {
12
	
12

  
13 13
	private OaiCollectorPlugin oai;
14 14

  
15 15
	private static final String BASE_URL = "http://oai.d.efg.research-infrastructures.eu/oai.do";
......
32 32
		iface.setParams(new HashMap<String, String>());
33 33
		iface.getParams().put("format", FORMAT);
34 34
		iface.getParams().put("set", SETS);
35
		
35

  
36 36
		int count = 0;
37
		for(String s : oai.collect(iface, null, null)) {
37
		for (String s : oai.collect(iface, null, null)) {
38 38
			count++;
39 39
		}
40 40
		System.out.println("TOTAL: " + count);
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/test/java/eu/dnetlib/data/collector/fileSystem/FileSystemCollectorPluginTest.java
1
package eu.dnetlib.data.collector.fileSystem;
2

  
3
import java.util.HashMap;
4

  
5
import org.junit.Assert;
6
import org.junit.Ignore;
7
import org.junit.Test;
8

  
9
import eu.dnetlib.data.collector.plugins.filesystem.FilesystemCollectorPlugin;
10
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
11
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor;
12

  
13
public class FileSystemCollectorPluginTest {
14

  
15
	@Ignore
16
	@Test
17
	public void test() throws CollectorServiceException {
18
		InterfaceDescriptor descr = new InterfaceDescriptor();
19
		HashMap<String, String> params = new HashMap<String, String>();
20
		params.put("extensions", "suca, xml");
21
		descr.setBaseUrl("file:///home/sandro/cordaTEST");
22
		descr.setParams(params);
23

  
24
		FilesystemCollectorPlugin plugin = new FilesystemCollectorPlugin();
25
		Iterable<String> result = plugin.collect(descr, null, null);
26
		int i = 0;
27
		for (String s : result) {
28
			i++;
29
			Assert.assertNotNull(s);
30
		}
31
		System.out.println("Total " + i);
32

  
33
	}
34

  
35
}
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FilesystemCollectorPlugin.java
1 1
package eu.dnetlib.data.collector.plugins.filesystem;
2 2

  
3
import java.net.URISyntaxException;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

  
8 3
import eu.dnetlib.data.collector.plugin.CollectorPlugin;
9 4
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
10 5
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor;
......
16 11
 */
17 12
public class FilesystemCollectorPlugin implements CollectorPlugin {
18 13

  
19
	private static final Log log = LogFactory.getLog(FilesystemCollectorPlugin.class);
20

  
21 14
	@Override
22
	public Iterable<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) throws CollectorServiceException {
15
	public Iterable<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate)
16
			throws CollectorServiceException {
23 17

  
24 18
		final String baseUrl = interfaceDescriptor.getBaseUrl();
25

  
26 19
		if ((baseUrl == null) || baseUrl.isEmpty()) throw new CollectorServiceException("Param 'baseurl' is null or empty");
20
		return new FilesystemIterable(interfaceDescriptor);
27 21

  
28
		try {
29
			return new FilesystemIterable(interfaceDescriptor);
30
		} catch (URISyntaxException e) {
31
			log.error("Filesystem collector failed! ", e);
32
			return null;
33
		}
34 22
	}
35 23

  
36 24
	@Override
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FilesystemIterable.java
1 1
package eu.dnetlib.data.collector.plugins.filesystem;
2 2

  
3
import java.io.File;
4
import java.io.FileInputStream;
3 5
import java.io.IOException;
4
import java.net.URISyntaxException;
6
import java.net.MalformedURLException;
7
import java.net.URL;
5 8
import java.util.Iterator;
6
import java.util.concurrent.ArrayBlockingQueue;
7
import java.util.concurrent.BlockingQueue;
8
import java.util.concurrent.ExecutorService;
9
import java.util.concurrent.Executors;
10 9

  
10
import org.apache.commons.io.FileUtils;
11
import org.apache.commons.io.IOUtils;
11 12
import org.apache.commons.logging.Log;
12 13
import org.apache.commons.logging.LogFactory;
13 14

  
14
import eu.dnetlib.data.collector.plugins.filesystem.FileWalker.FileType;
15
import com.google.common.base.Function;
16
import com.google.common.base.Splitter;
17
import com.google.common.collect.Iterables;
18
import com.google.common.collect.Iterators;
19

  
20
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
15 21
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor;
16 22

  
17 23
/**
24
 * The Class FilesystemIterable.
18 25
 * 
19
 * @author andrea
20
 * 
26
 * @author Sandro, Michele
21 27
 */
22 28
public class FilesystemIterable implements Iterable<String> {
23 29

  
30
	/** The Constant log. */
24 31
	private static final Log log = LogFactory.getLog(FilesystemIterable.class);
25 32

  
26
	/** Queue sentinel **/
27
	public static final String done = "DONE";
33
	/** The base dir. */
34
	private File baseDir;
28 35

  
29
	private final BlockingQueue<String> queue;
36
	/** The extensions. */
37
	private String[] extensions;
30 38

  
31
	private final String basePath;
32

  
33
	private ExecutorService producer;
34

  
35
	public FilesystemIterable(final InterfaceDescriptor descriptor) throws URISyntaxException {
36

  
37
		basePath = descriptor.getBaseUrl();
38
		this.producer = Executors.newSingleThreadExecutor();
39
		this.queue = new ArrayBlockingQueue<String>(20);
39
	/**
40
	 * Instantiates a new filesystem iterable.
41
	 * 
42
	 * @param descriptor
43
	 *            the descriptor
44
	 * @throws CollectorServiceException
45
	 *             the collector service exception
46
	 */
47
	public FilesystemIterable(final InterfaceDescriptor descriptor) throws CollectorServiceException {
48
		try {
49
			final String baseUrl = descriptor.getBaseUrl();
50
			URL basePath = new URL(baseUrl);
51
			this.baseDir = new File(basePath.getPath());
52
			if (!baseDir.exists()) throw new CollectorServiceException(String.format("The base ULR %s, does not exist", basePath.getPath()));
53
			this.extensions = Iterables
54
					.toArray(Splitter.on(",").omitEmptyStrings().trimResults().split(descriptor.getParams().get("extensions")), String.class);
55
		} catch (MalformedURLException e) {
56
			throw new CollectorServiceException("Filesystem collector failed! ", e);
57
		}
40 58
	}
41 59

  
60
	/**
61
	 * {@inheritDoc}
62
	 * 
63
	 * @see java.lang.Iterable#iterator()
64
	 */
42 65
	@Override
43 66
	public Iterator<String> iterator() {
44
		doReadFromFTP();
45
		return new FilesystemBlockingStream<String>(queue);
46
	}
67
		return Iterators.transform(FileUtils.iterateFiles(baseDir, extensions, true), new Function<File, String>() {
47 68

  
48
	private void doReadFromFTP() {
49

  
50
		final FileWalker<String> walker = new StringFileWalker(queue, FileType.TEXT, basePath);
51
		producer.execute(new Runnable() {
52

  
53 69
			@Override
54
			public void run() {
70
			public String apply(final File input) {
71
				FileInputStream fileInputStream = null;
55 72
				try {
56
					walker.doWalk();
57
				} catch (IOException e) {
58
					throw new IllegalStateException(e);
73
					fileInputStream = new FileInputStream(input);
74
					String s = IOUtils.toString(fileInputStream);
75
					return s.startsWith("\uFEFF") ? s.substring(1) : s;
76
				} catch (Exception e) {
77
					log.error("Unable to read " + input.getPath());
78
					return null;
79
				} finally {
80
					if (fileInputStream != null) {
81
						try {
82
							fileInputStream.close();
83
						} catch (IOException e) {
84
							log.error("Unable to close inputstream for  " + input.getPath());
85
						}
86
					}
59 87
				}
60
				log.info("finished to iterate under " + basePath);
61 88
			}
62 89
		});
63 90
	}

Also available in: Unified diff