Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.sftp;
2

    
3
import java.io.OutputStream;
4
import java.net.URI;
5
import java.net.URISyntaxException;
6
import java.time.Instant;
7
import java.time.LocalDateTime;
8
import java.time.ZoneId;
9
import java.time.format.DateTimeFormatter;
10
import java.util.*;
11

    
12
import com.jcraft.jsch.*;
13
import eu.dnetlib.data.collector.ThreadSafeIterator;
14
import eu.dnetlib.rmi.data.CollectorServiceRuntimeException;
15
import org.apache.commons.io.output.ByteArrayOutputStream;
16
import org.apache.commons.lang3.StringUtils;
17
import org.apache.commons.logging.Log;
18
import org.apache.commons.logging.LogFactory;
19

    
20
/**
21
 * Created by andrea on 11/01/16.
22
 */
23
public class SftpIterator extends ThreadSafeIterator {
24

    
25
	private static final Log log = LogFactory.getLog(SftpIterator.class);
26

    
27
	private static final int MAX_RETRIES = 2;
28
	private static final int DEFAULT_TIMEOUT = 30000;
29
	private static final long BACKOFF_MILLIS = 10000;
30

    
31
	//params for simple authentication mode
32
	private String username;
33
	private String password;
34

    
35
	//params for pubkey authentication mode
36
	private String prvKeyFile;
37
	private String passPhrase;
38
	private String knownHostsFile;
39

    
40
	private String baseUrl;
41
	private String sftpURIScheme;
42
	private String sftpServerAddress;
43
	private String remoteSftpBasePath;
44

    
45
	private boolean isRecursive;
46
	private Set<String> extensionsSet;
47
	private boolean incremental;
48

    
49
	private Session sftpSession;
50
	private ChannelSftp sftpChannel;
51

    
52
	private Queue<String> queue;
53

    
54
	private LocalDateTime fromDate = null;
55
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd");
56

    
57
	private void init(final String baseUrl, final boolean isRecursive, final Set<String> extensionsSet, final String fromDate) {
58
		this.baseUrl = baseUrl;
59
		this.isRecursive = isRecursive;
60
		this.extensionsSet = extensionsSet;
61
		this.incremental = StringUtils.isNotBlank(fromDate);
62
		if (incremental) {
63
			//I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode .
64
			this.fromDate = LocalDateTime.parse(fromDate, simpleDateTimeFormatter);
65
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
66
		}
67
		try {
68
			URI sftpServer = new URI(this.baseUrl);
69
			this.sftpURIScheme = sftpServer.getScheme();
70
			this.sftpServerAddress = sftpServer.getHost();
71
			this.remoteSftpBasePath = sftpServer.getPath();
72
		} catch (URISyntaxException e) {
73
			throw new CollectorServiceRuntimeException("Bad syntax in the URL " + baseUrl);
74
		}
75
	}
76

    
77
	public SftpIterator(String baseUrl, String username, String password, boolean isRecursive, Set<String> extensionsSet, String fromDate) {
78
		init(baseUrl, isRecursive, extensionsSet, fromDate);
79
		this.username = username;
80
		this.password = password;
81
		connectToSftpServerSimpleAuth();
82
		initializeQueue();
83
	}
84

    
85
	public SftpIterator(final String baseUrl, final String username, final String prvKeyFilePath, final String passPhrase, final String knownHostsFile, final boolean isRecursive, final Set<String> extensionsSet, final String fromDate) {
86
		init(baseUrl, isRecursive, extensionsSet, fromDate);
87
		this.username = username;
88
		this.passPhrase = passPhrase;
89
		this.prvKeyFile = prvKeyFilePath;
90
		this.knownHostsFile = knownHostsFile;
91
		connectToSftpServerPubKeyAuth();
92
		initializeQueue();
93
	}
94

    
95
	private void connectToSftpServerPubKeyAuth() {
96
		JSch jsch = new JSch();
97
		log.info("Connecting to "+sftpServerAddress+" with PubKey authentication");
98
		log.info("Username "+ username);
99
		log.info("Private key path: "+prvKeyFile);
100
		if(StringUtils.isNotBlank(passPhrase)){
101
			log.info("with Pass phrase");
102
		}
103
		log.info("Known host file path: "+knownHostsFile);
104
		try {
105
			jsch.setKnownHosts(this.knownHostsFile);
106
			jsch.addIdentity(this.prvKeyFile, this.passPhrase);
107
			sftpSession = jsch.getSession(username, sftpServerAddress);
108
			sftpSession.connect();
109
			openChannelOnBasePath();
110
		} catch (JSchException e) {
111
			throw new CollectorServiceRuntimeException("Unable to create a session on remote SFTP server via Public key authentication.", e);
112
		}
113

    
114
	}
115

    
116
	private void connectToSftpServerSimpleAuth() {
117
		JSch jsch = new JSch();
118
		try {
119
			JSch.setConfig("StrictHostKeyChecking", "no");
120
			sftpSession = jsch.getSession(username, sftpServerAddress);
121
			sftpSession.setPassword(password);
122
			sftpSession.connect();
123
			openChannelOnBasePath();
124
		} catch (JSchException e) {
125
			throw new CollectorServiceRuntimeException("Unable to create a session on remote SFTP server via simple authentication.", e);
126
		}
127
	}
128

    
129
	private void openChannelOnBasePath() {
130
		String fullPath = "";
131
		try {
132
			Channel channel = sftpSession.openChannel(sftpURIScheme);
133
			channel.connect();
134
			sftpChannel = (ChannelSftp) channel;
135
			String pwd = sftpChannel.pwd();
136
			log.debug("PWD from server: " + pwd);
137
			fullPath = pwd + remoteSftpBasePath;
138
			sftpChannel.cd(fullPath);
139
			log.debug("PWD from server 2 after 'cd " + fullPath + "' : " + sftpChannel.pwd());
140
			log.info("Connected to SFTP server " + sftpServerAddress);
141
		} catch (JSchException e) {
142
			throw new CollectorServiceRuntimeException("Unable to open/connect SFTP channel.", e);
143
		} catch (SftpException e) {
144
			throw new CollectorServiceRuntimeException("Unable to access the remote path "+fullPath+" on the SFTP server.", e);
145
		}
146
	}
147

    
148
	private void disconnectFromSftpServer() {
149
		sftpChannel.exit();
150
		sftpSession.disconnect();
151
	}
152

    
153
	private void initializeQueue() {
154
		queue = new LinkedList<>();
155
		log.info(String.format("SFTP collector plugin collecting from %s with recursion = %s, incremental = %s with fromDate=%s", remoteSftpBasePath,
156
				isRecursive,
157
				incremental, fromDate));
158
		listDirectoryRecursive(".", "");
159
	}
160

    
161
	private void listDirectoryRecursive(final String parentDir, final String currentDir) {
162
		String dirToList = parentDir;
163
		if (StringUtils.isNotBlank(currentDir)) {
164
			dirToList += "/" + currentDir;
165
		}
166
		log.debug("PARENT DIR: " + parentDir);
167
		log.debug("DIR TO LIST: " + dirToList);
168
		try {
169
			Vector<ChannelSftp.LsEntry> ls = sftpChannel.ls(dirToList);
170
			for (ChannelSftp.LsEntry entry : ls) {
171
				String currentFileName = entry.getFilename();
172
				if (currentFileName.equals(".") || currentFileName.equals("..")) {
173
					// skip parent directory and directory itself
174
					continue;
175
				}
176

    
177
				SftpATTRS attrs = entry.getAttrs();
178
				if (attrs.isDir()) {
179
					if (isRecursive) {
180
						listDirectoryRecursive(dirToList, currentFileName);
181
					}
182
				} else {
183
					// test the file for extensions compliance and, just in case, add it to the list.
184
					for (String ext : extensionsSet) {
185
						if (currentFileName.endsWith(ext)) {
186
							//test if the file has been changed after the last collection date:
187
							if (incremental) {
188
								int mTime = attrs.getMTime();
189

    
190
								//int times are values reduced by the milliseconds, hence we multiply per 1000L
191

    
192
								Instant instant = Instant.ofEpochMilli(mTime * 1000L);
193
								LocalDateTime dt = LocalDateTime.ofInstant(instant, ZoneId.systemDefault());
194
								if (dt.isAfter(fromDate)) {
195
									queue.add(dirToList+"/"+currentFileName);
196
									log.debug(dirToList+"/"+currentFileName + " has changed and must be re-collected");
197
								} else {
198
									if (log.isDebugEnabled()) {
199
										log.debug(dirToList+"/"+currentFileName + " has not changed since last collection");
200
									}
201
								}
202
							} else {
203
								//if it is not incremental, just add it to the queue
204
								queue.add(dirToList+"/"+currentFileName);
205
							}
206

    
207
						}
208
					}
209
				}
210
			}
211
		} catch (SftpException e) {
212
			throw new CollectorServiceRuntimeException("Cannot list the sftp remote directory", e);
213

    
214
		}
215
	}
216

    
217
	@Override
218
	public boolean doHasNext() {
219
		return !queue.isEmpty();
220
	}
221

    
222
	@Override
223
	public String doNext() {
224
		if(queue.isEmpty())
225
			throw new CollectorServiceRuntimeException("Unexpected empty queue in next()");
226
		String nextRemotePath = queue.remove();
227
		int nRepeat = 0;
228
		String fullPathFile = "";
229
		while (nRepeat < MAX_RETRIES) {
230
			try {
231
				OutputStream baos = new ByteArrayOutputStream();
232
				sftpChannel.get(nextRemotePath, baos);
233
				if (log.isDebugEnabled()) {
234
					fullPathFile  = sftpChannel.pwd() + "/" + nextRemotePath;
235
					log.debug(String.format("Collected file from SFTP: %s%s", sftpServerAddress, fullPathFile));
236
				}
237
				if (queue.isEmpty()) {
238
					disconnectFromSftpServer();
239
				}
240
				return baos.toString();
241
			} catch (SftpException e) {
242
				nRepeat++;
243
				log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), sftpServerAddress, fullPathFile,
244
						nRepeat));
245
				e.printStackTrace();
246
				try {
247
					Thread.sleep(BACKOFF_MILLIS);
248
				} catch (InterruptedException e1) {
249
					log.error(e1);
250
				}
251
			}
252
		}
253
		throw new CollectorServiceRuntimeException(
254
				String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", fullPathFile, nRepeat));
255
	}
256

    
257
	@Override
258
	public void remove() {
259
		throw new UnsupportedOperationException();
260
	}
261
}
(3-3/5)