Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.sftp;
2

    
3
import java.io.OutputStream;
4
import java.net.URI;
5
import java.net.URISyntaxException;
6
import java.util.*;
7

    
8
import com.jcraft.jsch.*;
9
import eu.dnetlib.data.collector.rmi.CollectorServiceRuntimeException;
10
import org.apache.commons.io.output.ByteArrayOutputStream;
11
import org.apache.commons.lang.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.joda.time.DateTime;
15
import org.joda.time.format.DateTimeFormat;
16
import org.joda.time.format.DateTimeFormatter;
17

    
18
/**
19
 * Created by andrea on 11/01/16.
20
 */
21
public class SftpIterator implements Iterator<String> {
22
    private static final Log log = LogFactory.getLog(SftpIterator.class);
23

    
24
    private static final int MAX_RETRIES = 5;
25
    private static final int DEFAULT_TIMEOUT = 30000;
26
    private static final long BACKOFF_MILLIS = 10000;
27

    
28
    private String baseUrl;
29
    private String sftpURIScheme;
30
    private String sftpServerAddress;
31
    private String remoteSftpBasePath;
32
    private String username;
33
    private String password;
34
    private boolean isRecursive;
35
    private Set<String> extensionsSet;
36
	private boolean incremental;
37

    
38
    private Session sftpSession;
39
    private ChannelSftp sftpChannel;
40

    
41
    private Queue<String> queue;
42

    
43
	private DateTime fromDate = null;
44
	private DateTimeFormatter sftpDateFormatter = DateTimeFormat.forPattern("EEE MMM dd HH:mm:ss 'CET' yyyy");
45
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
46

    
47
	public SftpIterator(String baseUrl, String username, String password, boolean isRecursive, Set<String> extensionsSet, String fromDate) {
48
		this.baseUrl = baseUrl;
49
        this.username = username;
50
        this.password = password;
51
        this.isRecursive = isRecursive;
52
        this.extensionsSet = extensionsSet;
53
		this.incremental = StringUtils.isNotBlank(fromDate);
54
		if (incremental) {
55
			//I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode .
56
			this.fromDate = DateTime.parse(fromDate, simpleDateTimeFormatter);
57
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
58
		}
59
		try {
60
            URI sftpServer = new URI(baseUrl);
61
            this.sftpURIScheme = sftpServer.getScheme();
62
            this.sftpServerAddress = sftpServer.getHost();
63
            this.remoteSftpBasePath = sftpServer.getPath();
64
        } catch (URISyntaxException e) {
65
            throw new CollectorServiceRuntimeException("Bad syntax in the URL " + baseUrl);
66
        }
67

    
68
        connectToSftpServer();
69
        initializeQueue();
70
    }
71

    
72
    private void connectToSftpServer() {
73
        JSch jsch = new JSch();
74

    
75
        try {
76
            JSch.setConfig("StrictHostKeyChecking", "no");
77
            sftpSession = jsch.getSession(username, sftpServerAddress);
78
            sftpSession.setPassword(password);
79
            sftpSession.connect();
80

    
81
            Channel channel = sftpSession.openChannel(sftpURIScheme);
82
            channel.connect();
83
            sftpChannel = (ChannelSftp) channel;
84
	        String pwd = sftpChannel.pwd();
85
	        log.debug("PWD from server: " + pwd);
86
	        String fullPath = pwd + remoteSftpBasePath;
87
	        sftpChannel.cd(fullPath);
88
	        log.debug("PWD from server 2 after 'cd " + fullPath + "' : " + sftpChannel.pwd());
89
	        log.info("Connected to SFTP server " + sftpServerAddress);
90
        } catch (JSchException e) {
91
            throw new CollectorServiceRuntimeException("Unable to connect to remote SFTP server.", e);
92
        } catch (SftpException e) {
93
            throw new CollectorServiceRuntimeException("Unable to access the base remote path on the SFTP server.", e);
94
        }
95
    }
96

    
97
    private void disconnectFromSftpServer() {
98
        sftpChannel.exit();
99
        sftpSession.disconnect();
100
    }
101

    
102
    private void initializeQueue() {
103
        queue = new LinkedList<String>();
104
	    log.info(String.format("SFTP collector plugin collecting from %s with recursion = %s, incremental = %s with fromDate=%s", remoteSftpBasePath,
105
			    isRecursive,
106
			    incremental, fromDate));
107
	    listDirectoryRecursive(".", "");
108
    }
109

    
110
    private void listDirectoryRecursive(final String parentDir, final String currentDir) {
111
        String dirToList = parentDir;
112
	    if (StringUtils.isNotBlank(currentDir)) {
113
		    dirToList += "/" + currentDir;
114
        }
115
	    log.debug("PARENT DIR: " + parentDir);
116
	    log.debug("DIR TO LIST: " + dirToList);
117
	    try {
118
            Vector<ChannelSftp.LsEntry> ls = sftpChannel.ls(dirToList);
119
            for (ChannelSftp.LsEntry entry : ls) {
120
                String currentFileName = entry.getFilename();
121
                if (currentFileName.equals(".") || currentFileName.equals("..")) {
122
                    // skip parent directory and directory itself
123
                    continue;
124
                }
125

    
126
                SftpATTRS attrs = entry.getAttrs();
127
                if (attrs.isDir()) {
128
                    if (isRecursive) {
129
                        listDirectoryRecursive(dirToList, currentFileName);
130
                    }
131
                } else {
132
                    // test the file for extensions compliance and, just in case, add it to the list.
133
                    for (String ext : extensionsSet) {
134
                        if (currentFileName.endsWith(ext)) {
135
	                        //test if the file has been changed after the last collection date:
136
	                        if (incremental) {
137
		                        String mtimeStr = attrs.getMtimeString();
138
		                        DateTime dt = DateTime.parse(mtimeStr, sftpDateFormatter);
139
		                        log.debug("Mtime string: " + mtimeStr + " -- parsed: " + dt.toString());
140
		                        if (dt.isAfter(fromDate)) {
141
			                        // queue.add(dirToList + "/" + currentFileName);
142
			                        queue.add(currentFileName);
143
			                        log.debug(currentFileName + " has changed and must be re-collected");
144
		                        } else {
145
			                        if (log.isDebugEnabled()) {
146
				                        log.debug(currentFileName + " has not changed since last collection");
147
			                        }
148
		                        }
149
	                        } else {
150
		                        //if it is not incremental, just add it to the queue
151
		                        //queue.add(dirToList + "/" + currentFileName);
152
		                        queue.add(currentFileName);
153
	                        }
154

    
155
                        }
156
                    }
157
                }
158
            }
159
        } catch (SftpException e) {
160
            throw new CollectorServiceRuntimeException("Cannot list the sftp remote directory", e);
161

    
162
        }
163
    }
164

    
165
    @Override
166
    public boolean hasNext() {
167
        if (queue.isEmpty()) {
168
            disconnectFromSftpServer();
169
            return false;
170
        } else {
171
            return true;
172
        }
173
    }
174

    
175
    @Override
176
    public String next() {
177
        String nextRemotePath = queue.remove();
178
        int nRepeat = 0;
179
	    String fullPathFile = nextRemotePath;
180
	    while (nRepeat < MAX_RETRIES) {
181
            try {
182
                OutputStream baos = new ByteArrayOutputStream();
183
                sftpChannel.get(nextRemotePath, baos);
184
	            if (log.isDebugEnabled()) {
185
		            fullPathFile = sftpChannel.pwd() + "/" + nextRemotePath;
186
		            log.debug(String.format("Collected file from SFTP: %s%s", sftpServerAddress, fullPathFile));
187
	            }
188
	            return baos.toString();
189
            } catch (SftpException e) {
190
                nRepeat++;
191
	            log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), sftpServerAddress, fullPathFile,
192
			            nRepeat));
193
	            // disconnectFromSftpServer();
194
                try {
195
                    Thread.sleep(BACKOFF_MILLIS);
196
                } catch (InterruptedException e1) {
197
                    log.error(e1);
198
                }
199
            }
200
        }
201
	    throw new CollectorServiceRuntimeException(
202
			    String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", fullPathFile, nRepeat));
203
    }
204

    
205
    @Override
206
    public void remove() {
207
        throw new UnsupportedOperationException();
208
    }
209
}
(2-2/3)