Project

General

Profile

« Previous | Next » 

Revision 52496

Used timed blocking queue methods (poll/offer with a timeout) in HTTPWithFileNameCollectorIterable

View differences:

HTTPWithFileNameCollectorIterable.java
1 1
package eu.dnetlib.data.collector.plugins.HTTPWithFileName;
2 2

  
3
import eu.dnetlib.data.collector.plugins.projects.gtr2.Gtr2ProjectsIterable;
3
import java.util.ArrayList;
4
import java.util.Iterator;
5
import java.util.Objects;
6
import java.util.concurrent.ArrayBlockingQueue;
7
import java.util.concurrent.TimeUnit;
8

  
4 9
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
5 10
import org.apache.commons.logging.Log;
6 11
import org.apache.commons.logging.LogFactory;
......
11 16
import org.jsoup.nodes.Element;
12 17
import org.jsoup.select.Elements;
13 18

  
14
import java.util.ArrayList;
15
import java.util.Iterator;
16

  
17
import java.util.concurrent.ArrayBlockingQueue;
18
import java.util.function.Consumer;
19

  
20 19
/**
21 20
 * Created by miriam on 04/05/2018.
22 21
 */
23 22
public class HTTPWithFileNameCollectorIterable implements Iterable<String> {
24 23
    private static final Log log = LogFactory.getLog(HTTPWithFileNameCollectorIterable.class);
25
    private final String TERMINATOR = "FINITO";
26
    final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
24
    private static final String TERMINATOR = "FINITO";
25
    private final ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(100);
27 26

  
27
    private long waitTime = 60L;
28

  
28 29
    private final ArrayList<String> urls = new ArrayList<>();
29 30
    private final ArrayList<String> metas = new ArrayList<String>();
30 31
    private String filterParam;
......
43 44

  
44 45
    @Override
45 46
    public Iterator<String> iterator() {
47
        return new Iterator<String>(){
46 48

  
49
            private String current;
47 50

  
48

  
49
        return new Iterator<String>(){
50

  
51 51
            @Override
52 52
            public boolean hasNext() {
53
                while (queue.isEmpty());
54
                if (queue.peek().equals(TERMINATOR))
55
                    log.info(String.format("Total number of metadata %d, Number of metadata filtered %d", total, filtered));
56
                return !queue.peek().equals(TERMINATOR);
57

  
53
                try {
54
                    current = queue.poll(waitTime, TimeUnit.SECONDS);
55
                } catch (InterruptedException e) {
56
                    log.warn(String.format("could not find elements to consume for more than %s%s", waitTime, TimeUnit.SECONDS));
57
                    return false;
58
                }
59
                return !Objects.equals(current, TERMINATOR);
58 60
            }
59 61

  
60 62
            @Override
61 63
            public String next() {
62
                return queue.poll(); 
64
                return current;
63 65
            }
64 66

  
65 67
        };
66 68
    }
67 69

  
68
    private class FillMetaQueue implements Runnable{
70
    private class FillMetaQueue implements Runnable {
69 71

  
72
        final Connector c = new Connector();
70 73

  
71
        Connector c = new Connector();
72

  
73 74
        public void fillQueue() {
74 75
            String url;
75
            while((metas.size()>0 || urls.size() > 0 )){
76
            while((metas.size()>0 || urls.size() > 0 )) {
76 77
                log.debug("metas.size() = " + metas.size() + " urls.size() = " + urls.size() + " queue.size() = " +queue.size());
77
                if (metas.size() > 0){
78
                if (metas.size() > 0) {
78 79
                    url = metas.remove(0);
79 80
                    try {
80 81
                        c.get(url);
......
86 87
                            String ret = c.getResponse();
87 88
                            if (ret != null && ret.length()>0) {
88 89
                                if (!containsFilter(ret))
89
                                    queue.put(addFilePath(ret, url, url.endsWith(".json")));
90
                                    queue.offer(addFilePath(ret, url, url.endsWith(".json")), waitTime, TimeUnit.SECONDS);
90 91
                                else
91 92
                                    filtered++;
92 93
                                total++;
......
94 95
                        } catch (InterruptedException e) {
95 96
                            log.info("not inserted in queue element associate to url " + url + " error: " + e.getMessage() );
96 97

  
97

  
98 98
                        }
99 99
                    }
100
                }else{
100
                } else {
101 101
                    url = urls.remove(0);
102 102
                    try {
103 103
                        c.get(url);
104 104
                    } catch (CollectorServiceException e) {
105 105
                        log.info("Impossible to collect url: " + url + " error: " + e.getMessage());
106 106
                    }
107
                    if(c.isStatusOk()){
107
                    if(c.isStatusOk()) {
108 108
                        if (c.responseTypeContains("text/html")){
109
                            recurFolder(c.getResponse(),url);
109
                            recurFolder(c.getResponse(), url);
110 110
                        }
111 111
                        else if(c.responseTypeContains("application/json") || c.responseTypeContains("application/xml")){
112 112
                            try {
113
                                queue.put(addFilePath(c.getResponse(),url, c.responseTypeContains("application/json")));
113
                                final String element = addFilePath(c.getResponse(), url, c.responseTypeContains("application/json"));
114
                                queue.offer(element, waitTime, TimeUnit.SECONDS);
114 115
                            } catch (InterruptedException e) {
115 116
                                log.info("not inserted in queue element associate to url " + url + " error: " + e.getMessage() );
116 117
                            }
......
121 122

  
122 123
            }
123 124
            try {
124
                queue.put(TERMINATOR);
125
                queue.offer(TERMINATOR, waitTime, TimeUnit.SECONDS);
125 126
            } catch (InterruptedException e) {
126
                e.printStackTrace();
127
                throw new IllegalStateException(String.format("could not add element to queue for more than %s%s", waitTime, TimeUnit.SECONDS), e);
127 128
            }
128 129

  
129 130
        }
......
145 146
            try {
146 147
                if(isJson)
147 148
                    meta = meta.substring(0, meta.length() - 1) + ",'downloadFileUrl':'" + path.substring(0, path.indexOf(".json")) + ".pdf'}";
148
                else{
149
                else {
149 150

  
150 151
                    if (meta.contains("<!DOCTYPE")) {
151 152
                        meta = meta.substring(meta.indexOf("<!DOCTYPE"));
......
153 154
                    }
154 155
                    int index = meta.lastIndexOf("</");
155 156
                    meta = meta.substring(0, index) + "<downloadFileUrl>" + path.substring(0, path.indexOf(".xml")) + ".pdf</downloadFileUrl>" + meta.substring(index);
156

  
157

  
158 157
                }
159

  
160
            }catch(Exception ex){
158
            } catch(Exception ex) {
161 159
                log.info("not file with extension .json or .xml");
162 160
            }
163 161

  
164 162

  
165
            if(isJson){
163
            if(isJson) {
166 164
                JSONObject jsonobj = null;
167 165
                try {
168 166
                    jsonobj = new JSONObject("{'resource':" + meta + "}");
169 167

  
170 168
                    return XML.toString(jsonobj);
171
                }catch(Exception e){
169
                } catch(Exception e) {
172 170
                    log.fatal("Impossible to transform json object to xml \n" + meta + "\n " + e.getMessage() + "\n" + url);
173 171
                   // throw new RuntimeException();
174 172
                    jsonobj = new JSONObject("{'resource':{'DOI':'JUNK','url':'"+path+"'}}");
......
199 197
        }
200 198
    }
201 199

  
202

  
203

  
204 200
}

Also available in: Unified diff