1
|
package eu.dnetlib.dhp.common.java.io;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.ArrayList;
|
5
|
import java.util.List;
|
6
|
|
7
|
import org.apache.avro.Schema;
|
8
|
import org.apache.avro.file.DataFileWriter;
|
9
|
import org.apache.avro.generic.GenericContainer;
|
10
|
import org.apache.avro.io.DatumWriter;
|
11
|
import org.apache.avro.specific.SpecificDatumWriter;
|
12
|
|
13
|
|
14
|
/**
|
15
|
* Utility for accessing to Avro-based data stores stored in file system
|
16
|
* @author Mateusz Kobos
|
17
|
*
|
18
|
*/
|
19
|
public final class DataStore {
|
20
|
|
21
|
private final static String singleDataStoreFileName = "content.avro";
|
22
|
|
23
|
private static final int FILE_NO_PADDING_LENGTH = 7;
|
24
|
|
25
|
private DataStore(){}
|
26
|
|
27
|
/**
|
28
|
* Create a new data store directory with single file and return writer that allows
|
29
|
* adding new records
|
30
|
* @param path path to a directory to be created
|
31
|
* @param schema schema of the records to be stored in the file
|
32
|
* @return
|
33
|
* @throws IOException
|
34
|
*/
|
35
|
public static <T> DataFileWriter<T> create(
|
36
|
FileSystemPath path, Schema schema) throws IOException{
|
37
|
return create(path, schema, singleDataStoreFileName);
|
38
|
}
|
39
|
|
40
|
|
41
|
/**
|
42
|
* Create a new data store directory and return writer that allows
|
43
|
* adding new records
|
44
|
* @param path path to a directory to be created
|
45
|
* @param schema schema of the records to be stored in the file
|
46
|
* @param dataStoreFileName datastore file name
|
47
|
* @return
|
48
|
* @throws IOException
|
49
|
*/
|
50
|
public static <T> DataFileWriter<T> create(
|
51
|
FileSystemPath path, Schema schema, String dataStoreFileName) throws IOException{
|
52
|
path.getFileSystem().mkdirs(path.getPath());
|
53
|
FileSystemPath outFile = new FileSystemPath(
|
54
|
path, dataStoreFileName);
|
55
|
return DataStore.createSingleFile(outFile, schema);
|
56
|
}
|
57
|
|
58
|
/**
|
59
|
* Get reader for reading records from given data store
|
60
|
*
|
61
|
* Here the schema used for reading the data store is set to be the same
|
62
|
* as the one that was used to write it.
|
63
|
*
|
64
|
* @see getReader(FileSystemPath path, Schema readerSchema) for details.
|
65
|
*
|
66
|
*/
|
67
|
public static <T> CloseableIterator<T> getReader(FileSystemPath path)
|
68
|
throws IOException{
|
69
|
return getReader(path, null);
|
70
|
}
|
71
|
|
72
|
/**
|
73
|
* Get reader for reading records from given data store
|
74
|
* @param path path to a directory corresponding to data store
|
75
|
* @param readerSchema the schema onto which the read data store will
|
76
|
* be projected
|
77
|
*/
|
78
|
public static <T> CloseableIterator<T> getReader(
|
79
|
FileSystemPath path, Schema readerSchema) throws IOException{
|
80
|
return new AvroDataStoreReader<T>(path, readerSchema);
|
81
|
}
|
82
|
|
83
|
/**
|
84
|
* Read data store entries and insert them into a list. A utility function.
|
85
|
*
|
86
|
* Here the schema used for reading the data store is set to be the same
|
87
|
* as the one that was used to write it.
|
88
|
*/
|
89
|
public static <T> List<T> read(FileSystemPath path)
|
90
|
throws IOException{
|
91
|
return read(path, null);
|
92
|
}
|
93
|
|
94
|
/**
|
95
|
* Read data store entries and insert them into a list. A utility function.
|
96
|
*
|
97
|
* @param readerSchema the schema onto which the read data store will
|
98
|
* be projected
|
99
|
*/
|
100
|
public static <T> List<T> read(FileSystemPath path, Schema readerSchema)
|
101
|
throws IOException{
|
102
|
CloseableIterator<T> iterator = getReader(path, readerSchema);
|
103
|
List<T> elems = new ArrayList<T>();
|
104
|
while(iterator.hasNext()){
|
105
|
elems.add(iterator.next());
|
106
|
}
|
107
|
return elems;
|
108
|
}
|
109
|
|
110
|
/**
|
111
|
* Create a data store from a list of entries. A utility function.
|
112
|
* The schema is implicitly
|
113
|
* taken from the first element from the {@code elements} list.
|
114
|
* @param elements list of elements to write. At least one element has
|
115
|
* to be present, because it is used to retrieve schema of the
|
116
|
* structures passed in the list.
|
117
|
*/
|
118
|
public static <T extends GenericContainer> void create(
|
119
|
List<T> elements, FileSystemPath path) throws IOException{
|
120
|
if(elements.isEmpty()){
|
121
|
throw new IllegalArgumentException(
|
122
|
"The list of elements has to be non-empty");
|
123
|
}
|
124
|
Schema schema = elements.get(0).getSchema();
|
125
|
create(elements, path, schema);
|
126
|
}
|
127
|
|
128
|
/**
|
129
|
* Create a data store from a list of entries with schema given explicitly.
|
130
|
* A utility function.
|
131
|
*/
|
132
|
public static <T extends GenericContainer> void create(
|
133
|
List<T> elements, FileSystemPath path, Schema schema)
|
134
|
throws IOException{
|
135
|
DataFileWriter<T> writer = create(path, schema);
|
136
|
try{
|
137
|
for(T i: elements){
|
138
|
writer.append(i);
|
139
|
}
|
140
|
} finally {
|
141
|
if(writer != null){
|
142
|
writer.close();
|
143
|
}
|
144
|
}
|
145
|
}
|
146
|
|
147
|
/**
|
148
|
* Create a single Avro file. This method shouldn't be normally used to
|
149
|
* create data stores since it creates only a single Avro file,
|
150
|
* while a data store consists of a directory containing one or more files.
|
151
|
*/
|
152
|
public static <T> DataFileWriter<T> createSingleFile(
|
153
|
FileSystemPath path, Schema schema) throws IOException{
|
154
|
DatumWriter<T> datumWriter = new SpecificDatumWriter<T>();
|
155
|
DataFileWriter<T> writer = new DataFileWriter<T>(datumWriter);
|
156
|
writer.create(schema, path.getFileSystem().create(path.getPath()));
|
157
|
return writer;
|
158
|
}
|
159
|
|
160
|
/**
|
161
|
* Generates filename for given file number.
|
162
|
* @param fileNo file sequence number
|
163
|
*/
|
164
|
public static String generateFileName(int fileNo) {
|
165
|
StringBuffer strBuff = new StringBuffer(String.valueOf(fileNo));
|
166
|
while(strBuff.length()<FILE_NO_PADDING_LENGTH) {
|
167
|
strBuff.insert(0, '0');
|
168
|
}
|
169
|
strBuff.append(".avro");
|
170
|
return strBuff.toString();
|
171
|
}
|
172
|
}
|