Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.sftp;
2

    
3
import java.io.OutputStream;
4
import java.net.URI;
5
import java.net.URISyntaxException;
6
import java.time.Instant;
7
import java.time.LocalDate;
8
import java.time.LocalDateTime;
9
import java.time.ZoneId;
10
import java.time.format.DateTimeFormatter;
11
import java.time.format.DateTimeParseException;
12
import java.util.LinkedList;
13
import java.util.Queue;
14
import java.util.Set;
15
import java.util.Vector;
16

    
17
import com.jcraft.jsch.*;
18
import eu.dnetlib.data.collector.ThreadSafeIterator;
19
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
20
import org.apache.commons.io.output.ByteArrayOutputStream;
21
import org.apache.commons.lang3.StringUtils;
22
import org.apache.commons.logging.Log;
23
import org.apache.commons.logging.LogFactory;
24

    
25
/**
26
 * Created by andrea on 11/01/16.
27
 */
28
public class SftpIterator extends ThreadSafeIterator {
29

    
30
	private static final Log log = LogFactory.getLog(SftpIterator.class);
31

    
32
	private static final int MAX_RETRIES = 2;
33
	private static final int DEFAULT_TIMEOUT = 30000;
34
	private static final long BACKOFF_MILLIS = 10000;
35

    
36
	//params for simple authentication mode
37
	private String username;
38
	private String password;
39

    
40
	//params for pubkey authentication mode
41
	private String prvKeyFile;
42
	private String passPhrase;
43
	private String knownHostsFile;
44

    
45
	private String baseUrl;
46
	private String sftpURIScheme;
47
	private String sftpServerAddress;
48
	private String remoteSftpBasePath;
49

    
50
	private boolean isRecursive;
51
	private Set<String> extensionsSet;
52
	private boolean incremental;
53

    
54
	private Session sftpSession;
55
	private ChannelSftp sftpChannel;
56

    
57
	private Queue<String> queue;
58

    
59
	private LocalDateTime fromDate = null;
60
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
61
	private DateTimeFormatter fullDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ss'Z'");
62

    
63
	private void init(final String baseUrl, final boolean isRecursive, final Set<String> extensionsSet, final String fromDate) {
64
		this.baseUrl = baseUrl;
65
		this.isRecursive = isRecursive;
66
		this.extensionsSet = extensionsSet;
67
		this.incremental = StringUtils.isNotBlank(fromDate);
68
		if (incremental) {
69
			try {
70
				this.fromDate = LocalDateTime.parse(fromDate, fullDateTimeFormatter);
71
			}catch(DateTimeParseException dtpe){
72
				//try with the simple formatter
73
				this.fromDate = LocalDateTime.from(LocalDate.parse(fromDate, simpleDateTimeFormatter).atStartOfDay());
74
			}
75
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
76
		}
77
		try {
78
			URI sftpServer = new URI(this.baseUrl);
79
			this.sftpURIScheme = sftpServer.getScheme();
80
			this.sftpServerAddress = sftpServer.getHost();
81
			this.remoteSftpBasePath = sftpServer.getPath();
82
		} catch (URISyntaxException e) {
83
			throw new CollectorServiceRuntimeException("Bad syntax in the URL " + baseUrl);
84
		}
85
	}
86

    
87
	public SftpIterator(String baseUrl, String username, String password, boolean isRecursive, Set<String> extensionsSet, String fromDate) {
88
		init(baseUrl, isRecursive, extensionsSet, fromDate);
89
		this.username = username;
90
		this.password = password;
91
		connectToSftpServerSimpleAuth();
92
		initializeQueue();
93
	}
94

    
95
	public SftpIterator(final String baseUrl, final String username, final String prvKeyFilePath, final String passPhrase, final String knownHostsFile, final boolean isRecursive, final Set<String> extensionsSet, final String fromDate) {
96
		init(baseUrl, isRecursive, extensionsSet, fromDate);
97
		this.username = username;
98
		this.passPhrase = passPhrase;
99
		this.prvKeyFile = prvKeyFilePath;
100
		this.knownHostsFile = knownHostsFile;
101
		connectToSftpServerPubKeyAuth();
102
		initializeQueue();
103
	}
104

    
105
	private void connectToSftpServerPubKeyAuth() {
106
		JSch jsch = new JSch();
107
		log.info("Connecting to "+sftpServerAddress+" with PubKey authentication");
108
		log.info("Username "+ username);
109
		log.info("Private key path: "+prvKeyFile);
110
		if(StringUtils.isNotBlank(passPhrase)){
111
			log.info("with Pass phrase");
112
		}
113
		log.info("Known host file path: "+knownHostsFile);
114
		try {
115
			jsch.setKnownHosts(this.knownHostsFile);
116
			jsch.addIdentity(this.prvKeyFile, this.passPhrase);
117
			sftpSession = jsch.getSession(username, sftpServerAddress);
118
			sftpSession.connect();
119
			openChannelOnBasePath();
120
		} catch (JSchException e) {
121
			throw new CollectorServiceRuntimeException("Unable to create a session on remote SFTP server via Public key authentication.", e);
122
		}
123

    
124
	}
125

    
126
	private void connectToSftpServerSimpleAuth() {
127
		JSch jsch = new JSch();
128
		try {
129
			JSch.setConfig("StrictHostKeyChecking", "no");
130
			sftpSession = jsch.getSession(username, sftpServerAddress);
131
			sftpSession.setPassword(password);
132
			sftpSession.connect();
133
			openChannelOnBasePath();
134
		} catch (JSchException e) {
135
			throw new CollectorServiceRuntimeException("Unable to create a session on remote SFTP server via simple authentication.", e);
136
		}
137
	}
138

    
139
	private void openChannelOnBasePath() {
140
		String fullPath = "";
141
		try {
142
			Channel channel = sftpSession.openChannel(sftpURIScheme);
143
			channel.connect();
144
			sftpChannel = (ChannelSftp) channel;
145
			String pwd = sftpChannel.pwd();
146
			log.debug("PWD from server: " + pwd);
147
			fullPath = pwd + remoteSftpBasePath;
148
			sftpChannel.cd(fullPath);
149
			log.debug("PWD from server 2 after 'cd " + fullPath + "' : " + sftpChannel.pwd());
150
			log.info("Connected to SFTP server " + sftpServerAddress);
151
		} catch (JSchException e) {
152
			throw new CollectorServiceRuntimeException("Unable to open/connect SFTP channel.", e);
153
		} catch (SftpException e) {
154
			throw new CollectorServiceRuntimeException("Unable to access the remote path "+fullPath+" on the SFTP server.", e);
155
		}
156
	}
157

    
158
	private void disconnectFromSftpServer() {
159
		sftpChannel.exit();
160
		sftpSession.disconnect();
161
	}
162

    
163
	private void initializeQueue() {
164
		queue = new LinkedList<>();
165
		log.info(String.format("SFTP collector plugin collecting from %s with recursion = %s, incremental = %s with fromDate=%s", remoteSftpBasePath,
166
				isRecursive,
167
				incremental, fromDate));
168
		listDirectoryRecursive(".", "");
169
	}
170

    
171
	private void listDirectoryRecursive(final String parentDir, final String currentDir) {
172
		String dirToList = parentDir;
173
		if (StringUtils.isNotBlank(currentDir)) {
174
			dirToList += "/" + currentDir;
175
		}
176
		log.debug("PARENT DIR: " + parentDir);
177
		log.debug("DIR TO LIST: " + dirToList);
178
		try {
179
			Vector<ChannelSftp.LsEntry> ls = sftpChannel.ls(dirToList);
180
			for (ChannelSftp.LsEntry entry : ls) {
181
				String currentFileName = entry.getFilename();
182
				if (currentFileName.equals(".") || currentFileName.equals("..")) {
183
					// skip parent directory and directory itself
184
					continue;
185
				}
186

    
187
				SftpATTRS attrs = entry.getAttrs();
188
				if (attrs.isDir()) {
189
					if (isRecursive) {
190
						listDirectoryRecursive(dirToList, currentFileName);
191
					}
192
				} else {
193
					// test the file for extensions compliance and, just in case, add it to the list.
194
					for (String ext : extensionsSet) {
195
						if (currentFileName.endsWith(ext)) {
196
							//test if the file has been changed after the last collection date:
197
							if (incremental) {
198
								int mTime = attrs.getMTime();
199

    
200
								//int times are values reduced by the milliseconds, hence we multiply per 1000L
201

    
202
								Instant instant = Instant.ofEpochMilli(mTime * 1000L);
203
								LocalDateTime dt = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
204
								if (dt.isAfter(fromDate)) {
205
									queue.add(dirToList+"/"+currentFileName);
206
									log.debug(dirToList+"/"+currentFileName + " has changed and must be re-collected");
207
								} else {
208
									if (log.isDebugEnabled()) {
209
										log.debug(dirToList+"/"+currentFileName + " has not changed since last collection");
210
									}
211
								}
212
							} else {
213
								//if it is not incremental, just add it to the queue
214
								queue.add(dirToList+"/"+currentFileName);
215
							}
216

    
217
						}
218
					}
219
				}
220
			}
221
		} catch (SftpException e) {
222
			throw new CollectorServiceRuntimeException("Cannot list the sftp remote directory", e);
223

    
224
		}
225
	}
226

    
227
	@Override
228
	public boolean doHasNext() {
229
		return !queue.isEmpty();
230
	}
231

    
232
	@Override
233
	public String doNext() {
234
		if(queue.isEmpty())
235
			throw new CollectorServiceRuntimeException("Unexpected empty queue in next()");
236
		String nextRemotePath = queue.remove();
237
		int nRepeat = 0;
238
		String fullPathFile = "";
239
		while (nRepeat < MAX_RETRIES) {
240
			try {
241
				OutputStream baos = new ByteArrayOutputStream();
242
				sftpChannel.get(nextRemotePath, baos);
243
				if (log.isDebugEnabled()) {
244
					fullPathFile  = sftpChannel.pwd() + "/" + nextRemotePath;
245
					log.debug(String.format("Collected file from SFTP: %s%s", sftpServerAddress, fullPathFile));
246
				}
247
				if (queue.isEmpty()) {
248
					disconnectFromSftpServer();
249
				}
250
				return baos.toString();
251
			} catch (SftpException e) {
252
				nRepeat++;
253
				log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), sftpServerAddress, fullPathFile,
254
						nRepeat));
255
				e.printStackTrace();
256
				try {
257
					Thread.sleep(BACKOFF_MILLIS);
258
				} catch (InterruptedException e1) {
259
					log.error(e1);
260
				}
261
			}
262
		}
263
		throw new CollectorServiceRuntimeException(
264
				String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", fullPathFile, nRepeat));
265
	}
266

    
267
	@Override
268
	public void remove() {
269
		throw new UnsupportedOperationException();
270
	}
271
}
(3-3/5)