Project

General

Profile

1
package eu.dnetlib.dhp.common.java.io;
2

    
3
import java.io.IOException;
4
import java.util.NoSuchElementException;
5
import java.util.regex.Pattern;
6

    
7
import org.apache.hadoop.fs.FileSystem;
8
import org.apache.hadoop.fs.LocatedFileStatus;
9
import org.apache.hadoop.fs.RemoteIterator;
10
import org.apache.hadoop.io.SequenceFile;
11
import org.apache.hadoop.io.SequenceFile.Reader;
12
import org.apache.hadoop.io.Text;
13
import org.apache.hadoop.io.Writable;
14
import org.apache.hadoop.util.ReflectionUtils;
15

    
16
/**
17
 * Iterator that extracts sequence file's consecutive {@link Text} values.
18
 * 
19
 * @author mhorst
20
 */
21
public class SequenceFileTextValueReader implements CloseableIterator<Text> {
22

    
23
	private SequenceFile.Reader sequenceReader;
24

    
25
	private final RemoteIterator<LocatedFileStatus> fileIt;
26

    
27
	private final FileSystem fs;
28

    
29
	/**
30
	 * Ignore file starting with underscore. Such files are also ignored by
31
	 * default by map-reduce jobs.
32
	 */
33
	private final static Pattern WHITELIST_REGEXP = Pattern.compile("^[^_].*");
34

    
35
	private Text toBeReturned;
36

    
37
	//------------------------ CONSTRUCTORS --------------------------
38
	
39
	/**
40
	 * Default constructor.
41
	 * 
42
	 * @param path HDFS path along with associated FileSystem
43
	 * @throws IOException
44
	 */
45
	public SequenceFileTextValueReader(final FileSystemPath path) throws IOException {
46
		this.fs = path.getFileSystem();
47
		if (fs.isDirectory(path.getPath())) {
48
			fileIt = fs.listFiles(path.getPath(), false);
49
			sequenceReader = getNextSequenceReader();
50
		} else {
51
			fileIt = null;
52
			sequenceReader = new Reader(fs.getConf(), SequenceFile.Reader.file(path.getPath()));
53
		}
54
	}
55

    
56
	//------------------------ LOGIC ---------------------------------
57
	
58
	/*
59
	 * (non-Javadoc)
60
	 * 
61
	 * @see java.util.Iterator#hasNext()
62
	 */
63
	@Override
64
	public boolean hasNext() {
65
		// check and provide next when already returned
66
		if (toBeReturned == null) {
67
			toBeReturned = getNext();
68
		}
69
		return toBeReturned != null;
70
	}
71

    
72
	/*
73
	 * (non-Javadoc)
74
	 * 
75
	 * @see java.util.Iterator#next()
76
	 */
77
	@Override
78
	public Text next() {
79
		if (toBeReturned != null) {
80
			// element fetched while executing hasNext()
81
			Text result = toBeReturned;
82
			toBeReturned = null;
83
			return result;
84
		} else {
85
			Text resultCandidate = getNext();
86
			if (resultCandidate!=null) {
87
				return resultCandidate;
88
			} else {
89
				throw new NoSuchElementException();
90
			}
91
		}
92
	}
93

    
94
	/*
95
	 * (non-Javadoc)
96
	 * 
97
	 * @see eu.dnetlib.dhp.exp.iterator.ClosableIterator#close()
98
	 */
99
	@Override
100
	public void close() throws IOException {
101
		if (sequenceReader != null) {
102
			sequenceReader.close();
103
		}
104
	}
105
	
106
	//------------------------ PRIVATE -------------------------------
107
	
108
	private final Reader getNextSequenceReader() throws IOException {
109
		while (fileIt != null && fileIt.hasNext()) {
110
			LocatedFileStatus currentFileStatus = fileIt.next();
111
			if (isValidFile(currentFileStatus)) {
112
				return new Reader(this.fs.getConf(), SequenceFile.Reader.file(currentFileStatus.getPath()));
113
			}
114
		}
115
		// fallback
116
		return null;
117
	}
118

    
119
	/**
120
	 * Checks whether file is valid candidate.
121
	 * 
122
	 * @param fileStatus
123
	 *            file status holding file name
124
	 * @return true when valid, false otherwise
125
	 */
126
	private final boolean isValidFile(LocatedFileStatus fileStatus) {
127
		if (fileStatus.isFile()) {
128
			return WHITELIST_REGEXP.matcher(fileStatus.getPath().getName()).matches();
129
		} else {
130
			return false;
131
		}
132
	}
133

    
134
	/**
135
	 * @return next data package
136
	 */
137
	private Text getNext() {
138
		try {
139
			if (sequenceReader == null) {
140
				return null;
141
			}
142
			Writable key = (Writable) ReflectionUtils.newInstance(sequenceReader.getKeyClass(), fs.getConf());
143
			Writable value = (Writable) ReflectionUtils.newInstance(sequenceReader.getValueClass(), fs.getConf());
144
			if (sequenceReader.next(key, value)) {
145
				return (Text) value;
146
			} else {
147
				sequenceReader.close();
148
				sequenceReader = getNextSequenceReader();
149
				if (sequenceReader != null) {
150
					return getNext();
151
				}
152
			}
153
			// fallback
154
			return null;
155
		} catch (IOException e) {
156
			throw new RuntimeException(e);
157
		}
158
	}
159
}
(7-7/7)