Project

General

Profile

1
package eu.dnetlib.data.collector.plugins.sftp;
2

    
3
import java.io.OutputStream;
4
import java.net.URI;
5
import java.net.URISyntaxException;
6
import java.util.*;
7

    
8
import com.jcraft.jsch.*;
9
import eu.dnetlib.data.collector.rmi.CollectorServiceRuntimeException;
10
import org.apache.commons.io.output.ByteArrayOutputStream;
11
import org.apache.commons.lang3.StringUtils;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.joda.time.DateTime;
15
import org.joda.time.format.DateTimeFormat;
16
import org.joda.time.format.DateTimeFormatter;
17

    
18
/**
19
 * Created by andrea on 11/01/16.
20
 */
21
public class SftpIterator implements Iterator<String> {
22
    private static final Log log = LogFactory.getLog(SftpIterator.class);
23

    
24
    private static final int MAX_RETRIES = 5;
25
    private static final int DEFAULT_TIMEOUT = 30000;
26
    private static final long BACKOFF_MILLIS = 10000;
27

    
28
    private String baseUrl;
29
    private String sftpURIScheme;
30
    private String sftpServerAddress;
31
    private String remoteSftpBasePath;
32
    private String username;
33
    private String password;
34
    private boolean isRecursive;
35
    private Set<String> extensionsSet;
36
	private boolean incremental;
37

    
38
    private Session sftpSession;
39
    private ChannelSftp sftpChannel;
40

    
41
    private Queue<String> queue;
42

    
43
	private DateTime fromDate = null;
44
	private DateTimeFormatter simpleDateTimeFormatter = DateTimeFormat.forPattern("yyyy-MM-dd");
45

    
46
	private static String EMPTY_RECORD = "<record/>";
47

    
48
	public SftpIterator(String baseUrl, String username, String password, boolean isRecursive, Set<String> extensionsSet, String fromDate) {
49
		this.baseUrl = baseUrl;
50
        this.username = username;
51
        this.password = password;
52
        this.isRecursive = isRecursive;
53
        this.extensionsSet = extensionsSet;
54
		this.incremental = StringUtils.isNotBlank(fromDate);
55
		if (incremental) {
56
			//I expect fromDate in the format 'yyyy-MM-dd'. See class eu.dnetlib.msro.workflows.nodes.collect.FindDateRangeForIncrementalHarvestingJobNode .
57
			this.fromDate = DateTime.parse(fromDate, simpleDateTimeFormatter);
58
			log.debug("fromDate string: " + fromDate + " -- parsed: " + this.fromDate.toString());
59
		}
60
		try {
61
            URI sftpServer = new URI(baseUrl);
62
            this.sftpURIScheme = sftpServer.getScheme();
63
            this.sftpServerAddress = sftpServer.getHost();
64
            this.remoteSftpBasePath = sftpServer.getPath();
65
        } catch (URISyntaxException e) {
66
            throw new CollectorServiceRuntimeException("Bad syntax in the URL " + baseUrl);
67
        }
68

    
69
        connectToSftpServer();
70
        initializeQueue();
71
    }
72

    
73
    private void connectToSftpServer() {
74
        JSch jsch = new JSch();
75

    
76
        try {
77
            JSch.setConfig("StrictHostKeyChecking", "no");
78
            sftpSession = jsch.getSession(username, sftpServerAddress);
79
            sftpSession.setPassword(password);
80
            sftpSession.connect();
81

    
82
            Channel channel = sftpSession.openChannel(sftpURIScheme);
83
            channel.connect();
84
            sftpChannel = (ChannelSftp) channel;
85
	        String pwd = sftpChannel.pwd();
86
	        log.debug("PWD from server: " + pwd);
87
	        String fullPath = pwd + remoteSftpBasePath;
88
	        sftpChannel.cd(fullPath);
89
	        log.debug("PWD from server 2 after 'cd " + fullPath + "' : " + sftpChannel.pwd());
90
	        log.info("Connected to SFTP server " + sftpServerAddress);
91
        } catch (JSchException e) {
92
            throw new CollectorServiceRuntimeException("Unable to connect to remote SFTP server.", e);
93
        } catch (SftpException e) {
94
            throw new CollectorServiceRuntimeException("Unable to access the base remote path on the SFTP server.", e);
95
        }
96
    }
97

    
98
    private void disconnectFromSftpServer() {
99
        sftpChannel.exit();
100
        sftpSession.disconnect();
101
    }
102

    
103
    private void initializeQueue() {
104
        queue = new LinkedList<String>();
105
	    log.info(String.format("SFTP collector plugin collecting from %s with recursion = %s, incremental = %s with fromDate=%s", remoteSftpBasePath,
106
			    isRecursive,
107
			    incremental, fromDate));
108
	    listDirectoryRecursive(".", "");
109
    }
110

    
111
    private void listDirectoryRecursive(final String parentDir, final String currentDir) {
112
        String dirToList = parentDir;
113
        if (StringUtils.isNotBlank(currentDir)) {
114
            dirToList += "/" + currentDir;
115
        }
116
        log.debug("PARENT DIR: " + parentDir);
117
        log.debug("DIR TO LIST: " + dirToList);
118
        try {
119
            Vector<ChannelSftp.LsEntry> ls = sftpChannel.ls(dirToList);
120
            for (ChannelSftp.LsEntry entry : ls) {
121
                String currentFileName = entry.getFilename();
122
                if (currentFileName.equals(".") || currentFileName.equals("..")) {
123
                    // skip parent directory and directory itself
124
                    continue;
125
                }
126
                SftpATTRS attrs = entry.getAttrs();
127
                if (attrs.isDir()) {
128
                    if (isRecursive) {
129
                        listDirectoryRecursive(dirToList, currentFileName);
130
                    }
131
                } else {
132
                    // test the file for extensions compliance and, just in case, add it to the list.
133
                    for (String ext : extensionsSet) {
134
                        if (currentFileName.endsWith(ext)) {
135
                            if(dirToList.length() > 2){
136
                                currentFileName = dirToList + "/" + currentFileName;
137
                            }
138
                            //test if the file has been changed after the last collection date:
139
                            if (incremental) {
140
                                int mTime = attrs.getMTime();
141
                                //int times are values reduced by the milliseconds, hence we multiply per 1000L
142
                                DateTime dt = new DateTime(mTime * 1000L);
143
                                if (dt.isAfter(fromDate)) {
144
                                    queue.add(currentFileName);
145
                                    log.debug(currentFileName + " has changed and must be re-collected");
146
                                } else {
147
                                    if (log.isDebugEnabled()) {
148
                                        log.debug(currentFileName + " has not changed since last collection");
149
                                    }
150
                                }
151
                            } else {
152
                                //if it is not incremental, just add it to the queue
153
                                queue.add(currentFileName);
154
                            }
155
                        }
156
                    }
157
                }
158
            }
159
        } catch (SftpException e) {
160
            throw new CollectorServiceRuntimeException("Cannot list the sftp remote directory", e);
161
        }
162
    }
163

    
164
    @Override
165
    public boolean hasNext() {
166
        if (queue.isEmpty()) {
167
            disconnectFromSftpServer();
168
            return false;
169
        } else {
170
            return true;
171
        }
172
    }
173

    
174
    @Override
175
    public String next() {
176
        String nextRemotePath = queue.remove();
177
        int nRepeat = 0;
178
	    String fullPathFile = nextRemotePath;
179
	    while (nRepeat < MAX_RETRIES) {
180
            try {
181
                OutputStream baos = new ByteArrayOutputStream();
182
                sftpChannel.get(nextRemotePath, baos);
183
	            if (log.isDebugEnabled()) {
184
		            fullPathFile = sftpChannel.pwd() + "/" + nextRemotePath;
185
		            log.debug(String.format("Collected file from SFTP: %s%s", sftpServerAddress, fullPathFile));
186
	            }
187
	            String doc = baos.toString();
188
	            if(StringUtils.isNotBlank(doc)) return doc;
189
	            else return EMPTY_RECORD;
190
            } catch (SftpException e) {
191
                nRepeat++;
192
	            log.warn(String.format("An error occurred [%s] for %s%s, retrying.. [retried %s time(s)]", e.getMessage(), sftpServerAddress, fullPathFile,
193
			            nRepeat));
194
	            // disconnectFromSftpServer();
195
                try {
196
                    Thread.sleep(BACKOFF_MILLIS);
197
                } catch (InterruptedException e1) {
198
                    log.error(e1);
199
                }
200
            }
201
        }
202
	    throw new CollectorServiceRuntimeException(
203
			    String.format("Impossible to retrieve FTP file %s after %s retries. Aborting FTP collection.", fullPathFile, nRepeat));
204
    }
205

    
206
    @Override
207
    public void remove() {
208
        throw new UnsupportedOperationException();
209
    }
210
}
(2-2/3)