Project

General

Profile

« Previous | Next » 

Revision 49006

FTP metadata collector plugin now supports incremental harvesting: when a fromDate (YYYY-MM-DD) is given, only files modified after that date are collected

View differences:

modules/dnet-modular-collector-service/trunk/src/test/java/eu/dnetlib/data/collector/plugins/ftp/FtpIteratorTest.java
3 3
import java.util.Set;
4 4

  
5 5
import com.google.common.collect.Sets;
6
import org.junit.Ignore;
7 6
import org.junit.Test;
8 7

  
9
@Ignore
8
import static org.junit.Assert.assertFalse;
9
import static org.junit.Assert.assertTrue;
10

  
10 11
public class FtpIteratorTest {
11 12

  
12 13
	private String baseUrl = "ftp://ftp.eagle.research-infrastructures.eu/content/ELTE";
......
17 18

  
18 19
	@Test
19 20
	public void test() {
20
		final FtpIterator iterator = new FtpIterator(baseUrl, username, password, isRecursive, extensions);
21
		final FtpIterator iterator = new FtpIterator(baseUrl, username, password, isRecursive, extensions, null);
22
		int i =5;
23
		while (iterator.hasNext() && i > 0) {
24
			iterator.next();
25
			i--;
26
		}
27
	}
21 28

  
22
		while (iterator.hasNext()) {
29
	@Test
30
	public void testIncremental() {
31
		final FtpIterator iterator = new FtpIterator(baseUrl, username, password, isRecursive, extensions, "2016-01-04");
32
		assertTrue(iterator.hasNext());
33
		int i =5;
34
		while (iterator.hasNext() && i > 0) {
23 35
			iterator.next();
36
			i--;
24 37
		}
25 38
	}
26 39

  
40
	@Test
41
	public void testIncrementalNoRecords() {
42
		final FtpIterator iterator = new FtpIterator(baseUrl, username, password, isRecursive, extensions, "2017-01-04");
43
		assertFalse(iterator.hasNext());
44

  
45
	}
46

  
27 47
}
modules/dnet-modular-collector-service/trunk/src/main/java/eu/dnetlib/data/collector/plugins/ftp/FtpCollectorPlugin.java
1 1
package eu.dnetlib.data.collector.plugins.ftp;
2 2

  
3
import java.util.Iterator;
4
import java.util.Set;
5

  
3 6
import com.google.common.base.Splitter;
4 7
import com.google.common.collect.Sets;
5 8
import eu.dnetlib.data.collector.plugin.AbstractCollectorPlugin;
......
7 10
import eu.dnetlib.data.collector.rmi.InterfaceDescriptor;
8 11
import org.springframework.beans.factory.annotation.Required;
9 12

  
10
import java.util.Iterator;
11
import java.util.Set;
12

  
13 13
/**
14 14
 *
15 15
 * @author Author: Andrea Mannocci
......
35 35
		if ((recursive == null) || recursive.isEmpty()) { throw new CollectorServiceException("Param 'recursive' is null or empty"); }
36 36
		if ((extensions == null) || extensions.isEmpty()) { throw new CollectorServiceException("Param 'extensions' is null or empty"); }
37 37

  
38
		if (fromDate != null && !fromDate.matches("\\d{4}-\\d{2}-\\d{2}")) { throw new CollectorServiceException("Invalid date (YYYY-MM-DD): " + fromDate); }
39

  
38 40
		return new Iterable<String>() {
39 41

  
40 42
			boolean isRecursive = "true".equals(recursive);
......
43 45

  
44 46
			@Override
45 47
			public Iterator<String> iterator() {
46
				return getFtpIteratorFactory().newIterator(baseUrl, username, password, isRecursive, extensionsSet);
48
				return getFtpIteratorFactory().newIterator(baseUrl, username, password, isRecursive, extensionsSet, fromDate);
47 49
			}
48 50

  
49 51
			private Set<String> parseSet(final String extensions) {
modules/dnet-modular-collector-service/trunk/src/main/java/eu/dnetlib/data/collector/plugins/ftp/FtpIterator.java
1 1
package eu.dnetlib.data.collector.plugins.ftp;
2 2

  
3
import java.io.IOException;
4
import java.io.OutputStream;
5
import java.net.MalformedURLException;
6
import java.net.URL;
7
import java.util.*;
8

  
3 9
import eu.dnetlib.data.collector.rmi.CollectorServiceRuntimeException;
4 10
import org.apache.commons.io.output.ByteArrayOutputStream;
11
import org.apache.commons.lang.StringUtils;
5 12
import org.apache.commons.logging.Log;
6 13
import org.apache.commons.logging.LogFactory;
7 14
import org.apache.commons.net.ftp.FTPClient;
8 15
import org.apache.commons.net.ftp.FTPFile;
9 16
import org.apache.commons.net.ftp.FTPReply;
17
import org.joda.time.DateTime;
18
import org.joda.time.format.DateTimeFormat;
19
import org.joda.time.format.DateTimeFormatter;
10 20

  
11
import java.io.IOException;
12
import java.io.OutputStream;
13
import java.net.MalformedURLException;
14
import java.net.URL;
15
import java.util.Iterator;
16
import java.util.LinkedList;
17
import java.util.Queue;
18
import java.util.Set;
19

  
20 21
/**
21 22
 *
22 23
 * @author Author: Andrea Mannocci
......
37 38
	private String password;
38 39
	private boolean isRecursive;
39 40
	private Set<String> extensionsSet;
41
	private boolean incremental;
42
	private DateTime fromDate = null;
43
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
40 44

  
41 45
	private Queue<String> queue;
42 46

  
43 47
	public FtpIterator(final String baseUrl, final String username, final String password, final boolean isRecursive,
44
			final Set<String> extensionsSet) {
48
			final Set<String> extensionsSet, String fromDate) {
45 49
		this.username = username;
46 50
		this.password = password;
47 51
		this.isRecursive = isRecursive;
48 52
		this.extensionsSet = extensionsSet;
53
		this.incremental = StringUtils.isNotBlank(fromDate);
54
		if (incremental) {
55
			//I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode .
56
			this.fromDate = DateTime.parse(fromDate, simpleDateTimeFormatter);
57
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
58
		}
49 59
		try {
50 60
			URL server = new URL(baseUrl);
51 61
			this.ftpServerAddress = server.getHost();
......
112 122
			if ((subFiles != null) && (subFiles.length > 0)) {
113 123
				for (FTPFile aFile : subFiles) {
114 124
					String currentFileName = aFile.getName();
125

  
115 126
					if (currentFileName.equals(".") || currentFileName.equals("..")) {
116 127
						// skip parent directory and directory itself
117 128
						continue;
......
124 135
						// test the file for extensions compliance and, just in case, add it to the list.
125 136
						for (String ext : extensionsSet) {
126 137
							if (currentFileName.endsWith(ext)) {
127
								queue.add(dirToList + "/" + currentFileName);
138
								//incremental mode: let's check the last update date
139
								if(incremental){
140
									Calendar timestamp = aFile.getTimestamp();
141
									DateTime lastModificationDate = new DateTime(timestamp);
142
									if(lastModificationDate.isAfter(fromDate)){
143
										queue.add(dirToList + "/" + currentFileName);
144
										log.debug(currentFileName + " has changed and must be re-collected");
145
									} else {
146
										if (log.isDebugEnabled()) {
147
											log.debug(currentFileName + " has not changed since last collection");
148
										}
149
									}
150
								}
151
								else {
152
									//not incremental: just add it to the queue
153
									queue.add(dirToList + "/" + currentFileName);
154
								}
128 155
							}
129 156
						}
130 157
					}
modules/dnet-modular-collector-service/trunk/src/main/java/eu/dnetlib/data/collector/plugins/ftp/FtpIteratorFactory.java
14 14
										final String username,
15 15
										final String password,
16 16
										final boolean isRecursive,
17
										final Set<String> extensionsSet) {
18
		return new FtpIterator(baseUrl, username, password, isRecursive, extensionsSet);
17
										final Set<String> extensionsSet, final String fromDate) {
18
		return new FtpIterator(baseUrl, username, password, isRecursive, extensionsSet, fromDate);
19 19
	}
20 20
}

Also available in: Unified diff