Project

General

Profile

1
package eu.dnetlib.dhp.common.java.io;
2

    
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.List;
6

    
7
import org.apache.avro.Schema;
8
import org.apache.avro.file.DataFileWriter;
9
import org.apache.avro.generic.GenericContainer;
10
import org.apache.avro.io.DatumWriter;
11
import org.apache.avro.specific.SpecificDatumWriter;
12

    
13

    
14
/**
15
 * Utility for accessing to Avro-based data stores stored in file system
16
 * @author Mateusz Kobos
17
 *
18
 */
19
public final class DataStore {
20
	
21
	private final static String singleDataStoreFileName = "content.avro";
22
	
23
	private static final int FILE_NO_PADDING_LENGTH = 7;
24
	
25
	private DataStore(){}
26

    
27
	/**
28
	 * Create a new data store directory with single file and return writer that allows 
29
	 * adding new records
30
	 * @param path path to a directory to be created
31
	 * @param schema schema of the records to be stored in the file
32
	 * @return 
33
	 * @throws IOException
34
	 */
35
	public static <T> DataFileWriter<T> create(
36
			FileSystemPath path, Schema schema) throws IOException{
37
		return create(path, schema, singleDataStoreFileName);
38
	}
39
	
40

    
41
	/**
42
     * Create a new data store directory and return writer that allows 
43
     * adding new records
44
     * @param path path to a directory to be created
45
     * @param schema schema of the records to be stored in the file
46
     * @param dataStoreFileName datastore file name
47
     * @return 
48
     * @throws IOException
49
     */
50
    public static <T> DataFileWriter<T> create(
51
            FileSystemPath path, Schema schema, String dataStoreFileName) throws IOException{
52
        path.getFileSystem().mkdirs(path.getPath());
53
        FileSystemPath outFile = new FileSystemPath(
54
                path, dataStoreFileName);
55
        return DataStore.createSingleFile(outFile, schema);
56
    }
57
	
58
	/**
59
	 * Get reader for reading records from given data store
60
	 * 
61
	 * Here the schema used for reading the data store is set to be the same 
62
	 * as the one that was used to write it.
63
	 * 
64
	 * @see getReader(FileSystemPath path, Schema readerSchema) for details.
65
	 * 
66
	 */
67
	public static <T> CloseableIterator<T> getReader(FileSystemPath path) 
68
			throws IOException{
69
		return getReader(path, null);
70
	}
71
	
72
	/**
73
	 * Get reader for reading records from given data store
74
	 * @param path path to a directory corresponding to data store
75
	 * @param readerSchema the schema onto which the read data store will 
76
	 * 	be projected
77
	 */
78
	public static <T> CloseableIterator<T> getReader(
79
			FileSystemPath path, Schema readerSchema) throws IOException{
80
		return new AvroDataStoreReader<T>(path, readerSchema);
81
	}
82
	
83
	/**
84
	 * Read data store entries and insert them into a list. A utility function.
85
	 * 
86
	 * Here the schema used for reading the data store is set to be the same 
87
	 * as the one that was used to write it.
88
	 */
89
	public static <T> List<T> read(FileSystemPath path) 
90
			throws IOException{
91
		return read(path, null);
92
	}
93
	
94
	/**
95
	 * Read data store entries and insert them into a list. A utility function.
96
	 * 
97
	 * @param readerSchema the schema onto which the read data store will 
98
	 * 	be projected
99
	 */
100
	public static <T> List<T> read(FileSystemPath path, Schema readerSchema) 
101
			throws IOException{
102
		CloseableIterator<T> iterator = getReader(path, readerSchema);
103
		List<T> elems = new ArrayList<T>();
104
		while(iterator.hasNext()){
105
			elems.add(iterator.next());
106
		}
107
		return elems;
108
	}
109
	
110
	/**
111
	 * Create a data store from a list of entries. A utility function.
112
	 * The schema is implicitly
113
	 * taken from the first element from the {@code elements} list.
114
	 * @param elements list of elements to write. At least one element has
115
	 * to be present, because it is used to retrieve schema of the
116
	 * structures passed in the list.
117
	 */
118
	public static <T extends GenericContainer> void create(
119
			List<T> elements, FileSystemPath path) throws IOException{
120
		if(elements.isEmpty()){
121
			throw new IllegalArgumentException(
122
					"The list of elements has to be non-empty");
123
		}
124
		Schema schema = elements.get(0).getSchema();
125
		create(elements, path, schema);
126
	}
127

    
128
	/**
129
	 * Create a data store from a list of entries with schema given explicitly.
130
	 * A utility function.
131
	 */
132
	public static <T extends GenericContainer> void create(
133
			List<T> elements, FileSystemPath path, Schema schema) 
134
					throws IOException{
135
		DataFileWriter<T> writer = create(path, schema);
136
		try{
137
			for(T i: elements){
138
				writer.append(i);
139
			}
140
		} finally {
141
			if(writer != null){
142
				writer.close();
143
			}
144
		}
145
	}
146
	
147
	/**
148
	 * Create a single Avro file. This method shouldn't be normally used to
149
	 * create data stores since it creates only a single Avro file, 
150
	 * while a data store consists of a directory containing one or more files.
151
	 */
152
	public static <T> DataFileWriter<T> createSingleFile(
153
			FileSystemPath path, Schema schema) throws IOException{
154
		DatumWriter<T> datumWriter = new SpecificDatumWriter<T>();
155
		DataFileWriter<T> writer = new DataFileWriter<T>(datumWriter);
156
		writer.create(schema, path.getFileSystem().create(path.getPath()));
157
		return writer;
158
	}
159

    
160
	/**
161
	 * Generates filename for given file number.
162
	 * @param fileNo file sequence number
163
	 */
164
	public static String generateFileName(int fileNo) {
165
        StringBuffer strBuff = new StringBuffer(String.valueOf(fileNo));
166
        while(strBuff.length()<FILE_NO_PADDING_LENGTH) {
167
            strBuff.insert(0, '0');
168
        }
169
        strBuff.append(".avro");
170
        return strBuff.toString();
171
    }
172
}
(4-4/7)