Project

General

Profile

« Previous | Next » 

Revision 59095

DOIResolver plugin now supports multiple CSV files in the input folder (baseURL) and incremental collection based on a from-date (fromDate).

View differences:

DOIResolverIterator.java
1 1
package eu.dnetlib.data.collector.plugins.doiresolver;
2 2

  
3
import eu.dnetlib.data.collector.plugins.filesystem.FileSystemIterator;
3 4
import org.apache.commons.lang.StringUtils;
4 5
import org.apache.commons.logging.Log;
5 6
import org.apache.commons.logging.LogFactory;
......
9 10
import java.nio.file.Paths;
10 11
import java.util.Iterator;
11 12
import java.util.concurrent.ArrayBlockingQueue;
13
import java.util.stream.Stream;
12 14

  
13 15
public class DOIResolverIterator implements Iterator<String> {
14 16

  
......
19 21
    private static final String BAD_TERMINATOR = "BAD";
20 22
    private static final String UNRESOLVED = "UNRESOLVED";
21 23

  
22
    /** Path to the file that contains a list of DOIs, one per line. **/
23
    private String filePath;
24
    /** Path to the dir that contains the files, each a csv with a list of DOIs, one per line. **/
25
    private String baseDir;
26
    private String fromDate;
24 27

  
25 28
    private ArrayBlockingQueue<String> queue;
26 29

  
27 30
    private CrossrefResolver crossrefResolver;
28 31

  
29 32

  
30
    public DOIResolverIterator(final String filePath, final CrossrefResolver crossrefResolver) {
31
        this.filePath = filePath;
33
    public DOIResolverIterator(final String baseDir, final CrossrefResolver crossrefResolver, final String fromDate) {
34
        this.baseDir = baseDir;
35
        this.fromDate = fromDate;
32 36
        this.queue = new ArrayBlockingQueue<>(100);
33 37
        this.crossrefResolver = crossrefResolver;
34 38
        init();
......
36 40

  
37 41
    private void init(){
38 42
        log.info("Init");
43

  
39 44
        new Thread(() -> {
40
            int count = 0;
41
            // put first item in the queue
42
            if(queue.offer(STARTER)) {
43
                // read the file, ask the resolvers, put results in a shared queue
44
                //whatever exceptions, add terminator to the queue
45
                try{
46
                    Files.lines(Paths.get(filePath)).forEach(doi -> queue.offer(resolve(doi)));
47
                } catch (IOException e) {
48
                    log.error("DOI processing aborted");
49
                    log.error(e);
50
                    queue.offer(BAD_TERMINATOR);
45
            try{
46
                final FileSystemIterator fsi = new FileSystemIterator(baseDir, "csv", fromDate);
47
                // put first item in the queue
48
                if(queue.offer(STARTER)) {
49
                    // read the file, ask the resolvers, put results in a shared queue
50
                    //whatever exceptions, add terminator to the queue
51
                    while (fsi.hasNext()) {
52
                        String filePath = fsi.next();
53
                        try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
54

  
55
                            stream.forEach(doi -> queue.offer(resolve(doi)));
56

  
57
                        } catch (IOException e) {
58
                            log.error("DOI processing aborted");
59
                            log.error(e);
60
                            queue.offer(BAD_TERMINATOR);
61
                        }
62
                    }
51 63
                }
64
            } catch (Exception e) {
65
                log.error("DOI processing aborted");
66
                log.error(e);
67
                queue.offer(BAD_TERMINATOR);
52 68
            }
53 69
            queue.offer(TERMINATOR);
54 70
            log.info("Finished processing DOI list");
......
107 123
        return queue.poll();
108 124
    }
109 125

  
110
    public String getFilePath() {
111
        return filePath;
126
    public String getBaseDir() {
127
        return baseDir;
112 128
    }
113 129

  
114
    public void setFilePath(String filePath) {
115
        this.filePath = filePath;
130
    public void setBaseDir(String baseDir) {
131
        this.baseDir = baseDir;
116 132
    }
117 133

  
118 134
    public CrossrefResolver getCrossrefResolver() {

Also available in: Unified diff