Project

General

Profile

« Previous | Next » 

Revision 52614

changes in the implementation of the iterator

View differences:

modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/httpfilename/HTTPWithFileNameCollectorIterable.java
1 1
package eu.dnetlib.data.collector.plugins.httpfilename;
2 2

  
3
import java.util.ArrayList;
4
import java.util.Iterator;
5
import java.util.NoSuchElementException;
6
import java.util.Objects;
3
import java.util.*;
7 4
import java.util.concurrent.ArrayBlockingQueue;
8 5
import java.util.concurrent.TimeUnit;
9 6

  
......
23 20
public class HTTPWithFileNameCollectorIterable implements Iterable<String> {
24 21

  
25 22
    private static final Log log = LogFactory.getLog(HTTPWithFileNameCollectorIterable.class);
26
    private static final String TERMINATOR = "FINITO";
23

  
27 24
    private static final String JUNK = "<resource><url>%s</url><DOI>JUNK</DOI></resource>";
28 25
    public static final String APP_JSON = "application/json";
29 26
    public static final String APP_XML = "application/xml";
30 27
    public static final String TEXT_HTML = "text/html";
31 28
    private final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
32 29

  
33
    private long waitTime = 60L;
34 30

  
35
    private final ArrayList<String> urls = new ArrayList<>();
36
    private final ArrayList<String> metas = new ArrayList<String>();
31

  
32

  
37 33
    private String filterParam;
38 34

  
39 35
    int total = 0;
40 36
    int filtered = 0;
41 37

  
42 38
    public HTTPWithFileNameCollectorIterable(String startUrl, String filter){
43
        if (!startUrl.isEmpty())
44
            urls.add(startUrl);
39

  
45 40
        this.filterParam = filter;
46
        Thread ft = new Thread(new FillMetaQueue());
41
        Thread ft = new Thread(new FillMetaQueue(startUrl) );
47 42
        ft.start();
48 43
    }
49 44

  
50 45

  
51 46
    @Override
52 47
    public Iterator<String> iterator() {
53
        return new Iterator<String>(){
48
        return new HttpWithFileNameCollectorIterator(queue);
49
    }
54 50

  
55
            private String last = null;
56
            private boolean exec_next = true;
51
    private class FillMetaQueue implements Runnable {
52
        final Connector c = new Connector();
57 53

  
58
            @Override
59
            public boolean hasNext() {
60
                if(exec_next){
61
                    try {
62
                        last = queue.poll(waitTime, TimeUnit.SECONDS);
63
                        exec_next = false;
64
                    }catch(InterruptedException e){
65
                        log.warn(String.format("could not find elements to consume for more than %s%s", waitTime, TimeUnit.SECONDS));
66
                        throw new NoSuchElementException(e.getMessage());
67
                    }
68
                }
54
        private final List<String> metas = Collections.synchronizedList(new ArrayList<String>());
55
        private final List<String> urls = Collections.synchronizedList(new ArrayList<>());
69 56

  
70
                return !(Objects.equals(last, TERMINATOR));
57
        public FillMetaQueue(String startUrl){
58
            if(!startUrl.isEmpty()){
59
                urls.add(startUrl);
71 60
            }
61
        }
72 62

  
73
            @Override
74
            public String next() {
75
                exec_next = true;
76
                return last;
77
            }
78 63

  
79
        };
80
    }
81

  
82
    private class FillMetaQueue implements Runnable {
83

  
84
        final Connector c = new Connector();
85

  
86 64
        public void fillQueue() {
87 65
            String url;
66

  
88 67
            while((metas.size()>0 || urls.size() > 0 )) {
89 68
                log.debug("metas.size() = " + metas.size() + " urls.size() = " + urls.size() + " queue.size() = " +queue.size());
90 69
                if (metas.size() > 0) {
......
99 78
                            String ret = c.getResponse();
100 79
                            if (ret != null && ret.length()>0) {
101 80
                                if (!containsFilter(ret))
102
                                    queue.offer(addFilePath(ret, url, url.endsWith(".json")), waitTime, TimeUnit.SECONDS);
81
                                    queue.offer(addFilePath(ret, url, url.endsWith(".json")), HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS);
103 82
                                else
104 83
                                    filtered++;
105 84
                                total++;
......
122 101
                        } else if(c.responseTypeContains(APP_JSON) || c.responseTypeContains(APP_XML)){
123 102
                            try {
124 103
                                final String element = addFilePath(c.getResponse(), url, c.responseTypeContains(APP_JSON));
125
                                queue.offer(element, waitTime, TimeUnit.SECONDS);
104
                                queue.offer(element, HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS);
126 105
                            } catch (InterruptedException e) {
127 106
                                log.info("not inserted in queue element associate to url " + url + " error: " + e.getMessage() );
128 107
                            }
......
132 111

  
133 112
            }
134 113
            try {
135
                queue.offer(TERMINATOR, waitTime, TimeUnit.SECONDS);
114
                queue.offer(HttpWithFileNameCollectorIterator.TERMINATOR, HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS);
136 115
            } catch (InterruptedException e) {
137
                throw new IllegalStateException(String.format("could not add element to queue for more than %s%s", waitTime, TimeUnit.SECONDS), e);
116
                throw new IllegalStateException(String.format("could not add element to queue for more than %s%s", HttpWithFileNameCollectorIterator.waitTime, TimeUnit.SECONDS), e);
138 117
            }
139 118

  
140 119
        }
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/httpfilename/HttpWithFileNameCollectorIterator.java
1
package eu.dnetlib.data.collector.plugins.httpfilename;
2

  
3
import org.apache.commons.logging.Log;
4
import org.apache.commons.logging.LogFactory;
5

  
6
import java.util.Iterator;
7
import java.util.NoSuchElementException;
8
import java.util.Objects;
9
import java.util.concurrent.ArrayBlockingQueue;
10
import java.util.concurrent.TimeUnit;
11

  
12
/**
13
 * Created by miriam on 25/06/2018.
14
 */
15
public class HttpWithFileNameCollectorIterator implements Iterator<String> {
16
    public static final String TERMINATOR = "FINITO";
17
    private static final Log log = LogFactory.getLog(HttpWithFileNameCollectorIterator.class);
18

  
19
     private final ArrayBlockingQueue<String> queue;
20

  
21
    public static final long waitTime = 60L;
22

  
23
    private String last;
24

  
25
    public HttpWithFileNameCollectorIterator(ArrayBlockingQueue<String> queue) {
26
        this.queue = queue;
27
        extractFromQueue();
28
    }
29

  
30
    @Override
31
        public boolean hasNext() {
32

  
33

  
34
            return !(Objects.equals(last, TERMINATOR) || Objects.equals(last,null));
35
        }
36

  
37
        @Override
38
        public String next() {
39
            try{
40

  
41
                return last;
42

  
43
            }finally{
44
                extractFromQueue();
45
            }
46

  
47
        }
48

  
49
    private void extractFromQueue() {
50
        try {
51
            last = queue.poll(waitTime, TimeUnit.SECONDS);
52
        }catch(InterruptedException e){
53
            log.warn(String.format("could not find elements to consume for more than %s%s", waitTime, TimeUnit.SECONDS));
54
            throw new NoSuchElementException(e.getMessage());
55
        }
56
    }
57

  
58

  
59
}

Also available in: Unified diff