Revision 45602
Added by Michele Artini about 7 years ago
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/WorkerConfiguration.java | ||
---|---|---|
1 |
package eu.dnetlib.msro; |
|
2 |
|
|
3 |
import org.springframework.context.annotation.Configuration; |
|
4 |
|
|
5 |
@Configuration |
|
6 |
public class WorkerConfiguration { |
|
7 |
|
|
8 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/exceptions/MSROException.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.exceptions; |
|
2 |
|
|
3 |
import eu.dnetlib.exceptions.DnetGenericException; |
|
4 |
|
|
5 |
public class MSROException extends DnetGenericException { |
|
6 |
|
|
7 |
/** |
|
8 |
* |
|
9 |
*/ |
|
10 |
private static final long serialVersionUID = 3522182470263128085L; |
|
11 |
|
|
12 |
public MSROException(final String message, final Throwable cause) { |
|
13 |
super(message, cause); |
|
14 |
} |
|
15 |
|
|
16 |
public MSROException(final String message) { |
|
17 |
super(message); |
|
18 |
} |
|
19 |
|
|
20 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/exceptions/CollectorServiceException.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.exceptions; |
|
2 |
|
|
3 |
import eu.dnetlib.exceptions.DnetGenericException; |
|
4 |
|
|
5 |
public class CollectorServiceException extends DnetGenericException { |
|
6 |
|
|
7 |
/** |
|
8 |
* |
|
9 |
*/ |
|
10 |
private static final long serialVersionUID = -2286150868535911426L; |
|
11 |
|
|
12 |
public CollectorServiceException(final String message, final Throwable cause) { |
|
13 |
super(message, cause); |
|
14 |
} |
|
15 |
|
|
16 |
public CollectorServiceException(final String message) { |
|
17 |
super(message); |
|
18 |
} |
|
19 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/exceptions/CollectorServiceRuntimeException.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.exceptions; |
|
2 |
|
|
3 |
import eu.dnetlib.exceptions.DnetGenericRuntimeException; |
|
4 |
|
|
5 |
public class CollectorServiceRuntimeException extends DnetGenericRuntimeException { |
|
6 |
|
|
7 |
/** |
|
8 |
* |
|
9 |
*/ |
|
10 |
private static final long serialVersionUID = -2286150868535911426L; |
|
11 |
|
|
12 |
public CollectorServiceRuntimeException(final String message, final Throwable cause) { |
|
13 |
super(message, cause); |
|
14 |
} |
|
15 |
|
|
16 |
public CollectorServiceRuntimeException(final String message) { |
|
17 |
super(message); |
|
18 |
} |
|
19 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/WorkerProperties.java | ||
---|---|---|
1 |
package eu.dnetlib.msro; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.ArrayList; |
|
5 |
import java.util.List; |
|
6 |
import java.util.Map; |
|
7 |
|
|
8 |
import javax.validation.constraints.Max; |
|
9 |
import javax.validation.constraints.Min; |
|
10 |
import javax.validation.constraints.NotNull; |
|
11 |
|
|
12 |
import org.apache.commons.logging.Log; |
|
13 |
import org.apache.commons.logging.LogFactory; |
|
14 |
import org.springframework.boot.context.properties.ConfigurationProperties; |
|
15 |
import org.springframework.stereotype.Component; |
|
16 |
|
|
17 |
import com.fasterxml.jackson.databind.ObjectMapper; |
|
18 |
|
|
19 |
@Component |
|
20 |
@ConfigurationProperties(prefix = "msro.worker") |
|
21 |
public class WorkerProperties { |
|
22 |
|
|
23 |
private static final Log log = LogFactory.getLog(WorkerProperties.class); |
|
24 |
|
|
25 |
private final ObjectMapper mapper = new ObjectMapper(); |
|
26 |
|
|
27 |
@Min(1) |
|
28 |
@Max(1024) |
|
29 |
private int maxSize; |
|
30 |
|
|
31 |
@NotNull |
|
32 |
private String datasourceProtocolsJson; |
|
33 |
|
|
34 |
@NotNull |
|
35 |
private String datasourceTypologiesJson; |
|
36 |
|
|
37 |
public String getDatasourceProtocolsJson() { |
|
38 |
return datasourceProtocolsJson; |
|
39 |
} |
|
40 |
|
|
41 |
public void setDatasourceProtocolsJson(final String datasourceProtocolsJson) { |
|
42 |
this.datasourceProtocolsJson = datasourceProtocolsJson; |
|
43 |
} |
|
44 |
|
|
45 |
public String getDatasourceTypologiesJson() { |
|
46 |
return datasourceTypologiesJson; |
|
47 |
} |
|
48 |
|
|
49 |
public void setDatasourceTypologiesJson(final String datasourceTypologiesJson) { |
|
50 |
this.datasourceTypologiesJson = datasourceTypologiesJson; |
|
51 |
} |
|
52 |
|
|
53 |
public int getMaxSize() { |
|
54 |
return maxSize; |
|
55 |
} |
|
56 |
|
|
57 |
public void setMaxSize(final int maxSize) { |
|
58 |
this.maxSize = maxSize; |
|
59 |
} |
|
60 |
|
|
61 |
public List<Map<String, String>> getDatasourceTypologies() { |
|
62 |
try { |
|
63 |
return mapper.readValue(datasourceTypologiesJson, List.class); |
|
64 |
} catch (final IOException e) { |
|
65 |
log.warn("Invalid JSON property: " + datasourceTypologiesJson, e); |
|
66 |
return new ArrayList<>(); |
|
67 |
} |
|
68 |
} |
|
69 |
|
|
70 |
public List<Map<String, String>> getDatasourceProtocols() { |
|
71 |
try { |
|
72 |
return mapper.readValue(datasourceProtocolsJson, List.class); |
|
73 |
} catch (final IOException e) { |
|
74 |
log.warn("Invalid JSON property: " + datasourceProtocolsJson, e); |
|
75 |
return new ArrayList<>(); |
|
76 |
} |
|
77 |
} |
|
78 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/DnetSimpleAggregationWorkerApplication.java | ||
---|---|---|
1 |
package eu.dnetlib.msro; |
|
2 |
|
|
3 |
import org.springframework.boot.SpringApplication; |
|
4 |
import org.springframework.boot.autoconfigure.SpringBootApplication; |
|
5 |
import org.springframework.context.annotation.ComponentScan; |
|
6 |
import org.springframework.scheduling.annotation.EnableScheduling; |
|
7 |
|
|
8 |
@SpringBootApplication |
|
9 |
@ComponentScan("eu.dnetlib") |
|
10 |
@EnableScheduling |
|
11 |
public class DnetSimpleAggregationWorkerApplication { |
|
12 |
|
|
13 |
public static void main(final String[] args) { |
|
14 |
SpringApplication.run(DnetSimpleAggregationWorkerApplication.class, args); |
|
15 |
} |
|
16 |
|
|
17 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/controller/MsroWorkerController.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.controller; |
|
2 |
|
|
3 |
import java.util.List; |
|
4 |
import java.util.stream.Collectors; |
|
5 |
|
|
6 |
import org.springframework.beans.BeansException; |
|
7 |
import org.springframework.context.ApplicationContext; |
|
8 |
import org.springframework.context.ApplicationContextAware; |
|
9 |
import org.springframework.web.bind.annotation.RequestMapping; |
|
10 |
import org.springframework.web.bind.annotation.RestController; |
|
11 |
|
|
12 |
import eu.dnetlib.enabling.annotations.DnetService; |
|
13 |
import eu.dnetlib.enabling.annotations.DnetServiceType; |
|
14 |
import eu.dnetlib.msro.workflows.nodes.AbstractProcessNode; |
|
15 |
import eu.dnetlib.msro.workflows.procs.ProcessNodeDetails; |
|
16 |
import eu.dnetlib.services.BaseService; |
|
17 |
|
|
18 |
@RestController |
|
19 |
@RequestMapping("/worker") |
|
20 |
@DnetService(DnetServiceType.msroWorker) |
|
21 |
public class MsroWorkerController extends BaseService implements ApplicationContextAware { |
|
22 |
|
|
23 |
private ApplicationContext applicationContext; |
|
24 |
|
|
25 |
@RequestMapping("nodes") |
|
26 |
public List<ProcessNodeDetails> listNodes() { |
|
27 |
return applicationContext.getBeansOfType(AbstractProcessNode.class) |
|
28 |
.values() |
|
29 |
.stream() |
|
30 |
.map(Object::getClass) |
|
31 |
.map(ProcessNodeDetails::prepare) |
|
32 |
.collect(Collectors.toList()); |
|
33 |
} |
|
34 |
|
|
35 |
@Override |
|
36 |
public void setApplicationContext(final ApplicationContext applicationContext) throws BeansException { |
|
37 |
this.applicationContext = applicationContext; |
|
38 |
} |
|
39 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/mongo/MongoDumpIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.mongo; |
|
2 |
|
|
3 |
import java.io.BufferedReader; |
|
4 |
import java.io.FileReader; |
|
5 |
import java.io.IOException; |
|
6 |
import java.util.Iterator; |
|
7 |
|
|
8 |
import com.google.gson.JsonElement; |
|
9 |
import com.google.gson.JsonObject; |
|
10 |
import com.google.gson.JsonParser; |
|
11 |
|
|
12 |
public class MongoDumpIterator implements Iterator<String> { |
|
13 |
|
|
14 |
private final BufferedReader inputStream; |
|
15 |
private String currentLine = null; |
|
16 |
|
|
17 |
public MongoDumpIterator(final FileReader inputStream) { |
|
18 |
this.inputStream = new BufferedReader(inputStream); |
|
19 |
this.currentLine = getNextLine(); |
|
20 |
} |
|
21 |
|
|
22 |
@Override |
|
23 |
public boolean hasNext() { |
|
24 |
return currentLine != null; |
|
25 |
|
|
26 |
} |
|
27 |
|
|
28 |
@Override |
|
29 |
public String next() { |
|
30 |
final String returnedString = this.currentLine; |
|
31 |
this.currentLine = getNextLine(); |
|
32 |
return returnedString; |
|
33 |
} |
|
34 |
|
|
35 |
@Override |
|
36 |
public void remove() { |
|
37 |
// TODO Auto-generated method stub |
|
38 |
|
|
39 |
} |
|
40 |
|
|
41 |
private String getNextLine() { |
|
42 |
try { |
|
43 |
String input = inputStream.readLine(); |
|
44 |
while (input != null) { |
|
45 |
JsonElement jElement = new JsonParser().parse(input); |
|
46 |
JsonObject jobject = jElement.getAsJsonObject(); |
|
47 |
if (jobject.has("body")) { return jobject.get("body").getAsString(); } |
|
48 |
input = inputStream.readLine(); |
|
49 |
} |
|
50 |
return null; |
|
51 |
|
|
52 |
} catch (IOException e) { |
|
53 |
return null; |
|
54 |
} |
|
55 |
} |
|
56 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/mongo/MongoDumpPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.mongo; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.io.FileNotFoundException; |
|
5 |
import java.io.FileReader; |
|
6 |
import java.util.Iterator; |
|
7 |
import java.util.stream.Stream; |
|
8 |
|
|
9 |
import org.springframework.stereotype.Component; |
|
10 |
|
|
11 |
import eu.dnetlib.miscutils.streams.DnetStreamSupport; |
|
12 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
13 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
14 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
15 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
16 |
|
|
17 |
@Component |
|
18 |
@DnetCollectorPlugin("mongoDump") |
|
19 |
public class MongoDumpPlugin implements CollectorPlugin { |
|
20 |
|
|
21 |
@Override |
|
22 |
public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) |
|
23 |
throws CollectException { |
|
24 |
try { |
|
25 |
final String baseUrl = interfaceDescriptor.getBaseUrl(); |
|
26 |
|
|
27 |
if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); } |
|
28 |
final File f = new File(baseUrl); |
|
29 |
if (f.exists() == false) { throw new CollectException("the file at url " + baseUrl + " does not exists"); } |
|
30 |
|
|
31 |
final FileReader reader = new FileReader(f); |
|
32 |
|
|
33 |
final Iterator<String> iter = new MongoDumpIterator(reader); |
|
34 |
|
|
35 |
return DnetStreamSupport.generateStreamFromIterator(iter); |
|
36 |
} catch (final FileNotFoundException e) { |
|
37 |
throw new CollectException("Error unable to open inputStream", e); |
|
38 |
} |
|
39 |
} |
|
40 |
|
|
41 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ProtocolDescriptor.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins; |
|
2 |
|
|
3 |
import java.util.ArrayList; |
|
4 |
import java.util.List; |
|
5 |
import javax.xml.bind.annotation.XmlRootElement; |
|
6 |
|
|
7 |
import org.springframework.beans.factory.annotation.Required; |
|
8 |
|
|
9 |
@XmlRootElement |
|
10 |
public class ProtocolDescriptor { |
|
11 |
|
|
12 |
private String name; |
|
13 |
private List<ProtocolParameter> params = new ArrayList<ProtocolParameter>(); |
|
14 |
|
|
15 |
public ProtocolDescriptor() { |
|
16 |
} |
|
17 |
|
|
18 |
public ProtocolDescriptor(final String name, final List<ProtocolParameter> params) { |
|
19 |
this.name = name; |
|
20 |
this.params = params; |
|
21 |
} |
|
22 |
|
|
23 |
public String getName() { |
|
24 |
return name; |
|
25 |
} |
|
26 |
|
|
27 |
@Required |
|
28 |
public void setName(final String name) { |
|
29 |
this.name = name; |
|
30 |
} |
|
31 |
|
|
32 |
public List<ProtocolParameter> getParams() { |
|
33 |
return params; |
|
34 |
} |
|
35 |
|
|
36 |
public void setParams(final List<ProtocolParameter> params) { |
|
37 |
this.params = params; |
|
38 |
} |
|
39 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/zip/ZipCollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.zip; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.net.MalformedURLException; |
|
5 |
import java.net.URL; |
|
6 |
import java.util.Iterator; |
|
7 |
import java.util.stream.Stream; |
|
8 |
|
|
9 |
import org.springframework.stereotype.Component; |
|
10 |
|
|
11 |
import eu.dnetlib.miscutils.streams.DnetStreamSupport; |
|
12 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
13 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
14 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
15 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.oai.engine.XmlCleaner; |
|
16 |
// import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
17 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
18 |
|
|
19 |
/** |
|
20 |
* Collector pluging for collecting a zipped folder of records |
|
21 |
* |
|
22 |
* @author Andrea |
|
23 |
*/ |
|
24 |
@Component |
|
25 |
@DnetCollectorPlugin("zip") |
|
26 |
public class ZipCollectorPlugin implements CollectorPlugin { |
|
27 |
|
|
28 |
@Override |
|
29 |
public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) |
|
30 |
throws CollectException { |
|
31 |
|
|
32 |
final String baseUrl = interfaceDescriptor.getBaseUrl(); |
|
33 |
if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); } |
|
34 |
|
|
35 |
try { |
|
36 |
final String zipPath = interfaceDescriptor.getBaseUrl(); |
|
37 |
final URL zipUrl = new URL(zipPath); |
|
38 |
final File zipFile = new File(zipUrl.getPath()); |
|
39 |
if (!zipFile.exists()) { throw new CollectException(String.format("The base ULR %s, does not exist", zipFile.getPath())); } |
|
40 |
|
|
41 |
final Iterator<String> zipIterator = new ZipIterator(zipFile.getAbsolutePath()); |
|
42 |
|
|
43 |
return DnetStreamSupport.generateStreamFromIterator(zipIterator) |
|
44 |
.map(s -> s.startsWith("\uFEFF") ? s.substring(1) : s) |
|
45 |
.map(XmlCleaner::cleanAllEntities); |
|
46 |
} catch (final MalformedURLException e) { |
|
47 |
throw new CollectException("Zip collector failed! ", e); |
|
48 |
} |
|
49 |
|
|
50 |
} |
|
51 |
|
|
52 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/zip/ZipIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.zip; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.io.IOException; |
|
5 |
import java.io.InputStream; |
|
6 |
import java.util.Enumeration; |
|
7 |
import java.util.Iterator; |
|
8 |
import java.util.zip.ZipEntry; |
|
9 |
import java.util.zip.ZipFile; |
|
10 |
|
|
11 |
import org.apache.commons.io.IOUtils; |
|
12 |
import org.apache.commons.logging.Log; |
|
13 |
import org.apache.commons.logging.LogFactory; |
|
14 |
|
|
15 |
public class ZipIterator implements Iterator<String> { |
|
16 |
|
|
17 |
/** |
|
18 |
* The Constant log. |
|
19 |
*/ |
|
20 |
private static final Log log = LogFactory.getLog(ZipIterator.class); |
|
21 |
|
|
22 |
ZipFile zipFile; |
|
23 |
Enumeration<? extends ZipEntry> entries; |
|
24 |
private String current; |
|
25 |
|
|
26 |
public ZipIterator(final String zipPath) { |
|
27 |
try { |
|
28 |
this.zipFile = new ZipFile(zipPath); |
|
29 |
this.entries = zipFile.entries(); |
|
30 |
this.current = findNext(); |
|
31 |
} catch (IOException e) { |
|
32 |
log.error("Problems opening the .zip file " + zipPath, e); |
|
33 |
} |
|
34 |
} |
|
35 |
|
|
36 |
public ZipIterator(final File file) { |
|
37 |
try { |
|
38 |
this.zipFile = new ZipFile(file); |
|
39 |
this.entries = zipFile.entries(); |
|
40 |
this.current = findNext(); |
|
41 |
} catch (IOException e) { |
|
42 |
log.error("Problems opening the .zip file " + zipFile.getName(), e); |
|
43 |
} |
|
44 |
} |
|
45 |
|
|
46 |
@Override |
|
47 |
public boolean hasNext() { |
|
48 |
return current != null; |
|
49 |
} |
|
50 |
|
|
51 |
@Override |
|
52 |
public String next() { |
|
53 |
String ret = new String(current); |
|
54 |
current = findNext(); |
|
55 |
return ret; |
|
56 |
} |
|
57 |
|
|
58 |
@Override |
|
59 |
public void remove() { |
|
60 |
} |
|
61 |
|
|
62 |
private synchronized String findNext() { |
|
63 |
ZipEntry entry = null; |
|
64 |
while (entries.hasMoreElements() && (entry = entries.nextElement()).isDirectory()) { |
|
65 |
log.debug("Skipping Zip entry " + entry.getName()); |
|
66 |
} |
|
67 |
|
|
68 |
if (entry == null) { |
|
69 |
return null; |
|
70 |
} else { |
|
71 |
log.debug("Extracting " + entry.getName()); |
|
72 |
try { |
|
73 |
InputStream stream = zipFile.getInputStream(entry); |
|
74 |
return IOUtils.toString(stream); |
|
75 |
} catch (IOException e) { |
|
76 |
log.error("Problems extracting entry " + entry.getName(), e); |
|
77 |
return null; |
|
78 |
} |
|
79 |
} |
|
80 |
|
|
81 |
} |
|
82 |
|
|
83 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/targz/TarGzIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.targz; |
|
2 |
|
|
3 |
import java.io.*; |
|
4 |
import java.util.Iterator; |
|
5 |
import java.util.zip.GZIPInputStream; |
|
6 |
|
|
7 |
import org.apache.commons.compress.archivers.tar.TarArchiveEntry; |
|
8 |
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream; |
|
9 |
import org.apache.commons.logging.Log; |
|
10 |
import org.apache.commons.logging.LogFactory; |
|
11 |
|
|
12 |
public class TarGzIterator implements Iterator<String> { |
|
13 |
|
|
14 |
/** |
|
15 |
* The Constant log. |
|
16 |
*/ |
|
17 |
private static final Log log = LogFactory.getLog(TarGzIterator.class); |
|
18 |
|
|
19 |
private TarArchiveInputStream tarInputStream; |
|
20 |
private String current; |
|
21 |
|
|
22 |
public TarGzIterator(final String tarGzPath) { |
|
23 |
try { |
|
24 |
this.tarInputStream = new TarArchiveInputStream(new BufferedInputStream(new GZIPInputStream(new FileInputStream(tarGzPath)))); |
|
25 |
this.current = findNext(); |
|
26 |
} catch (FileNotFoundException e) { |
|
27 |
log.error("Tar.gz file not found: " + tarGzPath, e); |
|
28 |
} catch (IOException e) { |
|
29 |
log.error("Problem opening tar.gz file " + tarGzPath, e); |
|
30 |
} |
|
31 |
} |
|
32 |
|
|
33 |
public TarGzIterator(final File tarGzFile) { |
|
34 |
try { |
|
35 |
this.tarInputStream = new TarArchiveInputStream(new BufferedInputStream(new GZIPInputStream(new FileInputStream(tarGzFile)))); |
|
36 |
this.current = findNext(); |
|
37 |
} catch (FileNotFoundException e) { |
|
38 |
log.error("Tar.gz file not found: " + tarGzFile.getAbsolutePath(), e); |
|
39 |
} catch (IOException e) { |
|
40 |
log.error("Problem opening tar.gz file " + tarGzFile.getAbsolutePath(), e); |
|
41 |
} |
|
42 |
} |
|
43 |
|
|
44 |
@Override |
|
45 |
public boolean hasNext() { |
|
46 |
return current != null; |
|
47 |
} |
|
48 |
|
|
49 |
@Override |
|
50 |
public String next() { |
|
51 |
String ret = new String(current); |
|
52 |
current = findNext(); |
|
53 |
return ret; |
|
54 |
} |
|
55 |
|
|
56 |
@Override |
|
57 |
public void remove() { |
|
58 |
} |
|
59 |
|
|
60 |
private synchronized String findNext() { |
|
61 |
TarArchiveEntry entry = null; |
|
62 |
try { |
|
63 |
while (null != (entry = tarInputStream.getNextTarEntry()) && !entry.isFile()) { |
|
64 |
log.debug("Skipping TAR entry " + entry.getName()); |
|
65 |
} |
|
66 |
} catch (IOException e) { |
|
67 |
log.error("Error during tar.gz extraction", e); |
|
68 |
} |
|
69 |
|
|
70 |
if (entry == null) { |
|
71 |
return null; |
|
72 |
} else { |
|
73 |
log.debug("Extracting " + entry.getName()); |
|
74 |
byte[] content = new byte[(int) entry.getSize()]; |
|
75 |
try { |
|
76 |
tarInputStream.read(content, 0, content.length); |
|
77 |
return new String(content); |
|
78 |
} catch (IOException e) { |
|
79 |
log.error("Impossible to extract file " + entry.getName(), e); |
|
80 |
return null; |
|
81 |
} |
|
82 |
|
|
83 |
} |
|
84 |
} |
|
85 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/archive/targz/TarGzCollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.archive.targz; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.net.MalformedURLException; |
|
5 |
import java.net.URL; |
|
6 |
import java.util.stream.Stream; |
|
7 |
|
|
8 |
import org.springframework.stereotype.Component; |
|
9 |
|
|
10 |
import eu.dnetlib.miscutils.streams.DnetStreamSupport; |
|
11 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
12 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
13 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
14 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.oai.engine.XmlCleaner; |
|
15 |
// import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
16 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
17 |
|
|
18 |
/** |
|
19 |
* Collector pluging for collecting a .tar.gz folder of records |
|
20 |
* |
|
21 |
* @author andrea |
|
22 |
*/ |
|
23 |
|
|
24 |
@Component |
|
25 |
@DnetCollectorPlugin("targz") |
|
26 |
public class TarGzCollectorPlugin implements CollectorPlugin { |
|
27 |
|
|
28 |
@Override |
|
29 |
public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) |
|
30 |
throws CollectException { |
|
31 |
try { |
|
32 |
final String baseUrl = interfaceDescriptor.getBaseUrl(); |
|
33 |
if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); } |
|
34 |
|
|
35 |
final String tarGzPath = interfaceDescriptor.getBaseUrl(); |
|
36 |
final URL tarGzUrl = new URL(tarGzPath); |
|
37 |
final File tarGzFile = new File(tarGzUrl.getPath()); |
|
38 |
|
|
39 |
if (!tarGzFile.exists()) { throw new CollectException(String.format("The base ULR %s, does not exist", tarGzFile.getPath())); } |
|
40 |
|
|
41 |
final TarGzIterator tgzIterator = new TarGzIterator(tarGzFile.getAbsolutePath()); |
|
42 |
|
|
43 |
return DnetStreamSupport.generateStreamFromIterator(tgzIterator) |
|
44 |
.map(s -> s.startsWith("\uFEFF") ? s.substring(1) : s) |
|
45 |
.map(XmlCleaner::cleanAllEntities); |
|
46 |
} catch (final MalformedURLException e) { |
|
47 |
throw new CollectException("TarGz collector failed! ", e); |
|
48 |
} |
|
49 |
} |
|
50 |
|
|
51 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ProtocolParameter.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins; |
|
2 |
|
|
3 |
import org.springframework.beans.factory.annotation.Required; |
|
4 |
|
|
5 |
import eu.dnetlib.msro.workers.aggregation.collect.functions.NullFunction; |
|
6 |
import eu.dnetlib.msro.workers.aggregation.collect.functions.ParamValuesFunction; |
|
7 |
|
|
8 |
public class ProtocolParameter { |
|
9 |
|
|
10 |
private String name; |
|
11 |
private boolean optional = false; |
|
12 |
private ProtocolParameterType type = ProtocolParameterType.TEXT; |
|
13 |
private String regex = null; |
|
14 |
private Class<? extends ParamValuesFunction> populateFunction = NullFunction.class; |
|
15 |
private boolean functionPopulated = false; |
|
16 |
|
|
17 |
public ProtocolParameter() {} |
|
18 |
|
|
19 |
public ProtocolParameter(final String name, final boolean optional, final ProtocolParameterType type, final String regex) { |
|
20 |
this(name, optional, type, regex, null); |
|
21 |
} |
|
22 |
|
|
23 |
public ProtocolParameter(final String name, final boolean optional, final ProtocolParameterType type, final String regex, |
|
24 |
final Class<? extends ParamValuesFunction> populateFunction) { |
|
25 |
this.name = name; |
|
26 |
this.optional = optional; |
|
27 |
this.type = type; |
|
28 |
this.regex = regex; |
|
29 |
this.populateFunction = populateFunction; |
|
30 |
functionPopulated = this.populateFunction != null; |
|
31 |
} |
|
32 |
|
|
33 |
public String getName() { |
|
34 |
return name; |
|
35 |
} |
|
36 |
|
|
37 |
@Required |
|
38 |
public void setName(final String name) { |
|
39 |
this.name = name; |
|
40 |
} |
|
41 |
|
|
42 |
public boolean isOptional() { |
|
43 |
return optional; |
|
44 |
} |
|
45 |
|
|
46 |
public void setOptional(final boolean optional) { |
|
47 |
this.optional = optional; |
|
48 |
} |
|
49 |
|
|
50 |
public ProtocolParameterType getType() { |
|
51 |
return type; |
|
52 |
} |
|
53 |
|
|
54 |
public void setType(final ProtocolParameterType type) { |
|
55 |
this.type = type; |
|
56 |
} |
|
57 |
|
|
58 |
public String getRegex() { |
|
59 |
return regex; |
|
60 |
} |
|
61 |
|
|
62 |
public void setRegex(final String regex) { |
|
63 |
this.regex = regex; |
|
64 |
} |
|
65 |
|
|
66 |
public Class<? extends ParamValuesFunction> getPopulateFunction() { |
|
67 |
return populateFunction; |
|
68 |
} |
|
69 |
|
|
70 |
public void setPopulateFunction(final Class<? extends ParamValuesFunction> populateFunction) { |
|
71 |
this.populateFunction = populateFunction; |
|
72 |
functionPopulated = this.populateFunction != null; |
|
73 |
} |
|
74 |
|
|
75 |
public boolean isFunctionPopulated() { |
|
76 |
return functionPopulated; |
|
77 |
} |
|
78 |
|
|
79 |
public void setFunctionPopulated(final boolean functionPopulated) { |
|
80 |
this.functionPopulated = functionPopulated; |
|
81 |
} |
|
82 |
|
|
83 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/filesystem/FilesystemCollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.filesystem; |
|
2 |
|
|
3 |
import java.io.File; |
|
4 |
import java.io.FileInputStream; |
|
5 |
import java.net.MalformedURLException; |
|
6 |
import java.net.URL; |
|
7 |
import java.util.stream.Stream; |
|
8 |
|
|
9 |
import org.apache.commons.io.IOUtils; |
|
10 |
import org.apache.commons.logging.Log; |
|
11 |
import org.apache.commons.logging.LogFactory; |
|
12 |
import org.springframework.stereotype.Component; |
|
13 |
|
|
14 |
import eu.dnetlib.miscutils.streams.DnetStreamSupport; |
|
15 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
16 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
17 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam; |
|
18 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
19 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.ProtocolParameterType; |
|
20 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.oai.engine.XmlCleaner; |
|
21 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
22 |
|
|
23 |
/** |
|
24 |
* @author andrea |
|
25 |
*/ |
|
26 |
@Component |
|
27 |
@DnetCollectorPlugin(value = "filesystem", parameters = { |
|
28 |
@DnetCollectorParam(value = "extensions", type = ProtocolParameterType.LIST) |
|
29 |
}) |
|
30 |
public class FilesystemCollectorPlugin implements CollectorPlugin { |
|
31 |
|
|
32 |
private static final Log log = LogFactory.getLog(FilesystemCollectorPlugin.class); |
|
33 |
|
|
34 |
@Override |
|
35 |
public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) |
|
36 |
throws CollectException { |
|
37 |
|
|
38 |
final String baseUrl = interfaceDescriptor.getBaseUrl(); |
|
39 |
if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); } |
|
40 |
|
|
41 |
try { |
|
42 |
final URL basePath = new URL(baseUrl); |
|
43 |
final File baseDir = new File(basePath.getPath()); |
|
44 |
if (!baseDir.exists()) { throw new CollectException(String.format("The base ULR %s, does not exist", basePath.getPath())); } |
|
45 |
final String extension = interfaceDescriptor.getParams().get("extensions"); |
|
46 |
|
|
47 |
final FileSystemIterator fsi = new FileSystemIterator(baseDir.getAbsolutePath(), extension); |
|
48 |
|
|
49 |
return DnetStreamSupport.generateStreamFromIterator(fsi) |
|
50 |
.map(inputFileName -> { |
|
51 |
try (FileInputStream fileInputStream = new FileInputStream(inputFileName)) { |
|
52 |
final String s = IOUtils.toString(fileInputStream); |
|
53 |
return XmlCleaner.cleanAllEntities(s.startsWith("\uFEFF") ? s.substring(1) : s); |
|
54 |
} catch (final Exception e) { |
|
55 |
log.error("Unable to read " + inputFileName); |
|
56 |
return ""; |
|
57 |
} |
|
58 |
}); |
|
59 |
} catch (final MalformedURLException e) { |
|
60 |
throw new CollectException("Filesystem collector failed! ", e); |
|
61 |
} |
|
62 |
|
|
63 |
} |
|
64 |
|
|
65 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/filesystem/FileSystemIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.filesystem; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.nio.file.Files; |
|
5 |
import java.nio.file.Path; |
|
6 |
import java.nio.file.Paths; |
|
7 |
import java.util.Iterator; |
|
8 |
import java.util.Set; |
|
9 |
|
|
10 |
import com.google.common.collect.Iterators; |
|
11 |
import com.google.common.collect.Sets; |
|
12 |
import org.apache.commons.io.FilenameUtils; |
|
13 |
import org.apache.commons.logging.Log; |
|
14 |
import org.apache.commons.logging.LogFactory; |
|
15 |
|
|
16 |
/** |
|
17 |
* Class enabling lazy & recursive iteration of a filesystem tree. The iterator iterates over file paths. |
|
18 |
* |
|
19 |
* @author Andrea |
|
20 |
*/ |
|
21 |
public class FileSystemIterator implements Iterator<String> { |
|
22 |
|
|
23 |
/** |
|
24 |
* The logger |
|
25 |
*/ |
|
26 |
private static final Log log = LogFactory.getLog(FileSystemIterator.class); |
|
27 |
|
|
28 |
private Set<String> extensions; |
|
29 |
private Iterator<Path> pathIterator; |
|
30 |
private String current; |
|
31 |
|
|
32 |
public FileSystemIterator(final String baseDir, final String extensions) { |
|
33 |
this.extensions = Sets.newHashSet(extensions.split(",")); |
|
34 |
try { |
|
35 |
this.pathIterator = Files.newDirectoryStream(Paths.get(baseDir)).iterator(); |
|
36 |
this.current = walkTillNext(); |
|
37 |
} catch (IOException e) { |
|
38 |
log.error("Cannot initialize File System Iterator. Is this path correct? " + baseDir); |
|
39 |
throw new RuntimeException("Filesystem collection error.", e); |
|
40 |
} |
|
41 |
} |
|
42 |
|
|
43 |
@Override |
|
44 |
public boolean hasNext() { |
|
45 |
return current != null; |
|
46 |
} |
|
47 |
|
|
48 |
@Override |
|
49 |
public synchronized String next() { |
|
50 |
String pivot = new String(current); |
|
51 |
current = walkTillNext(); |
|
52 |
log.debug("Returning: " + pivot); |
|
53 |
return pivot; |
|
54 |
} |
|
55 |
|
|
56 |
@Override |
|
57 |
public void remove() { |
|
58 |
} |
|
59 |
|
|
60 |
/** |
|
61 |
* Walk the filesystem recursively until it finds a candidate. Strategies: a) For any directory found during the walk, an iterator is |
|
62 |
* built and concat to the main one; b) Any file is checked against admitted extensions |
|
63 |
* |
|
64 |
* @return the next element to be returned by next call of this.next() |
|
65 |
*/ |
|
66 |
private synchronized String walkTillNext() { |
|
67 |
while (pathIterator.hasNext()) { |
|
68 |
Path nextFilePath = pathIterator.next(); |
|
69 |
if (Files.isDirectory(nextFilePath)) { |
|
70 |
// concat |
|
71 |
try { |
|
72 |
pathIterator = Iterators.concat(pathIterator, Files.newDirectoryStream(nextFilePath).iterator()); |
|
73 |
log.debug("Adding folder iterator: " + nextFilePath.toString()); |
|
74 |
} catch (IOException e) { |
|
75 |
log.error("Cannot create folder iterator! Is this path correct? " + nextFilePath.toString()); |
|
76 |
return null; |
|
77 |
} |
|
78 |
} else { |
|
79 |
if (extensions.contains(FilenameUtils.getExtension(nextFilePath.toString()))) { |
|
80 |
log.debug("Returning: " + nextFilePath.toString()); |
|
81 |
return nextFilePath.toString(); |
|
82 |
} |
|
83 |
} |
|
84 |
} |
|
85 |
return null; |
|
86 |
} |
|
87 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ProtocolParameterType.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins; |
|
2 |
|
|
3 |
/**
 * Kinds of value a collector protocol parameter can hold, as declared through the {@code type} attribute of a collector parameter
 * annotation.
 */
public enum ProtocolParameterType {

    /** Free text value. */
    TEXT,

    /** Numeric value. */
    NUMBER,

    /** Multi-valued parameter (e.g. the "extensions" parameter of the ftp protocol). */
    LIST,

    /** True/false flag (e.g. the "recursive" parameter of the ftp protocol). */
    BOOLEAN;
}
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/CollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins; |
|
2 |
|
|
3 |
import java.util.Arrays; |
|
4 |
import java.util.List; |
|
5 |
import java.util.stream.Collectors; |
|
6 |
import java.util.stream.Stream; |
|
7 |
|
|
8 |
import org.apache.commons.lang3.StringUtils; |
|
9 |
|
|
10 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
11 |
import eu.dnetlib.msro.workers.aggregation.collect.functions.NullFunction; |
|
12 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
13 |
|
|
14 |
public interface CollectorPlugin { |
|
15 |
|
|
16 |
Stream<String> collect(InterfaceDescriptor interfaceDescriptor, String fromDate, String untilDate) throws CollectException; |
|
17 |
|
|
18 |
default String getProtocol() { |
|
19 |
return getClass().getAnnotation(DnetCollectorPlugin.class).value(); |
|
20 |
} |
|
21 |
|
|
22 |
default List<String> listNameParameters() { |
|
23 |
return Arrays.stream(getClass().getAnnotation(DnetCollectorPlugin.class).parameters()) |
|
24 |
.map(DnetCollectorParam::value) |
|
25 |
.collect(Collectors.toList()); |
|
26 |
} |
|
27 |
|
|
28 |
default ProtocolDescriptor getProtocolDescriptor() { |
|
29 |
final DnetCollectorPlugin ann = getClass().getAnnotation(DnetCollectorPlugin.class); |
|
30 |
|
|
31 |
final List<ProtocolParameter> params = Arrays.stream(ann.parameters()) |
|
32 |
.map(ap -> { |
|
33 |
final ProtocolParameter p = new ProtocolParameter(); |
|
34 |
p.setName(ap.value()); |
|
35 |
p.setType(ap.type()); |
|
36 |
p.setOptional(ap.optional()); |
|
37 |
p.setPopulateFunction(ap.populateFunction()); |
|
38 |
p.setFunctionPopulated(ap.populateFunction() != NullFunction.class); |
|
39 |
if (StringUtils.isNotBlank(ap.regex())) { |
|
40 |
p.setRegex(ap.regex()); |
|
41 |
} |
|
42 |
return p; |
|
43 |
}) |
|
44 |
.collect(Collectors.toList()); |
|
45 |
|
|
46 |
return new ProtocolDescriptor(ann.value(), params); |
|
47 |
} |
|
48 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/filesfrommetadata/FilesFromMetadataCollectorPlugin.java | ||
---|---|---|
1 |
/** |
|
2 |
* |
|
3 |
*/ |
|
4 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.filesfrommetadata; |
|
5 |
|
|
6 |
import java.util.stream.Stream; |
|
7 |
|
|
8 |
import org.springframework.stereotype.Component; |
|
9 |
|
|
10 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
11 |
import eu.dnetlib.msro.workers.aggregation.collect.functions.PopulateFileDownloadBasePath; |
|
12 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
13 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam; |
|
14 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
15 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
16 |
|
|
17 |
/** |
|
18 |
* @author sandro |
|
19 |
*/ |
|
20 |
@Component |
|
21 |
@DnetCollectorPlugin(value = "files_from_metadata", parameters = { |
|
22 |
@DnetCollectorParam(value = "basePath", populateFunction = PopulateFileDownloadBasePath.class) |
|
23 |
}) |
|
24 |
public class FilesFromMetadataCollectorPlugin implements CollectorPlugin { |
|
25 |
|
|
26 |
/** |
|
27 |
* {@inheritDoc} |
|
28 |
*/ |
|
29 |
@Override |
|
30 |
public Stream<String> collect(final InterfaceDescriptor arg0, final String arg1, final String arg2) throws CollectException { |
|
31 |
// TODO Auto-generated method stub |
|
32 |
return null; |
|
33 |
} |
|
34 |
|
|
35 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ftp/FtpCollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.ftp; |
|
2 |
|
|
3 |
import java.util.Iterator; |
|
4 |
import java.util.Set; |
|
5 |
import java.util.stream.Stream; |
|
6 |
|
|
7 |
import org.springframework.stereotype.Component; |
|
8 |
|
|
9 |
import com.google.common.base.Splitter; |
|
10 |
import com.google.common.collect.Sets; |
|
11 |
|
|
12 |
import eu.dnetlib.miscutils.streams.DnetStreamSupport; |
|
13 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
14 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
15 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam; |
|
16 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
17 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.ProtocolParameterType; |
|
18 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
19 |
|
|
20 |
/** |
|
21 |
* @author Author: Andrea Mannocci |
|
22 |
*/ |
|
23 |
@Component |
|
24 |
@DnetCollectorPlugin(value = "ftp", parameters = { |
|
25 |
@DnetCollectorParam("username"), |
|
26 |
@DnetCollectorParam("password"), |
|
27 |
@DnetCollectorParam(value = "recursive", type = ProtocolParameterType.BOOLEAN), |
|
28 |
@DnetCollectorParam(value = "extensions", type = ProtocolParameterType.LIST) |
|
29 |
}) |
|
30 |
public class FtpCollectorPlugin implements CollectorPlugin { |
|
31 |
|
|
32 |
@Override |
|
33 |
public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) |
|
34 |
throws CollectException { |
|
35 |
|
|
36 |
final String baseUrl = interfaceDescriptor.getBaseUrl(); |
|
37 |
final String username = interfaceDescriptor.getParams().get("username"); |
|
38 |
final String password = interfaceDescriptor.getParams().get("password"); |
|
39 |
final String recursive = interfaceDescriptor.getParams().get("recursive"); |
|
40 |
final String extensions = interfaceDescriptor.getParams().get("extensions"); |
|
41 |
|
|
42 |
if ((baseUrl == null) || baseUrl.isEmpty()) { throw new CollectException("Param 'baseurl' is null or empty"); } |
|
43 |
if ((username == null) || username.isEmpty()) { throw new CollectException("Param 'username' is null or empty"); } |
|
44 |
if ((password == null) || password.isEmpty()) { throw new CollectException("Param 'password' is null or empty"); } |
|
45 |
if ((recursive == null) || recursive.isEmpty()) { throw new CollectException("Param 'recursive' is null or empty"); } |
|
46 |
if ((extensions == null) || extensions.isEmpty()) { throw new CollectException("Param 'extensions' is null or empty"); } |
|
47 |
|
|
48 |
final boolean isRecursive = "true".equals(recursive); |
|
49 |
|
|
50 |
final Set<String> extensionsSet = parseSet(extensions); |
|
51 |
|
|
52 |
final Iterator<String> iter = new FtpIterator(baseUrl, username, password, isRecursive, extensionsSet); |
|
53 |
|
|
54 |
return DnetStreamSupport.generateStreamFromIterator(iter); |
|
55 |
} |
|
56 |
|
|
57 |
private Set<String> parseSet(final String extensions) { |
|
58 |
return Sets.newHashSet(Splitter.on(",").omitEmptyStrings().trimResults().split(extensions)); |
|
59 |
} |
|
60 |
|
|
61 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/ftp/FtpIterator.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.ftp; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.io.OutputStream; |
|
5 |
import java.net.MalformedURLException; |
|
6 |
import java.net.URL; |
|
7 |
import java.util.Iterator; |
|
8 |
import java.util.LinkedList; |
|
9 |
import java.util.Queue; |
|
10 |
import java.util.Set; |
|
11 |
|
|
12 |
import org.apache.commons.io.output.ByteArrayOutputStream; |
|
13 |
import org.apache.commons.logging.Log; |
|
14 |
import org.apache.commons.logging.LogFactory; |
|
15 |
import org.apache.commons.net.ftp.FTPClient; |
|
16 |
import org.apache.commons.net.ftp.FTPFile; |
|
17 |
import org.apache.commons.net.ftp.FTPReply; |
|
18 |
|
|
19 |
import eu.dnetlib.msro.exceptions.CollectorServiceRuntimeException; |
|
20 |
|
|
21 |
/** |
|
22 |
* @author Author: Andrea Mannocci |
|
23 |
*/ |
|
24 |
public class FtpIterator implements Iterator<String> { |
|
25 |
|
|
26 |
private static final Log log = LogFactory.getLog(FtpIterator.class); |
|
27 |
|
|
28 |
private static final int MAX_RETRIES = 5; |
|
29 |
private static final int DEFAULT_TIMEOUT = 30000; |
|
30 |
private static final long BACKOFF_MILLIS = 10000; |
|
31 |
|
|
32 |
private FTPClient ftpClient; |
|
33 |
private String ftpServerAddress; |
|
34 |
private String remoteFtpBasePath; |
|
35 |
private String username; |
|
36 |
private String password; |
|
37 |
private boolean isRecursive; |
|
38 |
private Set<String> extensionsSet; |
|
39 |
|
|
40 |
private Queue<String> queue; |
|
41 |
|
|
42 |
public FtpIterator(final String baseUrl, final String username, final String password, final boolean isRecursive, |
|
43 |
final Set<String> extensionsSet) { |
|
44 |
this.username = username; |
|
45 |
this.password = password; |
|
46 |
this.isRecursive = isRecursive; |
|
47 |
this.extensionsSet = extensionsSet; |
|
48 |
try { |
|
49 |
final URL server = new URL(baseUrl); |
|
50 |
ftpServerAddress = server.getHost(); |
|
51 |
remoteFtpBasePath = server.getPath(); |
|
52 |
} catch (final MalformedURLException e1) { |
|
53 |
throw new CollectorServiceRuntimeException("Malformed URL exception " + baseUrl); |
|
54 |
} |
|
55 |
|
|
56 |
connectToFtpServer(); |
|
57 |
initializeQueue(); |
|
58 |
} |
|
59 |
|
|
60 |
private void connectToFtpServer() { |
|
61 |
ftpClient = new FTPClient(); |
|
62 |
ftpClient.setDefaultTimeout(DEFAULT_TIMEOUT); |
|
63 |
ftpClient.setDataTimeout(DEFAULT_TIMEOUT); |
|
64 |
ftpClient.setConnectTimeout(DEFAULT_TIMEOUT); |
|
65 |
try { |
|
66 |
ftpClient.connect(ftpServerAddress); |
|
67 |
|
|
68 |
// try to login |
|
69 |
if (!ftpClient.login(username, password)) { |
|
70 |
ftpClient.logout(); |
|
71 |
throw new CollectorServiceRuntimeException("Unable to login to FTP server " + ftpServerAddress); |
|
72 |
} |
|
73 |
final int reply = ftpClient.getReplyCode(); |
|
74 |
if (!FTPReply.isPositiveCompletion(reply)) { |
|
75 |
ftpClient.disconnect(); |
|
76 |
throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress); |
|
77 |
} |
|
78 |
|
|
79 |
ftpClient.enterLocalPassiveMode(); |
|
80 |
log.info("Connected to FTP server " + ftpServerAddress); |
|
81 |
log.info(String.format("FTP collecting from %s with recursion = %s", remoteFtpBasePath, isRecursive)); |
|
82 |
} catch (final IOException e) { |
|
83 |
throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress); |
|
84 |
} |
|
85 |
} |
|
86 |
|
|
87 |
private void disconnectFromFtpServer() { |
|
88 |
try { |
|
89 |
if (ftpClient.isConnected()) { |
|
90 |
ftpClient.logout(); |
|
91 |
ftpClient.disconnect(); |
|
92 |
} |
|
93 |
} catch (final IOException e) { |
|
94 |
log.error("Failed to logout & disconnect from the FTP server", e); |
|
95 |
} |
|
96 |
} |
|
97 |
|
|
98 |
private void initializeQueue() { |
|
99 |
queue = new LinkedList<String>(); |
|
100 |
listDirectoryRecursive(remoteFtpBasePath, ""); |
|
101 |
} |
|
102 |
|
|
103 |
private void listDirectoryRecursive(final String parentDir, final String currentDir) { |
|
104 |
String dirToList = parentDir; |
|
105 |
if (!currentDir.equals("")) { |
|
106 |
dirToList += "/" + currentDir; |
|
107 |
} |
|
108 |
FTPFile[] subFiles; |
|
109 |
try { |
|
110 |
subFiles = ftpClient.listFiles(dirToList); |
|
111 |
if ((subFiles != null) && (subFiles.length > 0)) { |
|
112 |
for (final FTPFile aFile : subFiles) { |
|
113 |
final String currentFileName = aFile.getName(); |
|
114 |
if (currentFileName.equals(".") || currentFileName.equals("..")) { |
|
115 |
// skip parent directory and directory itself |
|
116 |
continue; |
|
117 |
} |
|
118 |
if (aFile.isDirectory()) { |
|
119 |
if (isRecursive) { |
|
120 |
listDirectoryRecursive(dirToList, currentFileName); |
|
121 |
} |
|
122 |
} else { |
|
123 |
// test the file for extensions compliance and, just in case, add it to the list. |
|
124 |
for (final String ext : extensionsSet) { |
|
125 |
if (currentFileName.endsWith(ext)) { |
|
126 |
queue.add(dirToList + "/" + currentFileName); |
|
127 |
} |
|
128 |
} |
|
129 |
} |
|
130 |
} |
|
131 |
} |
|
132 |
} catch (final IOException e) { |
|
133 |
throw new CollectorServiceRuntimeException("Unable to list FTP remote folder", e); |
|
134 |
} |
|
135 |
} |
|
136 |
|
|
137 |
@Override |
|
138 |
public boolean hasNext() { |
|
139 |
if (queue.isEmpty()) { |
|
140 |
disconnectFromFtpServer(); |
|
141 |
return false; |
|
142 |
} else { |
|
143 |
return true; |
|
144 |
} |
|
145 |
} |
|
146 |
|
|
147 |
@Override |
|
148 |
public String next() { |
|
149 |
final String nextRemotePath = queue.remove(); |
|
150 |
int nRepeat = 0; |
|
151 |
while (nRepeat < MAX_RETRIES) { |
|
152 |
try { |
|
153 |
final OutputStream baos = new ByteArrayOutputStream(); |
|
154 |
if (!ftpClient.isConnected()) { |
|
155 |
connectToFtpServer(); |
|
156 |
} |
|
157 |
ftpClient.retrieveFile(nextRemotePath, baos); |
|
158 |
|
|
159 |
log.debug(String.format("Collected file from FTP: %s%s", ftpServerAddress, nextRemotePath)); |
|
160 |
return baos.toString(); |
|
161 |
} catch (final IOException e) { |
|
162 |
nRepeat++; |
|
163 |
log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), ftpServerAddress, nextRemotePath, |
|
164 |
nRepeat)); |
|
165 |
disconnectFromFtpServer(); |
|
166 |
try { |
|
167 |
Thread.sleep(BACKOFF_MILLIS); |
|
168 |
} catch (final InterruptedException e1) { |
|
169 |
log.error(e1); |
|
170 |
} |
|
171 |
} |
|
172 |
} |
|
173 |
throw new CollectorServiceRuntimeException( |
|
174 |
String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", nextRemotePath, nRepeat)); |
|
175 |
} |
|
176 |
|
|
177 |
@Override |
|
178 |
public void remove() { |
|
179 |
throw new UnsupportedOperationException(); |
|
180 |
} |
|
181 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/csv/FileCSVCollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.csv; |
|
2 |
|
|
3 |
import java.io.BufferedReader; |
|
4 |
import java.io.File; |
|
5 |
import java.io.FileReader; |
|
6 |
import java.io.IOException; |
|
7 |
import java.net.MalformedURLException; |
|
8 |
import java.net.URL; |
|
9 |
import java.util.Iterator; |
|
10 |
import java.util.Spliterator; |
|
11 |
import java.util.Spliterators; |
|
12 |
import java.util.stream.Stream; |
|
13 |
import java.util.stream.StreamSupport; |
|
14 |
|
|
15 |
import org.apache.commons.lang3.StringEscapeUtils; |
|
16 |
import org.apache.commons.logging.Log; |
|
17 |
import org.apache.commons.logging.LogFactory; |
|
18 |
import org.dom4j.Document; |
|
19 |
import org.dom4j.DocumentHelper; |
|
20 |
import org.dom4j.Element; |
|
21 |
import org.springframework.stereotype.Component; |
|
22 |
|
|
23 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
24 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
25 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam; |
|
26 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
27 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
28 |
|
|
29 |
@Component |
|
30 |
@DnetCollectorPlugin(value = "fileCSV", parameters = { |
|
31 |
@DnetCollectorParam("header"), |
|
32 |
@DnetCollectorParam("separator"), |
|
33 |
@DnetCollectorParam("identifier"), |
|
34 |
@DnetCollectorParam("quote") |
|
35 |
}) |
|
36 |
public class FileCSVCollectorPlugin implements CollectorPlugin { |
|
37 |
|
|
38 |
private static final Log log = LogFactory.getLog(FileCSVCollectorPlugin.class); |
|
39 |
private String[] headers = null; |
|
40 |
private int identifierNumber; |
|
41 |
|
|
42 |
@Override |
|
43 |
public Stream<String> collect(final InterfaceDescriptor interfaceDescriptor, final String fromDate, final String untilDate) |
|
44 |
throws CollectException { |
|
45 |
final String header = interfaceDescriptor.getParams().get("header"); |
|
46 |
final String separator = StringEscapeUtils.unescapeJava(interfaceDescriptor.getParams().get("separator")); |
|
47 |
|
|
48 |
identifierNumber = Integer.parseInt(interfaceDescriptor.getParams().get("identifier")); |
|
49 |
URL u = null; |
|
50 |
try { |
|
51 |
u = new URL(interfaceDescriptor.getBaseUrl()); |
|
52 |
} catch (final MalformedURLException e1) { |
|
53 |
throw new CollectException("Invalid URL: " + interfaceDescriptor.getBaseUrl(), e1); |
|
54 |
} |
|
55 |
final String baseUrl = u.getPath(); |
|
56 |
|
|
57 |
log.info("base URL = " + baseUrl); |
|
58 |
|
|
59 |
try { |
|
60 |
final BufferedReader br = new BufferedReader(new FileReader(new File(baseUrl))); |
|
61 |
if ((header != null) && "true".equals(header.toLowerCase())) { |
|
62 |
headers = br.readLine().split(separator); |
|
63 |
} |
|
64 |
|
|
65 |
final Iterator<String> iter = new FileCSVIterator(br, separator); |
|
66 |
|
|
67 |
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iter, Spliterator.ORDERED), false); |
|
68 |
|
|
69 |
} catch (final Exception e) { |
|
70 |
throw new CollectException("Error iterating CSV", e); |
|
71 |
} |
|
72 |
} |
|
73 |
|
|
74 |
class FileCSVIterator implements Iterator<String> { |
|
75 |
|
|
76 |
private String next; |
|
77 |
|
|
78 |
private BufferedReader reader; |
|
79 |
|
|
80 |
private String separator; |
|
81 |
|
|
82 |
public FileCSVIterator(final BufferedReader reader, final String separator) { |
|
83 |
this.reader = reader; |
|
84 |
this.separator = separator; |
|
85 |
next = calculateNext(); |
|
86 |
} |
|
87 |
|
|
88 |
@Override |
|
89 |
public boolean hasNext() { |
|
90 |
return next != null; |
|
91 |
} |
|
92 |
|
|
93 |
@Override |
|
94 |
public String next() { |
|
95 |
final String s = next; |
|
96 |
next = calculateNext(); |
|
97 |
return s; |
|
98 |
} |
|
99 |
|
|
100 |
private String calculateNext() { |
|
101 |
try { |
|
102 |
final Document document = DocumentHelper.createDocument(); |
|
103 |
final Element root = document.addElement("csvRecord"); |
|
104 |
|
|
105 |
String newLine = reader.readLine(); |
|
106 |
|
|
107 |
// FOR SOME FILES IT RETURN NULL ALSO IF THE FILE IS NOT READY DONE |
|
108 |
if (newLine == null) { |
|
109 |
newLine = reader.readLine(); |
|
110 |
} |
|
111 |
if (newLine == null) { |
|
112 |
log.info("there is no line, closing RESULT SET"); |
|
113 |
|
|
114 |
reader.close(); |
|
115 |
return null; |
|
116 |
} |
|
117 |
final String[] currentRow = newLine.split(separator); |
|
118 |
|
|
119 |
if (currentRow != null) { |
|
120 |
|
|
121 |
for (int i = 0; i < currentRow.length; i++) { |
|
122 |
final String hAttribute = (headers != null) && (i < headers.length) ? headers[i] : "column" + i; |
|
123 |
|
|
124 |
final Element row = root.addElement("column"); |
|
125 |
if (i == identifierNumber) { |
|
126 |
row.addAttribute("isID", "true"); |
|
127 |
} |
|
128 |
row.addAttribute("name", hAttribute).addText(currentRow[i]); |
|
129 |
} |
|
130 |
return document.asXML(); |
|
131 |
} |
|
132 |
} catch (final IOException e) { |
|
133 |
log.error("Error calculating next csv element", e); |
|
134 |
} |
|
135 |
return null; |
|
136 |
} |
|
137 |
|
|
138 |
@Override |
|
139 |
public void remove() { |
|
140 |
throw new UnsupportedOperationException(); |
|
141 |
} |
|
142 |
|
|
143 |
} |
|
144 |
|
|
145 |
} |
modules/dnet-springboot-apps/trunk/dnet-simple-aggregation-worker/src/main/java/eu/dnetlib/msro/workers/aggregation/collect/plugins/csv/HttpCSVCollectorPlugin.java | ||
---|---|---|
1 |
package eu.dnetlib.msro.workers.aggregation.collect.plugins.csv; |
|
2 |
|
|
3 |
import java.io.InputStreamReader; |
|
4 |
import java.io.Reader; |
|
5 |
import java.net.URL; |
|
6 |
import java.util.Iterator; |
|
7 |
import java.util.Set; |
|
8 |
import java.util.stream.Stream; |
|
9 |
|
|
10 |
import org.apache.commons.csv.CSVFormat; |
|
11 |
import org.apache.commons.csv.CSVParser; |
|
12 |
import org.apache.commons.lang3.StringUtils; |
|
13 |
import org.apache.commons.logging.Log; |
|
14 |
import org.apache.commons.logging.LogFactory; |
|
15 |
import org.dom4j.Document; |
|
16 |
import org.dom4j.DocumentHelper; |
|
17 |
import org.dom4j.Element; |
|
18 |
import org.springframework.stereotype.Component; |
|
19 |
|
|
20 |
import com.google.common.collect.Iterators; |
|
21 |
|
|
22 |
import eu.dnetlib.miscutils.streams.DnetStreamSupport; |
|
23 |
import eu.dnetlib.msro.workers.aggregation.collect.CollectException; |
|
24 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.CollectorPlugin; |
|
25 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorParam; |
|
26 |
import eu.dnetlib.msro.workers.aggregation.collect.plugins.DnetCollectorPlugin; |
|
27 |
import eu.dnetlib.msro.workers.aggregation.objects.InterfaceDescriptor; |
|
28 |
|
|
29 |
/** |
|
30 |
* The Class HttpCSVCollectorPlugin. |
|
31 |
*/ |
|
32 |
@Component |
|
33 |
@DnetCollectorPlugin(value = "httpCSV", parameters = { |
|
34 |
@DnetCollectorParam("separator"), |
|
35 |
@DnetCollectorParam("identifier"), |
|
36 |
@DnetCollectorParam("quote"), |
|
37 |
}) |
|
38 |
public class HttpCSVCollectorPlugin implements CollectorPlugin { |
|
39 |
|
|
40 |
private static final Log log = LogFactory.getLog(HttpCSVCollectorPlugin.class); |
|
41 |
|
|
42 |
/* |
|
43 |
* (non-Javadoc) |
|
44 |
* |
|
45 |
* @see eu.dnetlib.msro.workers.aggregation.collect.plugin.CollectorPlugin#collect(eu.dnetlib.msro.workers.aggregation.collect.rmi. |
|
46 |
* InterfaceDescriptor, java.lang.String, java.lang.String) |
|
47 |
*/ |
|
48 |
@Override |
|
49 |
public Stream<String> collect(final InterfaceDescriptor descriptor, final String fromDate, final String untilDate) throws CollectException { |
|
50 |
return DnetStreamSupport.generateStreamFromIterator(getIterator(descriptor)); |
|
51 |
} |
|
52 |
|
|
53 |
private Iterator<String> getIterator(final InterfaceDescriptor descriptor) { |
|
54 |
try { |
|
55 |
final URL url = new URL(descriptor.getBaseUrl()); |
|
56 |
url.openConnection(); |
|
57 |
|
|
58 |
final String separatorString = descriptor.getParams().get("separator"); |
|
59 |
final String identifier = descriptor.getParams().get("identifier"); |
|
60 |
final String quote = descriptor.getParams().get("quote"); |
|
61 |
final char separator = separatorString.equals("\\t") || StringUtils.isBlank(separatorString) ? '\t' : separatorString.charAt(0); |
|
62 |
|
|
63 |
final CSVFormat format = StringUtils.isBlank(quote) ? CSVFormat.EXCEL.withHeader().withDelimiter(separator) |
|
64 |
: CSVFormat.EXCEL.withHeader().withDelimiter(separator).withQuote(quote.charAt(0)); |
Also available in: Unified diff