Revision 59095
Added by Alessia Bardi almost 4 years ago
DOIResolverIterator.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.data.collector.plugins.doiresolver; |
2 | 2 |
|
3 |
import eu.dnetlib.data.collector.plugins.filesystem.FileSystemIterator; |
|
3 | 4 |
import org.apache.commons.lang.StringUtils; |
4 | 5 |
import org.apache.commons.logging.Log; |
5 | 6 |
import org.apache.commons.logging.LogFactory; |
... | ... | |
9 | 10 |
import java.nio.file.Paths; |
10 | 11 |
import java.util.Iterator; |
11 | 12 |
import java.util.concurrent.ArrayBlockingQueue; |
13 |
import java.util.stream.Stream; |
|
12 | 14 |
|
13 | 15 |
public class DOIResolverIterator implements Iterator<String> { |
14 | 16 |
|
... | ... | |
19 | 21 |
private static final String BAD_TERMINATOR = "BAD"; |
20 | 22 |
private static final String UNRESOLVED = "UNRESOLVED"; |
21 | 23 |
|
22 |
/** Path to the file that contains a list of DOIs, one per line. **/ |
|
23 |
private String filePath; |
|
24 |
/** Path to the directory containing the CSV files; each file lists DOIs, one per line. **/ |
|
25 |
private String baseDir; |
|
26 |
private String fromDate; |
|
24 | 27 |
|
25 | 28 |
private ArrayBlockingQueue<String> queue; |
26 | 29 |
|
27 | 30 |
private CrossrefResolver crossrefResolver; |
28 | 31 |
|
29 | 32 |
|
30 |
public DOIResolverIterator(final String filePath, final CrossrefResolver crossrefResolver) { |
|
31 |
this.filePath = filePath; |
|
33 |
public DOIResolverIterator(final String baseDir, final CrossrefResolver crossrefResolver, final String fromDate) { |
|
34 |
this.baseDir = baseDir; |
|
35 |
this.fromDate = fromDate; |
|
32 | 36 |
this.queue = new ArrayBlockingQueue<>(100); |
33 | 37 |
this.crossrefResolver = crossrefResolver; |
34 | 38 |
init(); |
... | ... | |
36 | 40 |
|
37 | 41 |
private void init(){ |
38 | 42 |
log.info("Init"); |
43 |
|
|
39 | 44 |
new Thread(() -> { |
40 |
int count = 0; |
|
41 |
// put first item in the queue |
|
42 |
if(queue.offer(STARTER)) { |
|
43 |
// read the file, ask the resolvers, put results in a shared queue |
|
44 |
// whatever exception occurs, add a terminator to the queue |
|
45 |
try{ |
|
46 |
Files.lines(Paths.get(filePath)).forEach(doi -> queue.offer(resolve(doi))); |
|
47 |
} catch (IOException e) { |
|
48 |
log.error("DOI processing aborted"); |
|
49 |
log.error(e); |
|
50 |
queue.offer(BAD_TERMINATOR); |
|
45 |
try{ |
|
46 |
final FileSystemIterator fsi = new FileSystemIterator(baseDir, "csv", fromDate); |
|
47 |
// put first item in the queue |
|
48 |
if(queue.offer(STARTER)) { |
|
49 |
// read the file, ask the resolvers, put results in a shared queue |
|
50 |
// whatever exception occurs, add a terminator to the queue |
|
51 |
while (fsi.hasNext()) { |
|
52 |
String filePath = fsi.next(); |
|
53 |
try (Stream<String> stream = Files.lines(Paths.get(filePath))) { |
|
54 |
|
|
55 |
stream.forEach(doi -> queue.offer(resolve(doi))); |
|
56 |
|
|
57 |
} catch (IOException e) { |
|
58 |
log.error("DOI processing aborted"); |
|
59 |
log.error(e); |
|
60 |
queue.offer(BAD_TERMINATOR); |
|
61 |
} |
|
62 |
} |
|
51 | 63 |
} |
64 |
} catch (Exception e) { |
|
65 |
log.error("DOI processing aborted"); |
|
66 |
log.error(e); |
|
67 |
queue.offer(BAD_TERMINATOR); |
|
52 | 68 |
} |
53 | 69 |
queue.offer(TERMINATOR); |
54 | 70 |
log.info("Finished processing DOI list"); |
... | ... | |
107 | 123 |
return queue.poll(); |
108 | 124 |
} |
109 | 125 |
|
110 |
public String getFilePath() {
|
|
111 |
return filePath;
|
|
126 |
public String getBaseDir() {
|
|
127 |
return baseDir;
|
|
112 | 128 |
} |
113 | 129 |
|
114 |
public void setFilePath(String filePath) {
|
|
115 |
this.filePath = filePath;
|
|
130 |
public void setBaseDir(String baseDir) {
|
|
131 |
this.baseDir = baseDir;
|
|
116 | 132 |
} |
117 | 133 |
|
118 | 134 |
public CrossrefResolver getCrossrefResolver() { |
Also available in: Unified diff
DOIResolver plugin now supports multiple CSV files in the input folder (baseURL) and incremental processing based on the fromDate parameter.