Revision 59307
Added by Alessia Bardi over 3 years ago
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/doiresolver/DOIResolverIterator.java | ||
---|---|---|
10 | 10 |
import java.nio.file.Paths; |
11 | 11 |
import java.util.Iterator; |
12 | 12 |
import java.util.concurrent.ArrayBlockingQueue; |
13 |
import java.util.concurrent.TimeUnit; |
|
13 | 14 |
import java.util.stream.Stream; |
14 | 15 |
|
15 | 16 |
public class DOIResolverIterator implements Iterator<String> { |
16 | 17 |
|
17 | 18 |
private static final Log log = LogFactory.getLog(DOIResolverIterator.class); |
18 | 19 |
|
19 |
private static final String STARTER = "FIRE"; |
|
20 |
private static final String TERMINATOR = "ARNOLD"; |
|
21 |
private static final String BAD_TERMINATOR = "BAD"; |
|
22 |
private static final String UNRESOLVED = "UNRESOLVED"; |
|
20 |
protected static final String STARTER = "FIRE"; |
|
21 |
protected static final String TERMINATOR = "ARNOLD"; |
|
22 |
protected static final String BAD_TERMINATOR = "BAD"; |
|
23 |
protected static final String UNRESOLVED = "UNRESOLVED"; |
|
24 |
protected static long TIMEOUT = 5; |
|
25 |
protected static TimeUnit TIMEOUT_UNIT = TimeUnit.SECONDS; |
|
23 | 26 |
|
24 | 27 |
/** Path to the dir that contains the files, each a csv with a list of DOIs, one per line. **/ |
25 | 28 |
private String baseDir; |
... | ... | |
52 | 55 |
String filePath = fsi.next(); |
53 | 56 |
try (Stream<String> stream = Files.lines(Paths.get(filePath))) { |
54 | 57 |
|
55 |
stream.forEach(doi -> queue.offer(resolve(doi))); |
|
58 |
stream.forEach(doi -> { |
|
59 |
try { |
|
60 |
String resolved = resolve(doi); |
|
61 |
if(!resolved.equals(UNRESOLVED)) queue.offer(resolved, TIMEOUT, TIMEOUT_UNIT); |
|
62 |
} catch (InterruptedException e) { |
|
63 |
log.error("DOI processing aborted, cannot offer resolved doi: "+doi+" . Did the consumer die?"); |
|
64 |
log.error(e); |
|
65 |
queue.offer(BAD_TERMINATOR); |
|
66 |
} |
|
67 |
}); |
|
56 | 68 |
|
57 | 69 |
} catch (IOException e) { |
58 | 70 |
log.error("DOI processing aborted"); |
... | ... | |
95 | 107 |
|
96 | 108 |
@Override |
97 | 109 |
public boolean hasNext() { |
110 |
return doHasNext(); |
|
111 |
} |
|
112 |
|
|
113 |
private boolean doHasNext(){ |
|
98 | 114 |
//If I get a null value, the queue is currently empty. so we wait for something |
99 |
if(queue.peek() == null){ |
|
115 |
String element = queue.peek(); |
|
116 |
while(element == null) { |
|
100 | 117 |
try { |
101 |
log.debug("Sleeping 10 ms while waiting for something in the queue"); |
|
102 |
Thread.sleep(10); |
|
118 |
log.debug("Sleeping while waiting for something in the queue"); |
|
119 |
Thread.sleep(1000); |
|
120 |
element = queue.peek(); |
|
103 | 121 |
} catch (InterruptedException e) { |
104 | 122 |
e.printStackTrace(); |
105 | 123 |
} |
106 |
return hasNext(); |
|
107 | 124 |
} |
108 |
String element = queue.peek(); |
|
109 | 125 |
log.debug("Found in queue element: "+element); |
110 |
if(element.equals(TERMINATOR) || element.equals(BAD_TERMINATOR)){ |
|
111 |
return false; |
|
112 |
} |
|
113 |
if(element.equals(UNRESOLVED) || element.equals(STARTER)){ |
|
114 |
queue.poll(); |
|
115 |
return hasNext(); |
|
116 |
} |
|
117 |
return true; |
|
118 |
|
|
126 |
switch(element){ |
|
127 |
case TERMINATOR: |
|
128 |
case BAD_TERMINATOR: |
|
129 |
return false; |
|
130 |
case STARTER: |
|
131 |
case UNRESOLVED: //although they should not be inserted at all in the queue |
|
132 |
queue.poll(); |
|
133 |
return doHasNext(); |
|
134 |
default: |
|
135 |
return true; |
|
136 |
} |
|
119 | 137 |
} |
120 | 138 |
|
121 | 139 |
@Override |
Also available in: Unified diff
This should fix the problem of stackoverflow on the recursive hasNext