1 |
51263
|
claudio.at
|
package eu.dnetlib.dhp.common.java.io;
|
2 |
|
|
|
3 |
|
|
import java.io.IOException;
|
4 |
|
|
import java.util.ArrayList;
|
5 |
|
|
import java.util.List;
|
6 |
|
|
|
7 |
|
|
import org.apache.avro.Schema;
|
8 |
|
|
import org.apache.avro.file.DataFileWriter;
|
9 |
|
|
import org.apache.avro.generic.GenericContainer;
|
10 |
|
|
import org.apache.avro.io.DatumWriter;
|
11 |
|
|
import org.apache.avro.specific.SpecificDatumWriter;
|
12 |
|
|
|
13 |
|
|
|
14 |
|
|
/**
|
15 |
|
|
* Utility for accessing to Avro-based data stores stored in file system
|
16 |
|
|
* @author Mateusz Kobos
|
17 |
|
|
*
|
18 |
|
|
*/
|
19 |
|
|
public final class DataStore {
|
20 |
|
|
|
21 |
|
|
private final static String singleDataStoreFileName = "content.avro";
|
22 |
|
|
|
23 |
|
|
private static final int FILE_NO_PADDING_LENGTH = 7;
|
24 |
|
|
|
25 |
|
|
private DataStore(){}
|
26 |
|
|
|
27 |
|
|
/**
|
28 |
|
|
* Create a new data store directory with single file and return writer that allows
|
29 |
|
|
* adding new records
|
30 |
|
|
* @param path path to a directory to be created
|
31 |
|
|
* @param schema schema of the records to be stored in the file
|
32 |
|
|
* @return
|
33 |
|
|
* @throws IOException
|
34 |
|
|
*/
|
35 |
|
|
public static <T> DataFileWriter<T> create(
|
36 |
|
|
FileSystemPath path, Schema schema) throws IOException{
|
37 |
|
|
return create(path, schema, singleDataStoreFileName);
|
38 |
|
|
}
|
39 |
|
|
|
40 |
|
|
|
41 |
|
|
/**
|
42 |
|
|
* Create a new data store directory and return writer that allows
|
43 |
|
|
* adding new records
|
44 |
|
|
* @param path path to a directory to be created
|
45 |
|
|
* @param schema schema of the records to be stored in the file
|
46 |
|
|
* @param dataStoreFileName datastore file name
|
47 |
|
|
* @return
|
48 |
|
|
* @throws IOException
|
49 |
|
|
*/
|
50 |
|
|
public static <T> DataFileWriter<T> create(
|
51 |
|
|
FileSystemPath path, Schema schema, String dataStoreFileName) throws IOException{
|
52 |
|
|
path.getFileSystem().mkdirs(path.getPath());
|
53 |
|
|
FileSystemPath outFile = new FileSystemPath(
|
54 |
|
|
path, dataStoreFileName);
|
55 |
|
|
return DataStore.createSingleFile(outFile, schema);
|
56 |
|
|
}
|
57 |
|
|
|
58 |
|
|
/**
|
59 |
|
|
* Get reader for reading records from given data store
|
60 |
|
|
*
|
61 |
|
|
* Here the schema used for reading the data store is set to be the same
|
62 |
|
|
* as the one that was used to write it.
|
63 |
|
|
*
|
64 |
|
|
* @see getReader(FileSystemPath path, Schema readerSchema) for details.
|
65 |
|
|
*
|
66 |
|
|
*/
|
67 |
|
|
public static <T> CloseableIterator<T> getReader(FileSystemPath path)
|
68 |
|
|
throws IOException{
|
69 |
|
|
return getReader(path, null);
|
70 |
|
|
}
|
71 |
|
|
|
72 |
|
|
/**
|
73 |
|
|
* Get reader for reading records from given data store
|
74 |
|
|
* @param path path to a directory corresponding to data store
|
75 |
|
|
* @param readerSchema the schema onto which the read data store will
|
76 |
|
|
* be projected
|
77 |
|
|
*/
|
78 |
|
|
public static <T> CloseableIterator<T> getReader(
|
79 |
|
|
FileSystemPath path, Schema readerSchema) throws IOException{
|
80 |
|
|
return new AvroDataStoreReader<T>(path, readerSchema);
|
81 |
|
|
}
|
82 |
|
|
|
83 |
|
|
/**
|
84 |
|
|
* Read data store entries and insert them into a list. A utility function.
|
85 |
|
|
*
|
86 |
|
|
* Here the schema used for reading the data store is set to be the same
|
87 |
|
|
* as the one that was used to write it.
|
88 |
|
|
*/
|
89 |
|
|
public static <T> List<T> read(FileSystemPath path)
|
90 |
|
|
throws IOException{
|
91 |
|
|
return read(path, null);
|
92 |
|
|
}
|
93 |
|
|
|
94 |
|
|
/**
|
95 |
|
|
* Read data store entries and insert them into a list. A utility function.
|
96 |
|
|
*
|
97 |
|
|
* @param readerSchema the schema onto which the read data store will
|
98 |
|
|
* be projected
|
99 |
|
|
*/
|
100 |
|
|
public static <T> List<T> read(FileSystemPath path, Schema readerSchema)
|
101 |
|
|
throws IOException{
|
102 |
|
|
CloseableIterator<T> iterator = getReader(path, readerSchema);
|
103 |
|
|
List<T> elems = new ArrayList<T>();
|
104 |
|
|
while(iterator.hasNext()){
|
105 |
|
|
elems.add(iterator.next());
|
106 |
|
|
}
|
107 |
|
|
return elems;
|
108 |
|
|
}
|
109 |
|
|
|
110 |
|
|
/**
|
111 |
|
|
* Create a data store from a list of entries. A utility function.
|
112 |
|
|
* The schema is implicitly
|
113 |
|
|
* taken from the first element from the {@code elements} list.
|
114 |
|
|
* @param elements list of elements to write. At least one element has
|
115 |
|
|
* to be present, because it is used to retrieve schema of the
|
116 |
|
|
* structures passed in the list.
|
117 |
|
|
*/
|
118 |
|
|
public static <T extends GenericContainer> void create(
|
119 |
|
|
List<T> elements, FileSystemPath path) throws IOException{
|
120 |
|
|
if(elements.isEmpty()){
|
121 |
|
|
throw new IllegalArgumentException(
|
122 |
|
|
"The list of elements has to be non-empty");
|
123 |
|
|
}
|
124 |
|
|
Schema schema = elements.get(0).getSchema();
|
125 |
|
|
create(elements, path, schema);
|
126 |
|
|
}
|
127 |
|
|
|
128 |
|
|
/**
|
129 |
|
|
* Create a data store from a list of entries with schema given explicitly.
|
130 |
|
|
* A utility function.
|
131 |
|
|
*/
|
132 |
|
|
public static <T extends GenericContainer> void create(
|
133 |
|
|
List<T> elements, FileSystemPath path, Schema schema)
|
134 |
|
|
throws IOException{
|
135 |
|
|
DataFileWriter<T> writer = create(path, schema);
|
136 |
|
|
try{
|
137 |
|
|
for(T i: elements){
|
138 |
|
|
writer.append(i);
|
139 |
|
|
}
|
140 |
|
|
} finally {
|
141 |
|
|
if(writer != null){
|
142 |
|
|
writer.close();
|
143 |
|
|
}
|
144 |
|
|
}
|
145 |
|
|
}
|
146 |
|
|
|
147 |
|
|
/**
|
148 |
|
|
* Create a single Avro file. This method shouldn't be normally used to
|
149 |
|
|
* create data stores since it creates only a single Avro file,
|
150 |
|
|
* while a data store consists of a directory containing one or more files.
|
151 |
|
|
*/
|
152 |
|
|
public static <T> DataFileWriter<T> createSingleFile(
|
153 |
|
|
FileSystemPath path, Schema schema) throws IOException{
|
154 |
|
|
DatumWriter<T> datumWriter = new SpecificDatumWriter<T>();
|
155 |
|
|
DataFileWriter<T> writer = new DataFileWriter<T>(datumWriter);
|
156 |
|
|
writer.create(schema, path.getFileSystem().create(path.getPath()));
|
157 |
|
|
return writer;
|
158 |
|
|
}
|
159 |
|
|
|
160 |
|
|
/**
|
161 |
|
|
* Generates filename for given file number.
|
162 |
|
|
* @param fileNo file sequence number
|
163 |
|
|
*/
|
164 |
|
|
public static String generateFileName(int fileNo) {
|
165 |
|
|
StringBuffer strBuff = new StringBuffer(String.valueOf(fileNo));
|
166 |
|
|
while(strBuff.length()<FILE_NO_PADDING_LENGTH) {
|
167 |
|
|
strBuff.insert(0, '0');
|
168 |
|
|
}
|
169 |
|
|
strBuff.append(".avro");
|
170 |
|
|
return strBuff.toString();
|
171 |
|
|
}
|
172 |
|
|
}
|