Project

General

Profile

1
package eu.dnetlib.dhp.common.java.io;
2

    
3
import java.io.IOException;
4
import java.util.NoSuchElementException;
5
import java.util.regex.Pattern;
6

    
7
import org.apache.avro.Schema;
8
import org.apache.avro.file.DataFileReader;
9
import org.apache.avro.specific.SpecificDatumReader;
10
import org.apache.hadoop.fs.AvroFSInput;
11
import org.apache.hadoop.fs.FSDataInputStream;
12
import org.apache.hadoop.fs.LocatedFileStatus;
13
import org.apache.hadoop.fs.RemoteIterator;
14

    
15

    
16

    
17
/**
18
 * An abstraction over data store format which allows
19
 * iterating over records stored in the data store. 
20
 * It handles the standard case of a data store that is a directory containing 
21
 * many Avro files (but it can also read records from a single file). 
22
 * 
23
 * @author mhorst
24
 * @author Mateusz Kobos
25
 */
26
class AvroDataStoreReader<T> implements CloseableIterator<T> {
27

    
28
	private DataFileReader<T> currentReader;
29
	private RemoteIterator<LocatedFileStatus> fileIterator;
30
	private final FileSystemPath path;
31
	private final Schema readerSchema;
32

    
33
	/**
34
	 * Ignore file starting with underscore. Such files are also ignored by
35
	 * default by map-reduce jobs.
36
	 */
37
	private final Pattern whitelistPattern = Pattern.compile("^(?!_).*");
38

    
39
	/**
40
	 * Here the schema used for reading the data store is set to be the same 
41
	 * as the one that was used to write it.
42
	 */
43
	public AvroDataStoreReader(final FileSystemPath path)
44
			throws IOException {
45
		this(path, null);
46
	}
47
	
48
	/**
49
	 * @param path path to the data store to be read
50
	 * @param readerSchema the schema onto which the read data store will 
51
	 * 	be projected
52
	 */
53
	public AvroDataStoreReader(final FileSystemPath path, Schema readerSchema)
54
			throws IOException {
55
		this.path = path;
56
		this.readerSchema = readerSchema;
57
		fileIterator = path.getFileSystem().listFiles(path.getPath(), false);
58
		currentReader = getNextNonemptyReader();
59
	}
60
	
61
	private DataFileReader<T> getNextNonemptyReader() throws IOException {
62
		while (fileIterator != null && fileIterator.hasNext()) {
63
			LocatedFileStatus currentFileStatus = fileIterator.next();
64
			if (isValidFile(currentFileStatus)) {
65
				FileSystemPath currPath = new FileSystemPath(
66
						path.getFileSystem(), currentFileStatus.getPath());
67
				DataFileReader<T> reader = 
68
						getSingleFileReader(currPath, readerSchema);
69
				/** Check if the file contains at least one record */
70
				if(reader.hasNext()){
71
					return reader;
72
				} else {
73
					reader.close();
74
				}
75
			}
76
		}
77
		/** fallback */
78
		return null;
79
	}
80
	
81
	/**
82
	 * Get a reader for the specified Avro file. A utility function. 
83
	 * @param path path to the existing file
84
	 * @param readerSchema optional reader schema. If you want to use the
85
	 *		default option of using writer schema as the reader schema, pass the
86
	 *		{@code null} value. 
87
	 * @throws IOException
88
	 */
89
	private static <T> DataFileReader<T> getSingleFileReader(
90
			FileSystemPath path, Schema readerSchema) throws IOException{
91
		try{
92
		SpecificDatumReader<T> datumReader = new SpecificDatumReader<T>();
93
		if(readerSchema != null){
94
			datumReader.setExpected(readerSchema);
95
		}
96
		long len = path.getFileSystem().getFileStatus(path.getPath()).getLen();
97
		FSDataInputStream inputStream = path.getFileSystem().open(path.getPath());
98
		return new DataFileReader<T>(
99
				new AvroFSInput(inputStream, len), datumReader);
100
		} catch (IOException ex){
101
			throw new IOException("Problem with file \""+
102
					path.getPath().toString()+"\": "+ex.getMessage(), ex);
103
		}
104
	}
105

    
106
	/**
107
	 * Checks whether file is valid
108
	 * 
109
	 * @param fileStatus
110
	 * @return true when valid, false otherwise
111
	 */
112
	private boolean isValidFile(LocatedFileStatus fileStatus) {
113
		if (fileStatus.isFile()) {
114
			return whitelistPattern.matcher(
115
					fileStatus.getPath().getName()).matches();
116
		}
117
		/** fallback */
118
		return false;
119
	}
120

    
121
	@Override
122
	public boolean hasNext() {
123
		return currentReader != null;
124
	}
125

    
126
	@Override
127
	public T next(){
128
		if(currentReader == null){
129
			throw new NoSuchElementException();
130
		}
131
		T obj = currentReader.next();
132
		if(!currentReader.hasNext()){
133
			try{
134
				currentReader.close();
135
				currentReader = getNextNonemptyReader();
136
			} catch(IOException ex){
137
				throw new RuntimeException(ex);
138
			}
139
		}
140
		return obj;
141
	}
142

    
143
	@Override
144
	public void remove() {
145
		throw new UnsupportedOperationException();
146
	}
147

    
148
	@Override
149
	public void close() throws IOException {
150
		if(currentReader != null){
151
			currentReader.close();
152
			currentReader = null;
153
		}
154
		fileIterator = null;
155
	}
156
}
(1-1/7)