Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.doiresolver;
2

    
3
import eu.dnetlib.data.collector.plugins.filesystem.FileSystemIterator;
4
import org.apache.commons.lang.StringUtils;
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7

    
8
import java.io.IOException;
9
import java.nio.file.Files;
10
import java.nio.file.Paths;
11
import java.util.Iterator;
12
import java.util.concurrent.ArrayBlockingQueue;
13
import java.util.concurrent.TimeUnit;
14
import java.util.stream.Stream;
15

    
16
public class DOIResolverIterator implements Iterator<String> {
17

    
18
    private static final Log log = LogFactory.getLog(DOIResolverIterator.class);
19

    
20
    protected static final String STARTER = "FIRE";
21
    protected static final String TERMINATOR = "ARNOLD";
22
    protected static final String BAD_TERMINATOR = "BAD";
23
    protected static final String UNRESOLVED = "UNRESOLVED";
24
    protected static long TIMEOUT = 5;
25
    protected static TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS;
26

    
27
    /** Path to the dir that contains the files, each a csv with a list of DOIs, one per line. **/
28
    private String baseDir;
29
    private String fromDate;
30

    
31
    private ArrayBlockingQueue<String> queue;
32

    
33
    private CrossrefResolver crossrefResolver;
34

    
35

    
36
    public DOIResolverIterator(final String baseDir, final CrossrefResolver crossrefResolver, final String fromDate) {
37
        this.baseDir = baseDir;
38
        this.fromDate = fromDate;
39
        this.queue = new ArrayBlockingQueue<>(100);
40
        this.crossrefResolver = crossrefResolver;
41
        init();
42
    }
43

    
44
    private void init(){
45
        log.info("Init");
46

    
47
        new Thread(() -> {
48
            try{
49
                final FileSystemIterator fsi = new FileSystemIterator(baseDir, "csv", fromDate);
50
                // put first item in the queue
51
                if(queue.offer(STARTER)) {
52
                    // read the file, ask the resolvers, put results in a shared queue
53
                    //whatever exceptions, add terminator to the queue
54
                    while (fsi.hasNext()) {
55
                        String filePath = fsi.next();
56
                        try (Stream<String> stream = Files.lines(Paths.get(filePath))) {
57

    
58
                            stream.forEach(doi -> {
59
                                try {
60
                                    String resolved = resolve(doi);
61
                                    if(!resolved.equals(UNRESOLVED)) queue.offer(resolved, TIMEOUT, TIMEOUT_UNIT);
62
                                } catch (InterruptedException e) {
63
                                    log.error("DOI processing aborted, cannot offer resolved doi: "+doi+" . Did the consumer die?");
64
                                    log.error(e);
65
                                    queue.offer(BAD_TERMINATOR);
66
                                }
67
                            });
68

    
69
                        } catch (IOException e) {
70
                            log.error("DOI processing aborted");
71
                            log.error(e);
72
                            queue.offer(BAD_TERMINATOR);
73
                        }
74
                    }
75
                }
76
            } catch (Exception e) {
77
                log.error("DOI processing aborted");
78
                log.error(e);
79
                queue.offer(BAD_TERMINATOR);
80
            }
81
            queue.offer(TERMINATOR);
82
            log.info("Finished processing DOI list");
83
        }
84
        ).start();
85
    }
86

    
87
    private String resolve(final String doi){
88
       log.debug("Resolving "+doi);
89
       log.debug("Crossref...");
90
       String record = crossrefResolver.resolve(cleanDOI(doi));
91
       if(StringUtils.isNotBlank(record)) return record;
92
       else {
93
           //try another resolver
94
           log.debug("Resolver returned blank item");
95
       }
96
       return UNRESOLVED;
97
    }
98

    
99
    /**
100
     * Returns the identifier part of the DOI only.
101
     * @param doi
102
     * @return the DOI
103
     */
104
    protected String cleanDOI(final String doi){
105
       return doi.replace("http://dx.doi.org/", "").replace("https://dx.doi.org/", "");
106
    }
107

    
108
    @Override
109
    public boolean hasNext() {
110
       return doHasNext();
111
    }
112

    
113
    private boolean doHasNext(){
114
        //If I get a null value, the queue is currently empty. so we wait for something
115
        String element = queue.peek();
116
        while(element == null) {
117
            try {
118
                log.debug("Sleeping while waiting for something in the queue");
119
                Thread.sleep(1000);
120
                element = queue.peek();
121
            } catch (InterruptedException e) {
122
                e.printStackTrace();
123
            }
124
        }
125
        log.debug("Found in queue element: "+element);
126
        switch(element){
127
            case TERMINATOR:
128
            case BAD_TERMINATOR:
129
                return false;
130
            case STARTER:
131
            case UNRESOLVED: //although they should not be inserted at all in the queue
132
                queue.poll();
133
                return doHasNext();
134
            default:
135
                return true;
136
        }
137
    }
138

    
139
    @Override
140
    public String next() {
141
        return queue.poll();
142
    }
143

    
144
    public String getBaseDir() {
145
        return baseDir;
146
    }
147

    
148
    public void setBaseDir(String baseDir) {
149
        this.baseDir = baseDir;
150
    }
151

    
152
    public CrossrefResolver getCrossrefResolver() {
153
        return crossrefResolver;
154
    }
155

    
156
    public void setCrossrefResolver(CrossrefResolver crossrefResolver) {
157
        this.crossrefResolver = crossrefResolver;
158
    }
159
}
(3-3/4)