Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.doiresolver;
2

    
3
import eu.dnetlib.data.collector.plugins.filesystem.FileSystemIterator;
4
import org.apache.commons.lang.StringUtils;
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

    
8
import java.io.IOException;
9
import java.nio.file.Files;
10
import java.nio.file.Paths;
11
import java.util.Iterator;
12
import java.util.concurrent.ArrayBlockingQueue;
13
import java.util.stream.Stream;
14

    
15
public class DOIResolverIterator implements Iterator<String> {
16

    
17
    private static final Log log = LogFactory.getLog(DOIResolverIterator.class);
18

    
19
    private static final String STARTER = "FIRE";
20
    private static final String TERMINATOR = "ARNOLD";
21
    private static final String BAD_TERMINATOR = "BAD";
22
    private static final String UNRESOLVED = "UNRESOLVED";
23

    
24
    /** Path to the dir that contains the files, each a csv with a list of DOIs, one per line. **/
25
    private String baseDir;
26
    private String fromDate;
27

    
28
    private ArrayBlockingQueue<String> queue;
29

    
30
    private CrossrefResolver crossrefResolver;
31

    
32

    
33
    public DOIResolverIterator(final String baseDir, final CrossrefResolver crossrefResolver, final String fromDate) {
34
        this.baseDir = baseDir;
35
        this.fromDate = fromDate;
36
        this.queue = new ArrayBlockingQueue<>(100);
37
        this.crossrefResolver = crossrefResolver;
38
        init();
39
    }
40

    
41
    private void init(){
42
        log.info("Init");
43

    
44
        new Thread(() -> {
45
            try{
46
                final FileSystemIterator fsi = new FileSystemIterator(baseDir, "csv", fromDate);
47
                // put first item in the queue
48
                if(queue.offer(STARTER)) {
49
                    // read the file, ask the resolvers, put results in a shared queue
50
                    //whatever exceptions, add terminator to the queue
51
                    while (fsi.hasNext()) {
52
                        String filePath = fsi.next();
53
                        try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
54

    
55
                            stream.forEach(doi -> queue.offer(resolve(doi)));
56

    
57
                        } catch (IOException e) {
58
                            log.error("DOI processing aborted");
59
                            log.error(e);
60
                            queue.offer(BAD_TERMINATOR);
61
                        }
62
                    }
63
                }
64
            } catch (Exception e) {
65
                log.error("DOI processing aborted");
66
                log.error(e);
67
                queue.offer(BAD_TERMINATOR);
68
            }
69
            queue.offer(TERMINATOR);
70
            log.info("Finished processing DOI list");
71
        }
72
        ).start();
73
    }
74

    
75
    private String resolve(final String doi){
76
       log.debug("Resolving "+doi);
77
       log.debug("Crossref...");
78
       String record = crossrefResolver.resolve(cleanDOI(doi));
79
       if(StringUtils.isNotBlank(record)) return record;
80
       else {
81
           //try another resolver
82
           log.debug("Resolver returned blank item");
83
       }
84
       return UNRESOLVED;
85
    }
86

    
87
    /**
88
     * Returns the identifier part of the DOI only.
89
     * @param doi
90
     * @return the DOI
91
     */
92
    protected String cleanDOI(final String doi){
93
       return doi.replace("http://dx.doi.org/", "").replace("https://dx.doi.org/", "");
94
    }
95

    
96
    @Override
97
    public boolean hasNext() {
98
        //If I get a null value, the queue is currently empty. so we wait for something
99
        if(queue.peek() == null){
100
            try {
101
                log.debug("Sleeping 10 ms while waiting for something in the queue");
102
                Thread.sleep(10);
103
            } catch (InterruptedException e) {
104
                e.printStackTrace();
105
            }
106
            return hasNext();
107
        }
108
        String element = queue.peek();
109
        log.debug("Found in queue element: "+element);
110
       if(element.equals(TERMINATOR) || element.equals(BAD_TERMINATOR)){
111
           return false;
112
       }
113
       if(element.equals(UNRESOLVED) || element.equals(STARTER)){
114
           queue.poll();
115
           return hasNext();
116
       }
117
        return true;
118

    
119
    }
120

    
121
    @Override
122
    public String next() {
123
        return queue.poll();
124
    }
125

    
126
    public String getBaseDir() {
127
        return baseDir;
128
    }
129

    
130
    public void setBaseDir(String baseDir) {
131
        this.baseDir = baseDir;
132
    }
133

    
134
    public CrossrefResolver getCrossrefResolver() {
135
        return crossrefResolver;
136
    }
137

    
138
    public void setCrossrefResolver(CrossrefResolver crossrefResolver) {
139
        this.crossrefResolver = crossrefResolver;
140
    }
141
}
(3-3/4)