Revision 28806
Added by Sandro La Bruzzo almost 10 years ago
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/StringFileWalker.java | ||
---|---|---|
1 |
package eu.dnetlib.data.collector.plugins.filesystem; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.io.FileInputStream; |
|
5 |
import java.io.IOException; |
|
6 |
import java.io.StringWriter; |
|
7 |
import java.util.concurrent.BlockingQueue; |
|
8 |
|
|
9 |
import org.apache.commons.io.IOUtils; |
|
10 |
import org.apache.commons.logging.Log; |
|
11 |
import org.apache.commons.logging.LogFactory; |
|
12 |
|
|
13 |
import com.google.common.collect.Lists; |
|
14 |
|
|
15 |
public class StringFileWalker extends FileWalker<String> { |
|
16 |
|
|
17 |
private static final Log log = LogFactory.getLog(StringFileWalker.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
18 |
|
|
19 |
@Override |
|
20 |
protected void doWalk() throws IOException { |
|
21 |
log.info("starting to iterate " + getType().toString() + " files under " + getSource().getAbsolutePath()); |
|
22 |
|
|
23 |
walk(getSource(), getQueue()); |
|
24 |
enqueue(getQueue(), FilesystemIterable.done); |
|
25 |
} |
|
26 |
|
|
27 |
@Override |
|
28 |
protected String readFile(final File file) throws IOException { |
|
29 |
|
|
30 |
FileInputStream fis = new FileInputStream(file); |
|
31 |
String fileContent = null; |
|
32 |
switch (getType()) { |
|
33 |
case TEXT: |
|
34 |
final StringWriter sw = new StringWriter(); |
|
35 |
IOUtils.copy(fis, sw); |
|
36 |
sw.flush(); |
|
37 |
fileContent = sw.toString(); |
|
38 |
// remove BOM if present |
|
39 |
if (fileContent.startsWith("\uFEFF")) { |
|
40 |
fileContent = fileContent.substring(1); |
|
41 |
} |
|
42 |
break; |
|
43 |
|
|
44 |
default: |
|
45 |
throw new UnsupportedOperationException("FileType should be one of: " + Lists.newArrayList(FileType.values())); |
|
46 |
} |
|
47 |
fis.close(); |
|
48 |
return fileContent; |
|
49 |
} |
|
50 |
|
|
51 |
public StringFileWalker(final BlockingQueue<String> queue, final FileType type, final String basePath) { |
|
52 |
super(queue, type, basePath); |
|
53 |
} |
|
54 |
|
|
55 |
} |
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FilesystemBlockingStream.java | ||
---|---|---|
1 |
package eu.dnetlib.data.collector.plugins.filesystem; |
|
2 |
|
|
3 |
import java.util.Iterator; |
|
4 |
import java.util.NoSuchElementException; |
|
5 |
import java.util.concurrent.BlockingQueue; |
|
6 |
import java.util.concurrent.TimeUnit; |
|
7 |
|
|
8 |
import org.apache.commons.logging.Log; |
|
9 |
import org.apache.commons.logging.LogFactory; |
|
10 |
|
|
11 |
/** |
|
12 |
* FilesystemBlockingStream fetches elements from a shared queue, which must be populated by a producer. |
|
13 |
* |
|
14 |
* @author andrea |
|
15 |
* |
|
16 |
* @param <T> |
|
17 |
* Type returned by the next() method |
|
18 |
*/ |
|
19 |
public class FilesystemBlockingStream<T> implements Iterator<T> { |
|
20 |
|
|
21 |
/** |
|
22 |
* Logger. |
|
23 |
*/ |
|
24 |
private static final Log log = LogFactory.getLog(FilesystemBlockingStream.class); |
|
25 |
|
|
26 |
/** |
|
27 |
* shared queue. |
|
28 |
*/ |
|
29 |
private BlockingQueue<T> queue; |
|
30 |
|
|
31 |
/** |
|
32 |
* next element to be returned. |
|
33 |
*/ |
|
34 |
private T nextReturn; |
|
35 |
|
|
36 |
/** |
|
37 |
* flag. |
|
38 |
*/ |
|
39 |
private boolean hasNext = true; |
|
40 |
|
|
41 |
/** |
|
42 |
* Builds a BlockingStream. |
|
43 |
* |
|
44 |
* @param queue |
|
45 |
* the shared queue. |
|
46 |
*/ |
|
47 |
public FilesystemBlockingStream(final BlockingQueue<T> queue) { |
|
48 |
super(); |
|
49 |
this.queue = queue; |
|
50 |
} |
|
51 |
|
|
52 |
@Override |
|
53 |
synchronized public boolean hasNext() { |
|
54 |
|
|
55 |
while (nextReturn == null) { |
|
56 |
|
|
57 |
try { |
|
58 |
nextReturn = queue.poll(2, TimeUnit.SECONDS); |
|
59 |
} catch (InterruptedException e) { |
|
60 |
log.error("queue timed out ", e); |
|
61 |
return false; |
|
62 |
} |
|
63 |
|
|
64 |
if ((FilesystemIterable.done == nextReturn)) { |
|
65 |
hasNext = false; |
|
66 |
} |
|
67 |
} |
|
68 |
return hasNext; |
|
69 |
} |
|
70 |
|
|
71 |
@Override |
|
72 |
synchronized public T next() { |
|
73 |
if (!hasNext()) throw new NoSuchElementException(); |
|
74 |
|
|
75 |
T retVal = nextReturn; |
|
76 |
nextReturn = null; |
|
77 |
return retVal; |
|
78 |
} |
|
79 |
|
|
80 |
@Override |
|
81 |
public void remove() { |
|
82 |
throw new UnsupportedOperationException("Not available."); |
|
83 |
} |
|
84 |
|
|
85 |
} |
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FileWalker.java | ||
---|---|---|
1 |
package eu.dnetlib.data.collector.plugins.filesystem; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.io.IOException; |
|
5 |
import java.util.Collection; |
|
6 |
import java.util.concurrent.BlockingQueue; |
|
7 |
|
|
8 |
import org.apache.commons.io.DirectoryWalker; |
|
9 |
import org.apache.commons.io.filefilter.FileFilterUtils; |
|
10 |
import org.apache.commons.io.filefilter.HiddenFileFilter; |
|
11 |
import org.apache.commons.io.filefilter.IOFileFilter; |
|
12 |
import org.apache.commons.io.filefilter.SuffixFileFilter; |
|
13 |
import org.apache.commons.logging.Log; |
|
14 |
import org.apache.commons.logging.LogFactory; |
|
15 |
|
|
16 |
/** |
|
17 |
* FileWalker runs recursively under a directory structure starting from a given path and for each file reads it's content and puts it in a |
|
18 |
* shared queue. |
|
19 |
* |
|
20 |
* Acts as a producer. |
|
21 |
* |
|
22 |
* @author claudio |
|
23 |
* |
|
24 |
* @param <T> |
|
25 |
* Type of expected content to extract from files NB. Generally strings. |
|
26 |
*/ |
|
27 |
public abstract class FileWalker<T> extends DirectoryWalker<T> { |
|
28 |
|
|
29 |
/** |
|
30 |
* Logger. |
|
31 |
*/ |
|
32 |
private static final Log log = LogFactory.getLog(FileWalker.class); |
|
33 |
|
|
34 |
/** |
|
35 |
* Shared queue. |
|
36 |
*/ |
|
37 |
private final BlockingQueue<T> queue; |
|
38 |
|
|
39 |
/** |
|
40 |
* Reference to the starting directory |
|
41 |
*/ |
|
42 |
private final File source; |
|
43 |
|
|
44 |
/** |
|
45 |
* specifies the file type; |
|
46 |
*/ |
|
47 |
private final FileType type; |
|
48 |
|
|
49 |
public enum FileType { |
|
50 |
TEXT, PDF, DOC |
|
51 |
} |
|
52 |
|
|
53 |
public static String IGNORE_PREFIX = "."; |
|
54 |
|
|
55 |
public static String IGNORE_SUFFIX = "~"; |
|
56 |
|
|
57 |
static IOFileFilter fileFilter = FileFilterUtils.notFileFilter(FileFilterUtils.or(new SuffixFileFilter("~"), HiddenFileFilter.HIDDEN)); |
|
58 |
|
|
59 |
/** |
|
60 |
* Builds a FileWalker. |
|
61 |
* |
|
62 |
* @param sourcePath |
|
63 |
* the |
|
64 |
* @param queue |
|
65 |
*/ |
|
66 |
public FileWalker(final BlockingQueue<T> queue, final FileType type, final String basePath) { |
|
67 |
super(fileFilter, -1); |
|
68 |
this.source = new File(basePath); |
|
69 |
this.type = type; |
|
70 |
this.queue = queue; |
|
71 |
} |
|
72 |
|
|
73 |
@Override |
|
74 |
protected void handleFile(final File file, final int depth, final Collection<T> results) throws IOException { |
|
75 |
enqueue((BlockingQueue<T>) results, readFile(file)); |
|
76 |
} |
|
77 |
|
|
78 |
@Override |
|
79 |
protected boolean handleDirectory(final File directory, final int depth, final Collection<T> results) throws IOException { |
|
80 |
if (directory.getName().startsWith(IGNORE_PREFIX)) return false; |
|
81 |
return super.handleDirectory(directory, depth, results); |
|
82 |
} |
|
83 |
|
|
84 |
/* Helpers */ |
|
85 |
/** |
|
86 |
* given a file, return its content as a T |
|
87 |
* |
|
88 |
* @param file |
|
89 |
* the source |
|
90 |
* @return the file content as an object of type T |
|
91 |
* @throws IOException |
|
92 |
*/ |
|
93 |
protected abstract T readFile(final File file) throws IOException; |
|
94 |
|
|
95 |
/** |
|
96 |
* Wrapper method, starts the walk and when it's done adds the flag to the queue. |
|
97 |
*/ |
|
98 |
protected abstract void doWalk() throws IOException; |
|
99 |
|
|
100 |
/** |
|
101 |
* Adds the element to the queue |
|
102 |
*/ |
|
103 |
protected void enqueue(final BlockingQueue<T> queue, final T element) { |
|
104 |
try { |
|
105 |
queue.put(element); |
|
106 |
} catch (InterruptedException e) { |
|
107 |
log.warn("ops... ", e); |
|
108 |
} |
|
109 |
} |
|
110 |
|
|
111 |
public BlockingQueue<T> getQueue() { |
|
112 |
return queue; |
|
113 |
} |
|
114 |
|
|
115 |
public File getSource() { |
|
116 |
return source; |
|
117 |
} |
|
118 |
|
|
119 |
public FileType getType() { |
|
120 |
return type; |
|
121 |
} |
|
122 |
|
|
123 |
} |
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/test/java/eu/dnetlib/data/collector/plugins/oai/OaiCollectorPluginRealTest.java | ||
---|---|---|
9 | 9 |
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor; |
10 | 10 |
|
11 | 11 |
public class OaiCollectorPluginRealTest { |
12 |
|
|
12 |
|
|
13 | 13 |
private OaiCollectorPlugin oai; |
14 | 14 |
|
15 | 15 |
private static final String BASE_URL = "http://oai.d.efg.research-infrastructures.eu/oai.do"; |
... | ... | |
32 | 32 |
iface.setParams(new HashMap<String, String>()); |
33 | 33 |
iface.getParams().put("format", FORMAT); |
34 | 34 |
iface.getParams().put("set", SETS); |
35 |
|
|
35 |
|
|
36 | 36 |
int count = 0; |
37 |
for(String s : oai.collect(iface, null, null)) { |
|
37 |
for (String s : oai.collect(iface, null, null)) {
|
|
38 | 38 |
count++; |
39 | 39 |
} |
40 | 40 |
System.out.println("TOTAL: " + count); |
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/test/java/eu/dnetlib/data/collector/fileSystem/FileSystemCollectorPluginTest.java | ||
---|---|---|
1 |
package eu.dnetlib.data.collector.fileSystem; |
|
2 |
|
|
3 |
import java.util.HashMap; |
|
4 |
|
|
5 |
import org.junit.Assert; |
|
6 |
import org.junit.Ignore; |
|
7 |
import org.junit.Test; |
|
8 |
|
|
9 |
import eu.dnetlib.data.collector.plugins.filesystem.FilesystemCollectorPlugin; |
|
10 |
import eu.dnetlib.data.collector.rmi.CollectorServiceException; |
|
11 |
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor; |
|
12 |
|
|
13 |
public class FileSystemCollectorPluginTest { |
|
14 |
|
|
15 |
@Ignore |
|
16 |
@Test |
|
17 |
public void test() throws CollectorServiceException { |
|
18 |
InterfaceDescriptor descr = new InterfaceDescriptor(); |
|
19 |
HashMap<String, String> params = new HashMap<String, String>(); |
|
20 |
params.put("extensions", "suca, xml"); |
|
21 |
descr.setBaseUrl("file:///home/sandro/cordaTEST"); |
|
22 |
descr.setParams(params); |
|
23 |
|
|
24 |
FilesystemCollectorPlugin plugin = new FilesystemCollectorPlugin(); |
|
25 |
Iterable<String> result = plugin.collect(descr, null, null); |
|
26 |
int i = 0; |
|
27 |
for (String s : result) { |
|
28 |
i++; |
|
29 |
Assert.assertNotNull(s); |
|
30 |
} |
|
31 |
System.out.println("Total " + i); |
|
32 |
|
|
33 |
} |
|
34 |
|
|
35 |
} |
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FilesystemCollectorPlugin.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.collector.plugins.filesystem; |
2 | 2 |
|
3 |
import java.net.URISyntaxException; |
|
4 |
|
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
7 |
|
|
8 | 3 |
import eu.dnetlib.data.collector.plugin.CollectorPlugin; |
9 | 4 |
import eu.dnetlib.data.collector.rmi.CollectorServiceException; |
10 | 5 |
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor; |
... | ... | |
16 | 11 |
*/ |
17 | 12 |
public class FilesystemCollectorPlugin implements CollectorPlugin { |
18 | 13 |
|
19 |
private static final Log log = LogFactory.getLog(FilesystemCollectorPlugin.class); |
|
20 |
|
|
21 | 14 |
@Override |
22 |
public Iterable<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) throws CollectorServiceException { |
|
15 |
public Iterable<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) |
|
16 |
throws CollectorServiceException { |
|
23 | 17 |
|
24 | 18 |
final String baseUrl = interfaceDescriptor.getBaseUrl(); |
25 |
|
|
26 | 19 |
if ((baseUrl == null) || baseUrl.isEmpty()) throw new CollectorServiceException("Param 'baseurl' is null or empty"); |
20 |
return new FilesystemIterable(interfaceDescriptor); |
|
27 | 21 |
|
28 |
try { |
|
29 |
return new FilesystemIterable(interfaceDescriptor); |
|
30 |
} catch (URISyntaxException e) { |
|
31 |
log.error("Filesystem collector failed! ", e); |
|
32 |
return null; |
|
33 |
} |
|
34 | 22 |
} |
35 | 23 |
|
36 | 24 |
@Override |
modules/dnet-modular-collector-service/branches/NewCSVPlugin/src/main/java/eu/dnetlib/data/collector/plugins/filesystem/FilesystemIterable.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.collector.plugins.filesystem; |
2 | 2 |
|
3 |
import java.io.File; |
|
4 |
import java.io.FileInputStream; |
|
3 | 5 |
import java.io.IOException; |
4 |
import java.net.URISyntaxException; |
|
6 |
import java.net.MalformedURLException; |
|
7 |
import java.net.URL; |
|
5 | 8 |
import java.util.Iterator; |
6 |
import java.util.concurrent.ArrayBlockingQueue; |
|
7 |
import java.util.concurrent.BlockingQueue; |
|
8 |
import java.util.concurrent.ExecutorService; |
|
9 |
import java.util.concurrent.Executors; |
|
10 | 9 |
|
10 |
import org.apache.commons.io.FileUtils; |
|
11 |
import org.apache.commons.io.IOUtils; |
|
11 | 12 |
import org.apache.commons.logging.Log; |
12 | 13 |
import org.apache.commons.logging.LogFactory; |
13 | 14 |
|
14 |
import eu.dnetlib.data.collector.plugins.filesystem.FileWalker.FileType; |
|
15 |
import com.google.common.base.Function; |
|
16 |
import com.google.common.base.Splitter; |
|
17 |
import com.google.common.collect.Iterables; |
|
18 |
import com.google.common.collect.Iterators; |
|
19 |
|
|
20 |
import eu.dnetlib.data.collector.rmi.CollectorServiceException; |
|
15 | 21 |
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor; |
16 | 22 |
|
17 | 23 |
/** |
24 |
* The Class FilesystemIterable. |
|
18 | 25 |
* |
19 |
* @author andrea |
|
20 |
* |
|
26 |
* @author Sandro, Michele |
|
21 | 27 |
*/ |
22 | 28 |
public class FilesystemIterable implements Iterable<String> { |
23 | 29 |
|
30 |
/** The Constant log. */ |
|
24 | 31 |
private static final Log log = LogFactory.getLog(FilesystemIterable.class); |
25 | 32 |
|
26 |
/** Queue sentinel **/
|
|
27 |
public static final String done = "DONE";
|
|
33 |
/** The base dir. */
|
|
34 |
private File baseDir;
|
|
28 | 35 |
|
29 |
private final BlockingQueue<String> queue; |
|
36 |
/** The extensions. */ |
|
37 |
private String[] extensions; |
|
30 | 38 |
|
31 |
private final String basePath; |
|
32 |
|
|
33 |
private ExecutorService producer; |
|
34 |
|
|
35 |
public FilesystemIterable(final InterfaceDescriptor descriptor) throws URISyntaxException { |
|
36 |
|
|
37 |
basePath = descriptor.getBaseUrl(); |
|
38 |
this.producer = Executors.newSingleThreadExecutor(); |
|
39 |
this.queue = new ArrayBlockingQueue<String>(20); |
|
39 |
/** |
|
40 |
* Instantiates a new filesystem iterable. |
|
41 |
* |
|
42 |
* @param descriptor |
|
43 |
* the descriptor |
|
44 |
* @throws CollectorServiceException |
|
45 |
* the collector service exception |
|
46 |
*/ |
|
47 |
public FilesystemIterable(final InterfaceDescriptor descriptor) throws CollectorServiceException { |
|
48 |
try { |
|
49 |
final String baseUrl = descriptor.getBaseUrl(); |
|
50 |
URL basePath = new URL(baseUrl); |
|
51 |
this.baseDir = new File(basePath.getPath()); |
|
52 |
if (!baseDir.exists()) throw new CollectorServiceException(String.format("The base ULR %s, does not exist", basePath.getPath())); |
|
53 |
this.extensions = Iterables |
|
54 |
.toArray(Splitter.on(",").omitEmptyStrings().trimResults().split(descriptor.getParams().get("extensions")), String.class); |
|
55 |
} catch (MalformedURLException e) { |
|
56 |
throw new CollectorServiceException("Filesystem collector failed! ", e); |
|
57 |
} |
|
40 | 58 |
} |
41 | 59 |
|
60 |
/** |
|
61 |
* {@inheritDoc} |
|
62 |
* |
|
63 |
* @see java.lang.Iterable#iterator() |
|
64 |
*/ |
|
42 | 65 |
@Override |
43 | 66 |
public Iterator<String> iterator() { |
44 |
doReadFromFTP(); |
|
45 |
return new FilesystemBlockingStream<String>(queue); |
|
46 |
} |
|
67 |
return Iterators.transform(FileUtils.iterateFiles(baseDir, extensions, true), new Function<File, String>() { |
|
47 | 68 |
|
48 |
private void doReadFromFTP() { |
|
49 |
|
|
50 |
final FileWalker<String> walker = new StringFileWalker(queue, FileType.TEXT, basePath); |
|
51 |
producer.execute(new Runnable() { |
|
52 |
|
|
53 | 69 |
@Override |
54 |
public void run() { |
|
70 |
public String apply(final File input) { |
|
71 |
FileInputStream fileInputStream = null; |
|
55 | 72 |
try { |
56 |
walker.doWalk(); |
|
57 |
} catch (IOException e) { |
|
58 |
throw new IllegalStateException(e); |
|
73 |
fileInputStream = new FileInputStream(input); |
|
74 |
String s = IOUtils.toString(fileInputStream); |
|
75 |
return s.startsWith("\uFEFF") ? s.substring(1) : s; |
|
76 |
} catch (Exception e) { |
|
77 |
log.error("Unable to read " + input.getPath()); |
|
78 |
return null; |
|
79 |
} finally { |
|
80 |
if (fileInputStream != null) { |
|
81 |
try { |
|
82 |
fileInputStream.close(); |
|
83 |
} catch (IOException e) { |
|
84 |
log.error("Unable to close inputstream for " + input.getPath()); |
|
85 |
} |
|
86 |
} |
|
59 | 87 |
} |
60 |
log.info("finished to iterate under " + basePath); |
|
61 | 88 |
} |
62 | 89 |
}); |
63 | 90 |
} |
Also available in: Unified diff
Implemented a new version of the collector plugin for the File system