1 |
1 |
package eu.dnetlib.data.collector.plugins.httpfilename;
|
2 |
2 |
|
3 |
|
import java.util.ArrayList;
|
4 |
|
import java.util.Iterator;
|
5 |
|
import java.util.NoSuchElementException;
|
6 |
|
import java.util.Objects;
|
|
3 |
import java.util.*;
|
7 |
4 |
import java.util.concurrent.ArrayBlockingQueue;
|
8 |
5 |
import java.util.concurrent.TimeUnit;
|
9 |
6 |
|
... | ... | |
23 |
20 |
public class HTTPWithFileNameCollectorIterable implements Iterable<String> {
|
24 |
21 |
|
25 |
22 |
private static final Log log = LogFactory.getLog(HTTPWithFileNameCollectorIterable.class);
|
26 |
|
private static final String TERMINATOR = "FINITO";
|
|
23 |
|
27 |
24 |
private static final String JUNK = "<resource><url>%s</url><DOI>JUNK</DOI></resource>";
|
28 |
25 |
public static final String APP_JSON = "application/json";
|
29 |
26 |
public static final String APP_XML = "application/xml";
|
30 |
27 |
public static final String TEXT_HTML = "text/html";
|
31 |
28 |
private final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
|
32 |
29 |
|
33 |
|
private long waitTime = 60L;
|
34 |
30 |
|
35 |
|
private final ArrayList<String> urls = new ArrayList<>();
|
36 |
|
private final ArrayList<String> metas = new ArrayList<String>();
|
|
31 |
|
|
32 |
|
37 |
33 |
private String filterParam;
|
38 |
34 |
|
39 |
35 |
int total = 0;
|
40 |
36 |
int filtered = 0;
|
41 |
37 |
|
42 |
38 |
public HTTPWithFileNameCollectorIterable(String startUrl, String filter){
|
43 |
|
if (!startUrl.isEmpty())
|
44 |
|
urls.add(startUrl);
|
|
39 |
|
45 |
40 |
this.filterParam = filter;
|
46 |
|
Thread ft = new Thread(new FillMetaQueue());
|
|
41 |
Thread ft = new Thread(new FillMetaQueue(startUrl) );
|
47 |
42 |
ft.start();
|
48 |
43 |
}
|
49 |
44 |
|
50 |
45 |
|
51 |
46 |
@Override
|
52 |
47 |
public Iterator<String> iterator() {
|
53 |
|
return new Iterator<String>(){
|
|
48 |
return new HttpWithFileNameCollectorIterator(queue);
|
|
49 |
}
|
54 |
50 |
|
55 |
|
private String last = null;
|
56 |
|
private boolean exec_next = true;
|
|
51 |
private class FillMetaQueue implements Runnable {
|
|
52 |
final Connector c = new Connector();
|
57 |
53 |
|
58 |
|
@Override
|
59 |
|
public boolean hasNext() {
|
60 |
|
if(exec_next){
|
61 |
|
try {
|
62 |
|
last = queue.poll(waitTime, TimeUnit.SECONDS);
|
63 |
|
exec_next = false;
|
64 |
|
}catch(InterruptedException e){
|
65 |
|
log.warn(String.format("could not find elements to consume for more than %s%s", waitTime, TimeUnit.SECONDS));
|
66 |
|
throw new NoSuchElementException(e.getMessage());
|
67 |
|
}
|
68 |
|
}
|
|
54 |
private final List<String> metas = Collections.synchronizedList(new ArrayList<String>());
|
|
55 |
private final List<String> urls = Collections.synchronizedList(new ArrayList<>());
|
69 |
56 |
|
70 |
|
return !(Objects.equals(last, TERMINATOR));
|
|
57 |
public FillMetaQueue(String startUrl){
|
|
58 |
if(!startUrl.isEmpty()){
|
|
59 |
urls.add(startUrl);
|
71 |
60 |
}
|
|
61 |
}
|
72 |
62 |
|
73 |
|
@Override
|
74 |
|
public String next() {
|
75 |
|
exec_next = true;
|
76 |
|
return last;
|
77 |
|
}
|
78 |
63 |
|
79 |
|
};
|
80 |
|
}
|
81 |
|
|
82 |
|
private class FillMetaQueue implements Runnable {
|
83 |
|
|
84 |
|
final Connector c = new Connector();
|
85 |
|
|
86 |
64 |
public void fillQueue() {
|
87 |
65 |
String url;
|
|
66 |
|
88 |
67 |
while((metas.size()>0 || urls.size() > 0 )) {
|
89 |
68 |
log.debug("metas.size() = " + metas.size() + " urls.size() = " + urls.size() + " queue.size() = " +queue.size());
|
90 |
69 |
if (metas.size() > 0) {
|
... | ... | |
99 |
78 |
String ret = c.getResponse();
|
100 |
79 |
if (ret != null && ret.length()>0) {
|
101 |
80 |
if (!containsFilter(ret))
|
102 |
|
queue.offer(addFilePath(ret, url, url.endsWith(".json")), waitTime, TimeUnit.SECONDS);
|
|
81 |
queue.offer(addFilePath(ret, url, url.endsWith(".json")), HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS);
|
103 |
82 |
else
|
104 |
83 |
filtered++;
|
105 |
84 |
total++;
|
... | ... | |
122 |
101 |
} else if(c.responseTypeContains(APP_JSON) || c.responseTypeContains(APP_XML)){
|
123 |
102 |
try {
|
124 |
103 |
final String element = addFilePath(c.getResponse(), url, c.responseTypeContains(APP_JSON));
|
125 |
|
queue.offer(element, waitTime, TimeUnit.SECONDS);
|
|
104 |
queue.offer(element, HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS);
|
126 |
105 |
} catch (InterruptedException e) {
|
127 |
106 |
log.info("not inserted in queue element associate to url " + url + " error: " + e.getMessage() );
|
128 |
107 |
}
|
... | ... | |
132 |
111 |
|
133 |
112 |
}
|
134 |
113 |
try {
|
135 |
|
queue.offer(TERMINATOR, waitTime, TimeUnit.SECONDS);
|
|
114 |
queue.offer(HttpWithFileNameCollectorIterator.TERMINATOR, HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS);
|
136 |
115 |
} catch (InterruptedException e) {
|
137 |
|
throw new IllegalStateException(String.format("could not add element to queue for more than %s%s", waitTime, TimeUnit.SECONDS), e);
|
|
116 |
throw new IllegalStateException(String.format("could not add element to queue for more than %s%s", HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS), e);
|
138 |
117 |
}
|
139 |
118 |
|
140 |
119 |
}
|
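Changes in the implementation of the iterator: iterator() now returns an HttpWithFileNameCollectorIterator instead of an anonymous Iterator, TERMINATOR and waitTime are referenced through that class, and the urls/metas lists move into FillMetaQueue as synchronized lists seeded from startUrl.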