Project

General

Profile

1
package eu.dnetlib.msro.workers.aggregation.collect.plugins.ftp;
2

    
3
import java.io.IOException;
4
import java.io.OutputStream;
5
import java.net.MalformedURLException;
6
import java.net.URL;
7
import java.util.Iterator;
8
import java.util.LinkedList;
9
import java.util.Queue;
10
import java.util.Set;
11

    
12
import org.apache.commons.io.output.ByteArrayOutputStream;
13
import org.apache.commons.logging.Log;
14
import org.apache.commons.logging.LogFactory;
15
import org.apache.commons.net.ftp.FTPClient;
16
import org.apache.commons.net.ftp.FTPFile;
17
import org.apache.commons.net.ftp.FTPReply;
18

    
19
import eu.dnetlib.msro.exceptions.CollectorServiceRuntimeException;
20

    
21
/**
22
 * @author Author: Andrea Mannocci
23
 */
24
public class FtpIterator implements Iterator<String> {
25

    
26
	private static final Log log = LogFactory.getLog(FtpIterator.class);
27

    
28
	private static final int MAX_RETRIES = 5;
29
	private static final int DEFAULT_TIMEOUT = 30000;
30
	private static final long BACKOFF_MILLIS = 10000;
31

    
32
	private FTPClient ftpClient;
33
	private String ftpServerAddress;
34
	private String remoteFtpBasePath;
35
	private String username;
36
	private String password;
37
	private boolean isRecursive;
38
	private Set<String> extensionsSet;
39

    
40
	private Queue<String> queue;
41

    
42
	public FtpIterator(final String baseUrl, final String username, final String password, final boolean isRecursive,
43
			final Set<String> extensionsSet) {
44
		this.username = username;
45
		this.password = password;
46
		this.isRecursive = isRecursive;
47
		this.extensionsSet = extensionsSet;
48
		try {
49
			final URL server = new URL(baseUrl);
50
			ftpServerAddress = server.getHost();
51
			remoteFtpBasePath = server.getPath();
52
		} catch (final MalformedURLException e1) {
53
			throw new CollectorServiceRuntimeException("Malformed URL exception " + baseUrl);
54
		}
55

    
56
		connectToFtpServer();
57
		initializeQueue();
58
	}
59

    
60
	private void connectToFtpServer() {
61
		ftpClient = new FTPClient();
62
		ftpClient.setDefaultTimeout(DEFAULT_TIMEOUT);
63
		ftpClient.setDataTimeout(DEFAULT_TIMEOUT);
64
		ftpClient.setConnectTimeout(DEFAULT_TIMEOUT);
65
		try {
66
			ftpClient.connect(ftpServerAddress);
67

    
68
			// try to login
69
			if (!ftpClient.login(username, password)) {
70
				ftpClient.logout();
71
				throw new CollectorServiceRuntimeException("Unable to login to FTP server " + ftpServerAddress);
72
			}
73
			final int reply = ftpClient.getReplyCode();
74
			if (!FTPReply.isPositiveCompletion(reply)) {
75
				ftpClient.disconnect();
76
				throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
77
			}
78

    
79
			ftpClient.enterLocalPassiveMode();
80
			log.info("Connected to FTP server " + ftpServerAddress);
81
			log.info(String.format("FTP collecting from %s with recursion = %s", remoteFtpBasePath, isRecursive));
82
		} catch (final IOException e) {
83
			throw new CollectorServiceRuntimeException("Unable to connect to FTP server " + ftpServerAddress);
84
		}
85
	}
86

    
87
	private void disconnectFromFtpServer() {
88
		try {
89
			if (ftpClient.isConnected()) {
90
				ftpClient.logout();
91
				ftpClient.disconnect();
92
			}
93
		} catch (final IOException e) {
94
			log.error("Failed to logout & disconnect from the FTP server", e);
95
		}
96
	}
97

    
98
	private void initializeQueue() {
99
		queue = new LinkedList<String>();
100
		listDirectoryRecursive(remoteFtpBasePath, "");
101
	}
102

    
103
	private void listDirectoryRecursive(final String parentDir, final String currentDir) {
104
		String dirToList = parentDir;
105
		if (!currentDir.equals("")) {
106
			dirToList += "/" + currentDir;
107
		}
108
		FTPFile[] subFiles;
109
		try {
110
			subFiles = ftpClient.listFiles(dirToList);
111
			if ((subFiles != null) && (subFiles.length > 0)) {
112
				for (final FTPFile aFile : subFiles) {
113
					final String currentFileName = aFile.getName();
114
					if (currentFileName.equals(".") || currentFileName.equals("..")) {
115
						// skip parent directory and directory itself
116
						continue;
117
					}
118
					if (aFile.isDirectory()) {
119
						if (isRecursive) {
120
							listDirectoryRecursive(dirToList, currentFileName);
121
						}
122
					} else {
123
						// test the file for extensions compliance and, just in case, add it to the list.
124
						for (final String ext : extensionsSet) {
125
							if (currentFileName.endsWith(ext)) {
126
								queue.add(dirToList + "/" + currentFileName);
127
							}
128
						}
129
					}
130
				}
131
			}
132
		} catch (final IOException e) {
133
			throw new CollectorServiceRuntimeException("Unable to list FTP remote folder", e);
134
		}
135
	}
136

    
137
	@Override
138
	public boolean hasNext() {
139
		if (queue.isEmpty()) {
140
			disconnectFromFtpServer();
141
			return false;
142
		} else {
143
			return true;
144
		}
145
	}
146

    
147
	@Override
148
	public String next() {
149
		final String nextRemotePath = queue.remove();
150
		int nRepeat = 0;
151
		while (nRepeat < MAX_RETRIES) {
152
			try {
153
				final OutputStream baos = new ByteArrayOutputStream();
154
				if (!ftpClient.isConnected()) {
155
					connectToFtpServer();
156
				}
157
				ftpClient.retrieveFile(nextRemotePath, baos);
158

    
159
				log.debug(String.format("Collected file from FTP: %s%s", ftpServerAddress, nextRemotePath));
160
				return baos.toString();
161
			} catch (final IOException e) {
162
				nRepeat++;
163
				log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), ftpServerAddress, nextRemotePath,
164
						nRepeat));
165
				disconnectFromFtpServer();
166
				try {
167
					Thread.sleep(BACKOFF_MILLIS);
168
				} catch (final InterruptedException e1) {
169
					log.error(e1);
170
				}
171
			}
172
		}
173
		throw new CollectorServiceRuntimeException(
174
				String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", nextRemotePath, nRepeat));
175
	}
176

    
177
	@Override
178
	public void remove() {
179
		throw new UnsupportedOperationException();
180
	}
181
}
(2-2/2)