Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.datacite;
2

    
3

    
4
import com.google.gson.Gson;
5
import com.google.gson.GsonBuilder;
6

    
7
import eu.dnetlib.data.collector.plugins.datacite.schema.DataciteSchema;
8
import eu.dnetlib.data.collector.plugins.datacite.schema.Result;
9
import org.apache.commons.codec.binary.Base64;
10
import org.apache.commons.io.IOUtils;
11
import org.apache.commons.lang3.StringUtils;
12

    
13
import java.io.ByteArrayOutputStream;
14
import java.io.IOException;
15
import java.net.URL;
16
import java.util.ArrayDeque;
17
import java.util.Iterator;
18
import java.util.Objects;
19
import java.util.Queue;
20
import java.util.zip.DataFormatException;
21
import java.util.zip.Inflater;
22

    
23
public class DataciteESIterator implements Iterator<String> {
24

    
25

    
26
    private final long timestamp;
27

    
28
    private String scrollId;
29

    
30
    private Queue<String> currentPage;
31

    
32
    private final Gson g =  new GsonBuilder().create();
33

    
34
    private static final String BASE_URL= "http://ip-90-147-167-25.ct1.garrservices.it:5000/new_scan";
35

    
36
    private static final String NEXT_URL= "http://ip-90-147-167-25.ct1.garrservices.it:5000/scan/%s";
37

    
38

    
39
    public DataciteESIterator(long timestamp) throws Exception {
40
        this.timestamp = timestamp;
41
        currentPage = new ArrayDeque<>();
42
        startRequest();
43
    }
44

    
45
    private static String decompression(final Result r) {
46
        try {
47
            byte[] byteArray = Base64.decodeBase64(r.getBody().getBytes());
48
            Inflater decompresser = new Inflater();
49
            decompresser.setInput(byteArray);
50
            ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
51
            byte[] buffer = new byte[8192];
52
            while (!decompresser.finished()) {
53
                int size = decompresser.inflate(buffer);
54
                bos.write(buffer, 0, size);
55
            }
56
            byte[] unzippeddata = bos.toByteArray();
57
            decompresser.end();
58

    
59
            return new String(unzippeddata);
60
        } catch (DataFormatException e) {
61
            return null;
62
        }
63

    
64
    }
65

    
66
    private void fillQueue(final String hits) {
67
        if (StringUtils.isBlank(hits) || "[]".equalsIgnoreCase(hits.trim()))
68
            return;
69
        try {
70
            DataciteSchema datacitepage = g.fromJson(hits, DataciteSchema.class);
71
            this.scrollId = datacitepage.getScrollId();
72
            datacitepage.getResult().stream().map(DataciteESIterator::decompression).filter(Objects::nonNull).forEach(this.currentPage::add);
73
        } catch (Throwable e) {
74
            System.out.println(hits);
75
            e.printStackTrace();
76
        }
77
    }
78

    
79
    private void startRequest() throws Exception {
80
        final URL startUrl = new URL(timestamp >0 ?BASE_URL + "?timestamp="+timestamp: BASE_URL);
81
        fillQueue(IOUtils.toString(startUrl.openStream()));
82
    }
83

    
84
    private void getNextPage() throws IOException {
85
        final URL startUrl = new URL(String.format(NEXT_URL,scrollId));
86
        fillQueue(IOUtils.toString(startUrl.openStream()));
87
    }
88

    
89

    
90
    @Override
91
    public boolean hasNext() {
92
        return  currentPage.size() >0;
93
    }
94

    
95
    @Override
96
    public String next() {
97

    
98
        if (currentPage.size() == 0) {
99

    
100
            return null;
101
        }
102

    
103
        String nextItem = currentPage.remove();
104
        if (currentPage.size() == 0) {
105
            try {
106
                getNextPage();
107
            } catch (Throwable e) {
108
                throw new RuntimeException(e);
109
            }
110
        }
111

    
112
        return nextItem;
113
    }
114
}
(2-2/2)