Project

General

Profile

« Previous | Next » 

Revision 55216

implemented Datacite collector plugin from Elasticsearch dump

View differences:

modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/DataciteESIterator.java
1
package eu.dnetlib.data.collector.plugins.datacite;
2

  
3

  
4
import com.google.gson.Gson;
5
import com.google.gson.GsonBuilder;
6

  
7
import eu.dnetlib.data.collector.plugins.datacite.schema.DataciteSchema;
8
import eu.dnetlib.data.collector.plugins.datacite.schema.Result;
9
import org.apache.commons.codec.binary.Base64;
10
import org.apache.commons.io.IOUtils;
11
import org.apache.commons.lang3.StringUtils;
12

  
13
import java.io.ByteArrayOutputStream;
14
import java.io.IOException;
15
import java.net.URL;
16
import java.util.ArrayDeque;
17
import java.util.Iterator;
18
import java.util.Objects;
19
import java.util.Queue;
20
import java.util.zip.DataFormatException;
21
import java.util.zip.Inflater;
22

  
23
public class DataciteESIterator implements Iterator<String> {
24

  
25

  
26
    private final long timestamp;
27

  
28
    private String scrollId;
29

  
30
    private Queue<String> currentPage;
31

  
32
    private final Gson g =  new GsonBuilder().create();
33

  
34
    private static final String BASE_URL= "http://ip-90-147-167-25.ct1.garrservices.it:5000/new_scan";
35

  
36
    private static final String NEXT_URL= "http://ip-90-147-167-25.ct1.garrservices.it:5000/scan/%s";
37

  
38

  
39
    /**
     * Creates the iterator and immediately issues the initial scan request,
     * so the first page of records is buffered before the constructor returns.
     *
     * @param timestamp value sent as the {@code timestamp} query parameter of the
     *                  initial request; when it is not greater than zero the
     *                  parameter is omitted
     * @throws Exception if the initial request cannot be performed
     */
    public DataciteESIterator(long timestamp) throws Exception {
        this.currentPage = new ArrayDeque<>();
        this.timestamp = timestamp;
        startRequest();
    }
44

  
45
    private static String decompression(final Result r) {
46
        try {
47
            byte[] byteArray = Base64.decodeBase64(r.getBody().getBytes());
48
            Inflater decompresser = new Inflater();
49
            decompresser.setInput(byteArray);
50
            ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length);
51
            byte[] buffer = new byte[8192];
52
            while (!decompresser.finished()) {
53
                int size = decompresser.inflate(buffer);
54
                bos.write(buffer, 0, size);
55
            }
56
            byte[] unzippeddata = bos.toByteArray();
57
            decompresser.end();
58

  
59
            return new String(unzippeddata);
60
        } catch (DataFormatException e) {
61
            return null;
62
        }
63

  
64
    }
65

  
66
    private void fillQueue(final String hits) {
67
        if (StringUtils.isBlank(hits) || "[]".equalsIgnoreCase(hits.trim()))
68
            return;
69
        try {
70
            DataciteSchema datacitepage = g.fromJson(hits, DataciteSchema.class);
71
            this.scrollId = datacitepage.getScrollId();
72
            datacitepage.getResult().stream().map(DataciteESIterator::decompression).filter(Objects::nonNull).forEach(this.currentPage::add);
73
        } catch (Throwable e) {
74
            System.out.println(hits);
75
            e.printStackTrace();
76
        }
77
    }
78

  
79
    private void startRequest() throws Exception {
80
        final URL startUrl = new URL(timestamp >0 ?BASE_URL + "?timestamp="+timestamp: BASE_URL);
81
        fillQueue(IOUtils.toString(startUrl.openStream()));
82
    }
83

  
84
    private void getNextPage() throws IOException {
85
        final URL startUrl = new URL(String.format(NEXT_URL,scrollId));
86
        fillQueue(IOUtils.toString(startUrl.openStream()));
87
    }
88

  
89

  
90
    @Override
91
    public boolean hasNext() {
92
        return  currentPage.size() >0;
93
    }
94

  
95
    @Override
96
    public String next() {
97

  
98
        if (currentPage.size() == 0) {
99

  
100
            return null;
101
        }
102

  
103
        String nextItem = currentPage.remove();
104
        if (currentPage.size() == 0) {
105
            try {
106
                getNextPage();
107
            } catch (Throwable e) {
108
                throw new RuntimeException(e);
109
            }
110
        }
111

  
112
        return nextItem;
113
    }
114
}
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/schema/DataciteSchema.java
1

  
2
package eu.dnetlib.data.collector.plugins.datacite.schema;
3

  
4
import java.util.List;
5
import com.google.gson.annotations.Expose;
6
import com.google.gson.annotations.SerializedName;
7

  
8
public class DataciteSchema {
9

  
10
    @SerializedName("counter")
11
    @Expose
12
    private Integer counter;
13
    @SerializedName("result")
14
    @Expose
15
    private List<Result> result = null;
16
    @SerializedName("scroll_id")
17
    @Expose
18
    private String scrollId;
19
    @SerializedName("total")
20
    @Expose
21
    private Integer total;
22

  
23
    public Integer getCounter() {
24
        return counter;
25
    }
26

  
27
    public void setCounter(Integer counter) {
28
        this.counter = counter;
29
    }
30

  
31
    public List<Result> getResult() {
32
        return result;
33
    }
34

  
35
    public void setResult(List<Result> result) {
36
        this.result = result;
37
    }
38

  
39
    public String getScrollId() {
40
        return scrollId;
41
    }
42

  
43
    public void setScrollId(String scrollId) {
44
        this.scrollId = scrollId;
45
    }
46

  
47
    public Integer getTotal() {
48
        return total;
49
    }
50

  
51
    public void setTotal(Integer total) {
52
        this.total = total;
53
    }
54

  
55
}
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/schema/Result.java
1

  
2
package eu.dnetlib.data.collector.plugins.datacite.schema;
3

  
4
import com.google.gson.annotations.Expose;
5
import com.google.gson.annotations.SerializedName;
6

  
7
public class Result {
8

  
9
    @SerializedName("body")
10
    @Expose
11
    private String body;
12
    @SerializedName("id")
13
    @Expose
14
    private String id;
15
    @SerializedName("originalId")
16
    @Expose
17
    private String originalId;
18
    @SerializedName("timestamp")
19
    @Expose
20
    private Integer timestamp;
21

  
22
    public String getBody() {
23
        return body;
24
    }
25

  
26
    public void setBody(String body) {
27
        this.body = body;
28
    }
29

  
30
    public String getId() {
31
        return id;
32
    }
33

  
34
    public void setId(String id) {
35
        this.id = id;
36
    }
37

  
38
    public String getOriginalId() {
39
        return originalId;
40
    }
41

  
42
    public void setOriginalId(String originalId) {
43
        this.originalId = originalId;
44
    }
45

  
46
    public Integer getTimestamp() {
47
        return timestamp;
48
    }
49

  
50
    public void setTimestamp(Integer timestamp) {
51
        this.timestamp = timestamp;
52
    }
53

  
54
}
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/DataciteCollectorPlugin.java
1
package eu.dnetlib.data.collector.plugins.datacite;
2

  
3
import eu.dnetlib.data.collector.plugin.AbstractCollectorPlugin;
4
import eu.dnetlib.data.collector.plugin.CollectorPlugin;
5
import eu.dnetlib.data.collector.rmi.CollectorServiceException;
6
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor;
7
import eu.dnetlib.miscutils.datetime.DateUtils;
8

  
9
import org.apache.commons.lang3.StringUtils;
10

  
11
import java.util.Date;
12

  
13
public class DataciteCollectorPlugin extends AbstractCollectorPlugin implements CollectorPlugin {
14

  
15

  
16
    @Override
17
    public Iterable<String> collect(InterfaceDescriptor interfaceDescriptor, String fromDate, String untilDate) throws CollectorServiceException {
18

  
19
        long timestamp= 0;
20
        if (StringUtils.isNotBlank(fromDate)) {
21
            Date parsed = new DateUtils().parse(fromDate);
22
            timestamp =parsed.getTime() /1000;
23
        }
24

  
25
        final long finalTimestamp = timestamp;
26
        return () -> {
27
            try {
28
                return new DataciteESIterator(finalTimestamp);
29
            } catch (Exception e) {
30
                throw new RuntimeException(e );
31
            }
32
        };
33
    }
34

  
35
}
modules/dnet-collector-plugins/trunk/src/main/resources/eu/dnetlib/data/collector/plugins/applicationContext-dnet-modular-collector-plugins.xml
8 8
       http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd">
9 9

  
10 10

  
11
	<bean id="dataciteESPlugin" class="eu.dnetlib.data.collector.plugins.datacite.DataciteCollectorPlugin">
12
		<property name="protocolDescriptor">
13
			<bean class="eu.dnetlib.data.collector.rmi.ProtocolDescriptor" p:name="dataciteESPlugins"/>
14
		</property>
15
	</bean>
16

  
11 17
	<bean id="schemaOrgPlugin" class="eu.dnetlib.data.collector.plugins.schemaorg.SchemaOrgPlugin">
12 18
		<property name="protocolDescriptor">
13 19
			<bean class="eu.dnetlib.data.collector.rmi.ProtocolDescriptor" p:name="schemaorg">

Also available in: Unified diff