Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.datacite;
2

    
3

    
4
import java.io.ByteArrayOutputStream;
5
import java.io.IOException;
6
import java.net.URL;
7
import java.util.ArrayDeque;
8
import java.util.Iterator;
9
import java.util.Objects;
10
import java.util.Queue;
11
import java.util.zip.DataFormatException;
12
import java.util.zip.Inflater;
13

    
14
import com.google.gson.Gson;
15
import com.google.gson.GsonBuilder;
16
import eu.dnetlib.data.collector.plugins.datacite.schema.DataciteSchema;
17
import eu.dnetlib.data.collector.plugins.datacite.schema.Result;
18
import org.apache.commons.codec.binary.Base64;
19
import org.apache.commons.io.IOUtils;
20
import org.apache.commons.lang3.StringUtils;
21
import org.apache.commons.logging.Log;
22
import org.apache.commons.logging.LogFactory;
23

    
24
public class DataciteESIterator implements Iterator<String> {
25

    
26
    private static final Log log = LogFactory.getLog(DataciteESIterator.class);
27

    
28
    private final long timestamp;
29

    
30
    private String scrollId;
31

    
32
    private Queue<String> currentPage;
33

    
34
    private final Gson g =  new GsonBuilder().create();
35

    
36
    private String baseURL;
37

    
38
    private static final String START_PATH = "new_scan";
39
    private static final String NEXT_PATH = "scan/%s";
40

    
41

    
42
    public DataciteESIterator(long timestamp, String baseUrl) throws Exception {
43
        this.timestamp = timestamp;
44
        this.baseURL = baseUrl;
45
        currentPage = new ArrayDeque<>();
46
        startRequest();
47
    }
48

    
49
    private static String decompression(final Result r) {
50
        try {
51
            byte[] byteArray = Base64.decodeBase64(r.getBody().getBytes());
52
            Inflater decompresser = new Inflater();
53
            decompresser.setInput(byteArray);
54
            ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
55
            byte[] buffer = new byte[8192];
56
            while (!decompresser.finished()) {
57
                int size = decompresser.inflate(buffer);
58
                bos.write(buffer, 0, size);
59
            }
60
            byte[] unzippeddata = bos.toByteArray();
61
            decompresser.end();
62

    
63
            return new String(unzippeddata);
64
        } catch (DataFormatException e) {
65
            log.warn("Exception when decompressing: "+e.getMessage());
66
            return null;
67
        }
68

    
69
    }
70

    
71
    private void fillQueue(final String hits) {
72
        if (StringUtils.isBlank(hits) || "[]".equalsIgnoreCase(hits.trim()))
73
            return;
74
        try {
75
            DataciteSchema datacitepage = g.fromJson(hits, DataciteSchema.class);
76
            this.scrollId = datacitepage.getScrollId();
77
            datacitepage.getResult().stream().map(DataciteESIterator::decompression).filter(Objects::nonNull).forEach(this.currentPage::add);
78
        } catch (Throwable e) {
79
            System.out.println(hits);
80
            e.printStackTrace();
81
        }
82
    }
83

    
84
    private void startRequest() throws Exception {
85
        String url = baseURL+"/"+START_PATH;
86
        final URL startUrl = new URL(timestamp >0 ? url + "?timestamp="+timestamp : url);
87
        fillQueue(IOUtils.toString(startUrl.openStream()));
88
    }
89

    
90
    private void getNextPage() throws IOException {
91
        String url = baseURL+"/"+NEXT_PATH;
92
        final URL startUrl = new URL(String.format(url,scrollId));
93
        fillQueue(IOUtils.toString(startUrl.openStream()));
94
    }
95

    
96

    
97
    @Override
98
    public boolean hasNext() {
99
        return  currentPage.size() >0;
100
    }
101

    
102
    @Override
103
    public String next() {
104

    
105
        if (currentPage.size() == 0) {
106

    
107
            return null;
108
        }
109

    
110
        String nextItem = currentPage.remove();
111
        if (currentPage.size() == 0) {
112
            try {
113
                getNextPage();
114
            } catch (Throwable e) {
115
                throw new RuntimeException(e);
116
            }
117
        }
118

    
119
        return nextItem;
120
    }
121

    
122
    public String getBaseURL() {
123
        return baseURL;
124
    }
125

    
126
    public void setBaseURL(final String baseURL) {
127
        this.baseURL = baseURL;
128
    }
129
}
(2-2/2)