Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.ftp;
2

    
3
import java.io.IOException;
4
import java.io.OutputStream;
5
import java.net.MalformedURLException;
6
import java.net.URL;
7
import java.util.*;
8

    
9
import eu.dnetlib.data.collector.rmi.CollectorServiceRuntimeException;
10
import org.apache.commons.io.output.ByteArrayOutputStream;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.apache.commons.net.ftp.FTPClient;
15
import org.apache.commons.net.ftp.FTPFile;
16
import org.apache.commons.net.ftp.FTPReply;
17
import org.joda.time.DateTime;
18
import org.joda.time.format.DateTimeFormat;
19
import org.joda.time.format.DateTimeFormatter;
20

    
21
/**
22
 *
23
 * @author Author: Andrea Mannocci
24
 *
25
 */
26
public class FtpIterator implements Iterator<String> {
27

    
28
	private static final Log log = LogFactory.getLog(FtpIterator.class);
29

    
30
	private static final int MAX_RETRIES = 5;
31
	private static final int DEFAULT_TIMEOUT = 30000;
32
	private static final long BACKOFF_MILLIS = 10000;
33

    
34
	private FTPClient ftpClient;
35
	private String ftpServerAddress;
36
	private String remoteFtpBasePath;
37
	private String username;
38
	private String password;
39
	private boolean isRecursive;
40
	private Set<String> extensionsSet;
41
	private boolean incremental;
42
	private DateTime fromDate = null;
43
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
44

    
45
	private Queue<String> queue;
46

    
47
	public FtpIterator(final String baseUrl, final String username, final String password, final boolean isRecursive,
48
			final Set<String> extensionsSet, String fromDate) {
49
		this.username = username;
50
		this.password = password;
51
		this.isRecursive = isRecursive;
52
		this.extensionsSet = extensionsSet;
53
		this.incremental = StringUtils.isNotBlank(fromDate);
54
		if (incremental) {
55
			//I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode .
56
			this.fromDate = DateTime.parse(fromDate, simpleDateTimeFormatter);
57
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
58
		}
59
		try {
60
			URL server = new URL(baseUrl);
61
			this.ftpServerAddress = server.getHost();
62
			this.remoteFtpBasePath = server.getPath();
63
		} catch (MalformedURLException e1) {
64
			throw new CollectorServiceRuntimeException("Malformed URL exception " + baseUrl);
65
		}
66

    
67
		connectToFtpServer();
68
		initializeQueue();
69
	}
70

    
71
	private void connectToFtpServer() {
72
		ftpClient = new FTPClient();
73
		ftpClient.setDefaultTimeout(DEFAULT_TIMEOUT);
74
		ftpClient.setDataTimeout(DEFAULT_TIMEOUT);
75
		ftpClient.setConnectTimeout(DEFAULT_TIMEOUT);
76
		try {
77
			ftpClient.connect(ftpServerAddress);
78

    
79
			// try to login
80
			if (!ftpClient.login(username, password)) {
81
				ftpClient.logout();
82
				throw new CollectorServiceRuntimeException("Unable to login to FTP server " + ftpServerAddress);
83
			}
84
			int reply = ftpClient.getReplyCode();
85
			if (!FTPReply.isPositiveCompletion(reply)) {
86
				ftpClient.disconnect();
87
				throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
88
			}
89

    
90
			ftpClient.enterLocalPassiveMode();
91
			log.info("Connected to FTP server " + ftpServerAddress);
92
			log.info(String.format("FTP collecting from %s with recursion = %s", remoteFtpBasePath, isRecursive));
93
		} catch (IOException e) {
94
			throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
95
		}
96
	}
97

    
98
	private void disconnectFromFtpServer() {
99
		try {
100
			if (ftpClient.isConnected()) {
101
				ftpClient.logout();
102
				ftpClient.disconnect();
103
			}
104
		} catch (IOException e) {
105
			log.error("Failed to logout & disconnect from the FTP server", e);
106
		}
107
	}
108

    
109
	private void initializeQueue() {
110
		queue = new LinkedList<String>();
111
		listDirectoryRecursive(remoteFtpBasePath, "");
112
	}
113

    
114
	private void listDirectoryRecursive(final String parentDir, final String currentDir) {
115
		String dirToList = parentDir;
116
		if (!currentDir.equals("")) {
117
			dirToList += "/" + currentDir;
118
		}
119
		FTPFile[] subFiles;
120
		try {
121
			subFiles = ftpClient.listFiles(dirToList);
122
			if ((subFiles != null) && (subFiles.length > 0)) {
123
				for (FTPFile aFile : subFiles) {
124
					String currentFileName = aFile.getName();
125

    
126
					if (currentFileName.equals(".") || currentFileName.equals("..")) {
127
						// skip parent directory and directory itself
128
						continue;
129
					}
130
					if (aFile.isDirectory()) {
131
						if (isRecursive) {
132
							listDirectoryRecursive(dirToList, currentFileName);
133
						}
134
					} else {
135
						// test the file for extensions compliance and, just in case, add it to the list.
136
						for (String ext : extensionsSet) {
137
							if (currentFileName.endsWith(ext)) {
138
								//incremental mode: let's check the last update date
139
								if(incremental){
140
									Calendar timestamp = aFile.getTimestamp();
141
									DateTime lastModificationDate = new DateTime(timestamp);
142
									if(lastModificationDate.isAfter(fromDate)){
143
										queue.add(dirToList + "/" + currentFileName);
144
										log.debug(currentFileName + " has changed and must be re-collected");
145
									} else {
146
										if (log.isDebugEnabled()) {
147
											log.debug(currentFileName + " has not changed since last collection");
148
										}
149
									}
150
								}
151
								else {
152
									//not incremental: just add it to the queue
153
									queue.add(dirToList + "/" + currentFileName);
154
								}
155
							}
156
						}
157
					}
158
				}
159
			}
160
		} catch (IOException e) {
161
			throw new CollectorServiceRuntimeException("Unable to list FTP remote folder", e);
162
		}
163
	}
164

    
165
	@Override
166
	public boolean hasNext() {
167
		if (queue.isEmpty()) {
168
			disconnectFromFtpServer();
169
			return false;
170
		} else {
171
			return true;
172
		}
173
	}
174

    
175
	@Override
176
	public String next() {
177
		String nextRemotePath = queue.remove();
178
		int nRepeat = 0;
179
		while (nRepeat < MAX_RETRIES) {
180
			try {
181
				OutputStream baos = new ByteArrayOutputStream();
182
				if (!ftpClient.isConnected()) {
183
					connectToFtpServer();
184
				}
185
				ftpClient.retrieveFile(nextRemotePath, baos);
186

    
187
				log.debug(String.format("Collected file from FTP: %s%s", ftpServerAddress, nextRemotePath));
188
				return baos.toString();
189
			} catch (IOException e) {
190
				nRepeat++;
191
				log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), ftpServerAddress, nextRemotePath,
192
						nRepeat));
193
				disconnectFromFtpServer();
194
				try {
195
					Thread.sleep(BACKOFF_MILLIS);
196
				} catch (InterruptedException e1) {
197
					log.error(e1);
198
				}
199
			}
200
		}
201
		throw new CollectorServiceRuntimeException(String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", nextRemotePath, nRepeat));
202
	}
203

    
204
	@Override
205
	public void remove() {
206
		throw new UnsupportedOperationException();
207
	}
208
}
(2-2/3)