Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.doiresolver;
2

    
3
import eu.dnetlib.data.collector.plugins.utils.JsonUtils;
4
import org.apache.commons.lang.StringUtils;
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

    
8
import java.io.IOException;
9
import java.nio.file.Files;
10
import java.nio.file.Paths;
11
import java.util.Iterator;
12
import java.util.concurrent.ArrayBlockingQueue;
13

    
14
public class DOIResolverIterator implements Iterator<String> {
15

    
16
    private static final Log log = LogFactory.getLog(DOIResolverIterator.class);
17

    
18
    private static final String STARTER = "FIRE";
19
    private static final String TERMINATOR = "ARNOLD";
20
    private static final String BAD_TERMINATOR = "BAD";
21
    private static final String UNRESOLVED = "UNRESOLVED";
22

    
23
    /** Path to the file that contains a list of DOIs, one per line. **/
24
    private String filePath;
25

    
26
    private ArrayBlockingQueue<String> queue;
27

    
28
    private CrossrefResolver crossrefResolver;
29

    
30

    
31
    public DOIResolverIterator(final String filePath, final CrossrefResolver crossrefResolver) {
32
        this.filePath = filePath;
33
        this.queue = new ArrayBlockingQueue<>(100);
34
        this.crossrefResolver = crossrefResolver;
35
        init();
36
    }
37

    
38
    private void init(){
39
        new Thread(() -> {
40
            // put first item in the queue
41
            if(queue.offer(STARTER)) {
42
                // read the file, ask the resolvers, put results in a shared queue
43
                //whatever exceptions, add terminator to the queue
44
                try{
45
                    Files.lines(Paths.get(filePath)).forEach(doi -> queue.offer(resolve(doi)));
46
                } catch (IOException e) {
47
                    log.error(e);
48
                    queue.offer(BAD_TERMINATOR);
49
                }
50
            }
51
            queue.offer(TERMINATOR);
52

    
53

    
54
        }
55
        ).start();
56
    }
57

    
58
    private String resolve(final String doi){
59
       log.debug("Resolving "+doi);
60
       log.debug("Crossref...");
61
       String record = crossrefResolver.resolve(cleanDOI(doi));
62
       if(StringUtils.isNotBlank(record)) return record;
63
       else {
64
           //try another resolver
65
       }
66
       return UNRESOLVED;
67
    }
68

    
69
    /**
70
     * Returns the identifier part of the DOI only.
71
     * @param doi
72
     * @return the DOI
73
     */
74
    protected String cleanDOI(final String doi){
75
       return doi.replace("http://dx.doi.org/", "").replace("https://dx.doi.org/", "");
76
    }
77

    
78
    @Override
79
    public boolean hasNext() {
80
        //If I get a null value, the queue is currently empty. so we wait for something
81
        if(queue.peek() == null){
82
            try {
83
                Thread.sleep(10);
84
            } catch (InterruptedException e) {
85
                e.printStackTrace();
86
            }
87
            return hasNext();
88
        }
89
       if(queue.peek().equals(TERMINATOR) || queue.peek().equals(BAD_TERMINATOR)){
90
           return false;
91
       }
92
       if(queue.peek().equals(UNRESOLVED) || queue.peek().equals(STARTER)){
93
           queue.poll();
94
           return hasNext();
95
       }
96
        return true;
97

    
98
    }
99

    
100
    @Override
101
    public String next() {
102
        return queue.poll();
103
    }
104

    
105
    public String getFilePath() {
106
        return filePath;
107
    }
108

    
109
    public void setFilePath(String filePath) {
110
        this.filePath = filePath;
111
    }
112

    
113
    public CrossrefResolver getCrossrefResolver() {
114
        return crossrefResolver;
115
    }
116

    
117
    public void setCrossrefResolver(CrossrefResolver crossrefResolver) {
118
        this.crossrefResolver = crossrefResolver;
119
    }
120
}
(3-3/4)