Project

General

Profile

1 45395 michele.ar
package eu.dnetlib.msro.workers.aggregation.collect.plugins.ftp;
2
3
import java.io.IOException;
4
import java.io.OutputStream;
5
import java.net.MalformedURLException;
6
import java.net.URL;
7
import java.util.Iterator;
8
import java.util.LinkedList;
9
import java.util.Queue;
10
import java.util.Set;
11
12
import org.apache.commons.io.output.ByteArrayOutputStream;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.apache.commons.net.ftp.FTPClient;
16
import org.apache.commons.net.ftp.FTPFile;
17
import org.apache.commons.net.ftp.FTPReply;
18
19 45576 michele.ar
import eu.dnetlib.msro.exceptions.CollectorServiceRuntimeException;
20
21 45395 michele.ar
/**
22
 * @author Author: Andrea Mannocci
23
 */
24
public class FtpIterator implements Iterator<String> {
25
26
	private static final Log log = LogFactory.getLog(FtpIterator.class);
27
28
	private static final int MAX_RETRIES = 5;
29
	private static final int DEFAULT_TIMEOUT = 30000;
30
	private static final long BACKOFF_MILLIS = 10000;
31
32
	private FTPClient ftpClient;
33
	private String ftpServerAddress;
34
	private String remoteFtpBasePath;
35
	private String username;
36
	private String password;
37
	private boolean isRecursive;
38
	private Set<String> extensionsSet;
39
40
	private Queue<String> queue;
41
42
	public FtpIterator(final String baseUrl, final String username, final String password, final boolean isRecursive,
43
			final Set<String> extensionsSet) {
44
		this.username = username;
45
		this.password = password;
46
		this.isRecursive = isRecursive;
47
		this.extensionsSet = extensionsSet;
48
		try {
49 45576 michele.ar
			final URL server = new URL(baseUrl);
50
			ftpServerAddress = server.getHost();
51
			remoteFtpBasePath = server.getPath();
52
		} catch (final MalformedURLException e1) {
53 45395 michele.ar
			throw new CollectorServiceRuntimeException("Malformed URL exception " + baseUrl);
54
		}
55
56
		connectToFtpServer();
57
		initializeQueue();
58
	}
59
60
	private void connectToFtpServer() {
61
		ftpClient = new FTPClient();
62
		ftpClient.setDefaultTimeout(DEFAULT_TIMEOUT);
63
		ftpClient.setDataTimeout(DEFAULT_TIMEOUT);
64
		ftpClient.setConnectTimeout(DEFAULT_TIMEOUT);
65
		try {
66
			ftpClient.connect(ftpServerAddress);
67
68
			// try to login
69
			if (!ftpClient.login(username, password)) {
70
				ftpClient.logout();
71
				throw new CollectorServiceRuntimeException("Unable to login to FTP server " + ftpServerAddress);
72
			}
73 45576 michele.ar
			final int reply = ftpClient.getReplyCode();
74 45395 michele.ar
			if (!FTPReply.isPositiveCompletion(reply)) {
75
				ftpClient.disconnect();
76
				throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
77
			}
78
79
			ftpClient.enterLocalPassiveMode();
80
			log.info("Connected to FTP server " + ftpServerAddress);
81
			log.info(String.format("FTP collecting from %s with recursion = %s", remoteFtpBasePath, isRecursive));
82 45576 michele.ar
		} catch (final IOException e) {
83 45395 michele.ar
			throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
84
		}
85
	}
86
87
	private void disconnectFromFtpServer() {
88
		try {
89
			if (ftpClient.isConnected()) {
90
				ftpClient.logout();
91
				ftpClient.disconnect();
92
			}
93 45576 michele.ar
		} catch (final IOException e) {
94 45395 michele.ar
			log.error("Failed to logout & disconnect from the FTP server", e);
95
		}
96
	}
97
98
	private void initializeQueue() {
99
		queue = new LinkedList<String>();
100
		listDirectoryRecursive(remoteFtpBasePath, "");
101
	}
102
103
	private void listDirectoryRecursive(final String parentDir, final String currentDir) {
104
		String dirToList = parentDir;
105
		if (!currentDir.equals("")) {
106
			dirToList += "/" + currentDir;
107
		}
108
		FTPFile[] subFiles;
109
		try {
110
			subFiles = ftpClient.listFiles(dirToList);
111
			if ((subFiles != null) && (subFiles.length > 0)) {
112 45576 michele.ar
				for (final FTPFile aFile : subFiles) {
113
					final String currentFileName = aFile.getName();
114 45395 michele.ar
					if (currentFileName.equals(".") || currentFileName.equals("..")) {
115
						// skip parent directory and directory itself
116
						continue;
117
					}
118
					if (aFile.isDirectory()) {
119
						if (isRecursive) {
120
							listDirectoryRecursive(dirToList, currentFileName);
121
						}
122
					} else {
123
						// test the file for extensions compliance and, just in case, add it to the list.
124 45576 michele.ar
						for (final String ext : extensionsSet) {
125 45395 michele.ar
							if (currentFileName.endsWith(ext)) {
126
								queue.add(dirToList + "/" + currentFileName);
127
							}
128
						}
129
					}
130
				}
131
			}
132 45576 michele.ar
		} catch (final IOException e) {
133 45395 michele.ar
			throw new CollectorServiceRuntimeException("Unable to list FTP remote folder", e);
134
		}
135
	}
136
137
	@Override
138
	public boolean hasNext() {
139
		if (queue.isEmpty()) {
140
			disconnectFromFtpServer();
141
			return false;
142
		} else {
143
			return true;
144
		}
145
	}
146
147
	@Override
148
	public String next() {
149 45576 michele.ar
		final String nextRemotePath = queue.remove();
150 45395 michele.ar
		int nRepeat = 0;
151
		while (nRepeat < MAX_RETRIES) {
152
			try {
153 45576 michele.ar
				final OutputStream baos = new ByteArrayOutputStream();
154 45395 michele.ar
				if (!ftpClient.isConnected()) {
155
					connectToFtpServer();
156
				}
157
				ftpClient.retrieveFile(nextRemotePath, baos);
158
159
				log.debug(String.format("Collected file from FTP: %s%s", ftpServerAddress, nextRemotePath));
160
				return baos.toString();
161 45576 michele.ar
			} catch (final IOException e) {
162 45395 michele.ar
				nRepeat++;
163
				log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), ftpServerAddress, nextRemotePath,
164
						nRepeat));
165
				disconnectFromFtpServer();
166
				try {
167
					Thread.sleep(BACKOFF_MILLIS);
168 45576 michele.ar
				} catch (final InterruptedException e1) {
169 45395 michele.ar
					log.error(e1);
170
				}
171
			}
172
		}
173
		throw new CollectorServiceRuntimeException(
174
				String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", nextRemotePath, nRepeat));
175
	}
176
177
	@Override
178
	public void remove() {
179
		throw new UnsupportedOperationException();
180
	}
181
}