Revision 55216
Added by Sandro La Bruzzo about 5 years ago
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/DataciteESIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.data.collector.plugins.datacite; |
|
2 |
|
|
3 |
|
|
4 |
import com.google.gson.Gson; |
|
5 |
import com.google.gson.GsonBuilder; |
|
6 |
|
|
7 |
import eu.dnetlib.data.collector.plugins.datacite.schema.DataciteSchema; |
|
8 |
import eu.dnetlib.data.collector.plugins.datacite.schema.Result; |
|
9 |
import org.apache.commons.codec.binary.Base64; |
|
10 |
import org.apache.commons.io.IOUtils; |
|
11 |
import org.apache.commons.lang3.StringUtils; |
|
12 |
|
|
13 |
import java.io.ByteArrayOutputStream; |
|
14 |
import java.io.IOException; |
|
15 |
import java.net.URL; |
|
16 |
import java.util.ArrayDeque; |
|
17 |
import java.util.Iterator; |
|
18 |
import java.util.Objects; |
|
19 |
import java.util.Queue; |
|
20 |
import java.util.zip.DataFormatException; |
|
21 |
import java.util.zip.Inflater; |
|
22 |
|
|
23 |
public class DataciteESIterator implements Iterator<String> { |
|
24 |
|
|
25 |
|
|
26 |
private final long timestamp; |
|
27 |
|
|
28 |
private String scrollId; |
|
29 |
|
|
30 |
private Queue<String> currentPage; |
|
31 |
|
|
32 |
private final Gson g = new GsonBuilder().create(); |
|
33 |
|
|
34 |
private static final String BASE_URL= "http://ip-90-147-167-25.ct1.garrservices.it:5000/new_scan"; |
|
35 |
|
|
36 |
private static final String NEXT_URL= "http://ip-90-147-167-25.ct1.garrservices.it:5000/scan/%s"; |
|
37 |
|
|
38 |
|
|
39 |
public DataciteESIterator(long timestamp) throws Exception { |
|
40 |
this.timestamp = timestamp; |
|
41 |
currentPage = new ArrayDeque<>(); |
|
42 |
startRequest(); |
|
43 |
} |
|
44 |
|
|
45 |
private static String decompression(final Result r) { |
|
46 |
try { |
|
47 |
byte[] byteArray = Base64.decodeBase64(r.getBody().getBytes()); |
|
48 |
Inflater decompresser = new Inflater(); |
|
49 |
decompresser.setInput(byteArray); |
|
50 |
ByteArrayOutputStream bos = new ByteArrayOutputStream(byteArray.length); |
|
51 |
byte[] buffer = new byte[8192]; |
|
52 |
while (!decompresser.finished()) { |
|
53 |
int size = decompresser.inflate(buffer); |
|
54 |
bos.write(buffer, 0, size); |
|
55 |
} |
|
56 |
byte[] unzippeddata = bos.toByteArray(); |
|
57 |
decompresser.end(); |
|
58 |
|
|
59 |
return new String(unzippeddata); |
|
60 |
} catch (DataFormatException e) { |
|
61 |
return null; |
|
62 |
} |
|
63 |
|
|
64 |
} |
|
65 |
|
|
66 |
private void fillQueue(final String hits) { |
|
67 |
if (StringUtils.isBlank(hits) || "[]".equalsIgnoreCase(hits.trim())) |
|
68 |
return; |
|
69 |
try { |
|
70 |
DataciteSchema datacitepage = g.fromJson(hits, DataciteSchema.class); |
|
71 |
this.scrollId = datacitepage.getScrollId(); |
|
72 |
datacitepage.getResult().stream().map(DataciteESIterator::decompression).filter(Objects::nonNull).forEach(this.currentPage::add); |
|
73 |
} catch (Throwable e) { |
|
74 |
System.out.println(hits); |
|
75 |
e.printStackTrace(); |
|
76 |
} |
|
77 |
} |
|
78 |
|
|
79 |
private void startRequest() throws Exception { |
|
80 |
final URL startUrl = new URL(timestamp >0 ?BASE_URL + "?timestamp="+timestamp: BASE_URL); |
|
81 |
fillQueue(IOUtils.toString(startUrl.openStream())); |
|
82 |
} |
|
83 |
|
|
84 |
private void getNextPage() throws IOException { |
|
85 |
final URL startUrl = new URL(String.format(NEXT_URL,scrollId)); |
|
86 |
fillQueue(IOUtils.toString(startUrl.openStream())); |
|
87 |
} |
|
88 |
|
|
89 |
|
|
90 |
@Override |
|
91 |
public boolean hasNext() { |
|
92 |
return currentPage.size() >0; |
|
93 |
} |
|
94 |
|
|
95 |
@Override |
|
96 |
public String next() { |
|
97 |
|
|
98 |
if (currentPage.size() == 0) { |
|
99 |
|
|
100 |
return null; |
|
101 |
} |
|
102 |
|
|
103 |
String nextItem = currentPage.remove(); |
|
104 |
if (currentPage.size() == 0) { |
|
105 |
try { |
|
106 |
getNextPage(); |
|
107 |
} catch (Throwable e) { |
|
108 |
throw new RuntimeException(e); |
|
109 |
} |
|
110 |
} |
|
111 |
|
|
112 |
return nextItem; |
|
113 |
} |
|
114 |
} |
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/schema/DataciteSchema.java | ||
---|---|---|
1 |
|
|
2 |
package eu.dnetlib.data.collector.plugins.datacite.schema; |
|
3 |
|
|
4 |
import java.util.List; |
|
5 |
import com.google.gson.annotations.Expose; |
|
6 |
import com.google.gson.annotations.SerializedName; |
|
7 |
|
|
8 |
public class DataciteSchema { |
|
9 |
|
|
10 |
@SerializedName("counter") |
|
11 |
@Expose |
|
12 |
private Integer counter; |
|
13 |
@SerializedName("result") |
|
14 |
@Expose |
|
15 |
private List<Result> result = null; |
|
16 |
@SerializedName("scroll_id") |
|
17 |
@Expose |
|
18 |
private String scrollId; |
|
19 |
@SerializedName("total") |
|
20 |
@Expose |
|
21 |
private Integer total; |
|
22 |
|
|
23 |
public Integer getCounter() { |
|
24 |
return counter; |
|
25 |
} |
|
26 |
|
|
27 |
public void setCounter(Integer counter) { |
|
28 |
this.counter = counter; |
|
29 |
} |
|
30 |
|
|
31 |
public List<Result> getResult() { |
|
32 |
return result; |
|
33 |
} |
|
34 |
|
|
35 |
public void setResult(List<Result> result) { |
|
36 |
this.result = result; |
|
37 |
} |
|
38 |
|
|
39 |
public String getScrollId() { |
|
40 |
return scrollId; |
|
41 |
} |
|
42 |
|
|
43 |
public void setScrollId(String scrollId) { |
|
44 |
this.scrollId = scrollId; |
|
45 |
} |
|
46 |
|
|
47 |
public Integer getTotal() { |
|
48 |
return total; |
|
49 |
} |
|
50 |
|
|
51 |
public void setTotal(Integer total) { |
|
52 |
this.total = total; |
|
53 |
} |
|
54 |
|
|
55 |
} |
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/schema/Result.java | ||
---|---|---|
1 |
|
|
2 |
package eu.dnetlib.data.collector.plugins.datacite.schema; |
|
3 |
|
|
4 |
import com.google.gson.annotations.Expose; |
|
5 |
import com.google.gson.annotations.SerializedName; |
|
6 |
|
|
7 |
public class Result { |
|
8 |
|
|
9 |
@SerializedName("body") |
|
10 |
@Expose |
|
11 |
private String body; |
|
12 |
@SerializedName("id") |
|
13 |
@Expose |
|
14 |
private String id; |
|
15 |
@SerializedName("originalId") |
|
16 |
@Expose |
|
17 |
private String originalId; |
|
18 |
@SerializedName("timestamp") |
|
19 |
@Expose |
|
20 |
private Integer timestamp; |
|
21 |
|
|
22 |
public String getBody() { |
|
23 |
return body; |
|
24 |
} |
|
25 |
|
|
26 |
public void setBody(String body) { |
|
27 |
this.body = body; |
|
28 |
} |
|
29 |
|
|
30 |
public String getId() { |
|
31 |
return id; |
|
32 |
} |
|
33 |
|
|
34 |
public void setId(String id) { |
|
35 |
this.id = id; |
|
36 |
} |
|
37 |
|
|
38 |
public String getOriginalId() { |
|
39 |
return originalId; |
|
40 |
} |
|
41 |
|
|
42 |
public void setOriginalId(String originalId) { |
|
43 |
this.originalId = originalId; |
|
44 |
} |
|
45 |
|
|
46 |
public Integer getTimestamp() { |
|
47 |
return timestamp; |
|
48 |
} |
|
49 |
|
|
50 |
public void setTimestamp(Integer timestamp) { |
|
51 |
this.timestamp = timestamp; |
|
52 |
} |
|
53 |
|
|
54 |
} |
modules/dnet-collector-plugins/trunk/src/main/java/eu/dnetlib/data/collector/plugins/datacite/DataciteCollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.data.collector.plugins.datacite; |
|
2 |
|
|
3 |
import eu.dnetlib.data.collector.plugin.AbstractCollectorPlugin; |
|
4 |
import eu.dnetlib.data.collector.plugin.CollectorPlugin; |
|
5 |
import eu.dnetlib.data.collector.rmi.CollectorServiceException; |
|
6 |
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor; |
|
7 |
import eu.dnetlib.miscutils.datetime.DateUtils; |
|
8 |
|
|
9 |
import org.apache.commons.lang3.StringUtils; |
|
10 |
|
|
11 |
import java.util.Date; |
|
12 |
|
|
13 |
public class DataciteCollectorPlugin extends AbstractCollectorPlugin implements CollectorPlugin { |
|
14 |
|
|
15 |
|
|
16 |
@Override |
|
17 |
public Iterable<String> collect(InterfaceDescriptor interfaceDescriptor, String fromDate, String untilDate) throws CollectorServiceException { |
|
18 |
|
|
19 |
long timestamp= 0; |
|
20 |
if (StringUtils.isNotBlank(fromDate)) { |
|
21 |
Date parsed = new DateUtils().parse(fromDate); |
|
22 |
timestamp =parsed.getTime() /1000; |
|
23 |
} |
|
24 |
|
|
25 |
final long finalTimestamp = timestamp; |
|
26 |
return () -> { |
|
27 |
try { |
|
28 |
return new DataciteESIterator(finalTimestamp); |
|
29 |
} catch (Exception e) { |
|
30 |
throw new RuntimeException(e ); |
|
31 |
} |
|
32 |
}; |
|
33 |
} |
|
34 |
|
|
35 |
} |
modules/dnet-collector-plugins/trunk/src/main/resources/eu/dnetlib/data/collector/plugins/applicationContext-dnet-modular-collector-plugins.xml | ||
---|---|---|
8 | 8 |
http://cxf.apache.org/transports/http/configuration http://cxf.apache.org/schemas/configuration/http-conf.xsd"> |
9 | 9 |
|
10 | 10 |
|
11 |
<bean id="dataciteESPlugin" class="eu.dnetlib.data.collector.plugins.datacite.DataciteCollectorPlugin"> |
|
12 |
<property name="protocolDescriptor"> |
|
13 |
<bean class="eu.dnetlib.data.collector.rmi.ProtocolDescriptor" p:name="dataciteESPlugins"/> |
|
14 |
</property> |
|
15 |
</bean> |
|
16 |
|
|
11 | 17 |
<bean id="schemaOrgPlugin" class="eu.dnetlib.data.collector.plugins.schemaorg.SchemaOrgPlugin"> |
12 | 18 |
<property name="protocolDescriptor"> |
13 | 19 |
<bean class="eu.dnetlib.data.collector.rmi.ProtocolDescriptor" p:name="schemaorg"> |
Also available in: Unified diff
implemented Datacite collector plugin from Elasticsearch dump