1
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/pom.xml trunk/pom.xml
|
2
|
--- 3rdparty-original/piggybank/java/pom.xml 1970-01-01 01:00:00.000000000 +0100
|
3
|
+++ trunk/pom.xml 2013-06-11 01:02:24.676859717 +0200
|
4
|
@@ -0,0 +1,121 @@
|
5
|
+<?xml version="1.0" encoding="UTF-8"?>
|
6
|
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
7
|
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
8
|
+
|
9
|
+ <parent>
|
10
|
+ <groupId>eu.dnetlib</groupId>
|
11
|
+ <artifactId>dnet-parent</artifactId>
|
12
|
+ <version>0.0.1-SNAPSHOT</version>
|
13
|
+ </parent>
|
14
|
+ <modelVersion>4.0.0</modelVersion>
|
15
|
+ <artifactId>icm-iis-3rdparty-pig-avrostorage</artifactId>
|
16
|
+ <packaging>jar</packaging>
|
17
|
+ <!-- <version>0.0.1-SNAPSHOT</version> -->
|
18
|
+ <properties>
|
19
|
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
20
|
+ </properties>
|
21
|
+ <dependencies>
|
22
|
+ <dependency>
|
23
|
+ <groupId>org.apache.pig</groupId>
|
24
|
+ <artifactId>pig</artifactId>
|
25
|
+ <version>0.10.0-cdh4.1.2</version>
|
26
|
+ </dependency>
|
27
|
+ <dependency>
|
28
|
+ <groupId>org.apache.avro</groupId>
|
29
|
+ <artifactId>avro</artifactId>
|
30
|
+ <version>1.7.4</version>
|
31
|
+ </dependency>
|
32
|
+ <dependency>
|
33
|
+ <groupId>org.apache.hadoop</groupId>
|
34
|
+ <artifactId>hadoop-common</artifactId>
|
35
|
+ <version>2.0.0-cdh4.1.2</version>
|
36
|
+ </dependency>
|
37
|
+ <dependency>
|
38
|
+ <groupId>org.apache.hadoop</groupId>
|
39
|
+ <artifactId>hadoop-core</artifactId>
|
40
|
+ <version>2.0.0-mr1-cdh4.1.2</version>
|
41
|
+ <type>jar</type>
|
42
|
+ </dependency>
|
43
|
+ <dependency>
|
44
|
+ <groupId>com.googlecode.json-simple</groupId>
|
45
|
+ <artifactId>json-simple</artifactId>
|
46
|
+ <version>1.1</version>
|
47
|
+ <type>jar</type>
|
48
|
+ </dependency>
|
49
|
+ <dependency>
|
50
|
+ <groupId>junit</groupId>
|
51
|
+ <artifactId>junit</artifactId>
|
52
|
+ <version>4.10</version>
|
53
|
+ <scope>test</scope>
|
54
|
+ </dependency>
|
55
|
+ <dependency>
|
56
|
+ <groupId>org.antlr</groupId>
|
57
|
+ <artifactId>antlr-runtime</artifactId>
|
58
|
+ <version>3.4</version>
|
59
|
+ </dependency>
|
60
|
+ <dependency>
|
61
|
+ <groupId>eu.dnetlib</groupId>
|
62
|
+ <artifactId>icm-iis-core</artifactId>
|
63
|
+ <version>0.0.1-SNAPSHOT</version>
|
64
|
+ <type>jar</type>
|
65
|
+ </dependency>
|
66
|
+ </dependencies>
|
67
|
+ <build>
|
68
|
+ <plugins>
|
69
|
+ <plugin>
|
70
|
+ <groupId>org.apache.maven.plugins</groupId>
|
71
|
+ <artifactId>maven-compiler-plugin</artifactId>
|
72
|
+ <configuration>
|
73
|
+ <source>1.6</source>
|
74
|
+ <target>1.6</target>
|
75
|
+ </configuration>
|
76
|
+ </plugin>
|
77
|
+ <plugin>
|
78
|
+ <groupId>org.apache.maven.plugins</groupId>
|
79
|
+ <artifactId>maven-surefire-plugin</artifactId>
|
80
|
+ <version>2.4.3</version>
|
81
|
+ <configuration>
|
82
|
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
|
83
|
+ <includes>
|
84
|
+ <include>**/*Test.java</include>
|
85
|
+ <include>**/avro/Test*.java</include>
|
86
|
+ </includes>
|
87
|
+ <excludes>
|
88
|
+ <exclude>**/AllTests.java</exclude>
|
89
|
+ <exclude>**/Abstract*Test.java</exclude>
|
90
|
+ <exclude>**/*$*</exclude>
|
91
|
+ </excludes>
|
92
|
+ </configuration>
|
93
|
+ </plugin>
|
94
|
+ </plugins>
|
95
|
+ </build>
|
96
|
+ <repositories>
|
97
|
+ <repository>
|
98
|
+ <id>cloudera</id>
|
99
|
+ <name>Cloudera Repository</name>
|
100
|
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
|
101
|
+ <releases>
|
102
|
+ <enabled>true</enabled>
|
103
|
+ </releases>
|
104
|
+ <snapshots>
|
105
|
+ <enabled>false</enabled>
|
106
|
+ </snapshots>
|
107
|
+ </repository>
|
108
|
+ <!-- This repository contains our patched
|
109
|
+ version of "avro" and "avro-mapred" modules (see the dependencies section)
|
110
|
+ This entry might be removed when the patch to these modules becomes
|
111
|
+ a part of the official Avro release.-->
|
112
|
+ <repository>
|
113
|
+ <id>dnet-deps</id>
|
114
|
+ <name>dnet dependencies</name>
|
115
|
+ <url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet-deps</url>
|
116
|
+ <releases>
|
117
|
+ <enabled>true</enabled>
|
118
|
+ </releases>
|
119
|
+ <snapshots>
|
120
|
+ <enabled>false</enabled>
|
121
|
+ </snapshots>
|
122
|
+ <layout>default</layout>
|
123
|
+ </repository>
|
124
|
+ </repositories>
|
125
|
+</project>
|
126
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
|
127
|
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 01:48:03.516933061 +0200
|
128
|
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 00:11:55.000000000 +0200
|
129
|
@@ -102,6 +102,7 @@ public class AvroSchema2Pig {
|
130
|
|
131
|
tupleSchema.setFields(childFields);
|
132
|
fieldSchema.setSchema(tupleSchema);
|
133
|
+ visitedRecords.remove(in);
|
134
|
}
|
135
|
|
136
|
} else if (avroType.equals(Schema.Type.ARRAY)) {
|
137
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
|
138
|
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 01:48:03.516933061 +0200
|
139
|
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 00:12:54.000000000 +0200
|
140
|
@@ -17,6 +17,7 @@
|
141
|
|
142
|
package org.apache.pig.piggybank.storage.avro;
|
143
|
|
144
|
+import eu.dnetlib.iis.core.common.AvroUtils;
|
145
|
import java.io.IOException;
|
146
|
import java.io.InputStream;
|
147
|
import java.util.ArrayList;
|
148
|
@@ -32,6 +33,8 @@ import org.apache.avro.Schema;
|
149
|
import org.apache.avro.Schema.Field;
|
150
|
import org.apache.avro.file.DataFileStream;
|
151
|
import org.apache.avro.generic.GenericDatumReader;
|
152
|
+import org.apache.avro.specific.SpecificRecordBase;
|
153
|
+import org.apache.commons.lang.StringUtils;
|
154
|
import org.apache.hadoop.conf.Configuration;
|
155
|
import org.apache.hadoop.fs.FileStatus;
|
156
|
import org.apache.hadoop.fs.FileSystem;
|
157
|
@@ -105,6 +108,9 @@ public class AvroStorage extends FileInp
|
158
|
private boolean checkSchema = true; /*whether check schema of input directories*/
|
159
|
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
|
160
|
|
161
|
+ private String inputSchemaFile;
|
162
|
+ private String outputSchemaFile;
|
163
|
+
|
164
|
/**
|
165
|
* Empty constructor. Output schema is derived from pig schema.
|
166
|
*/
|
167
|
@@ -122,7 +128,7 @@ public class AvroStorage extends FileInp
|
168
|
* @throws IOException
|
169
|
* @throws ParseException
|
170
|
*/
|
171
|
- public AvroStorage(String[] parts) throws IOException, ParseException {
|
172
|
+ public AvroStorage(String[] parts) throws IOException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException {
|
173
|
outputAvroSchema = null;
|
174
|
nullable = true;
|
175
|
checkSchema = true;
|
176
|
@@ -147,13 +153,18 @@ public class AvroStorage extends FileInp
|
177
|
*/
|
178
|
@Override
|
179
|
public void setLocation(String location, Job job) throws IOException {
|
180
|
- if (inputAvroSchema != null) {
|
181
|
- return;
|
182
|
- }
|
183
|
Set<Path> paths = new HashSet<Path>();
|
184
|
Configuration conf = job.getConfiguration();
|
185
|
if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
|
186
|
- setInputAvroSchema(paths, conf);
|
187
|
+ if (inputAvroSchema == null) {
|
188
|
+ if (inputSchemaFile != null) {
|
189
|
+ FileSystem fs = FileSystem.get(job.getConfiguration());
|
190
|
+ Path path = fs.makeQualified(new Path(inputSchemaFile));
|
191
|
+ inputAvroSchema = getSchemaFromFile(path, fs);
|
192
|
+ } else {
|
193
|
+ setInputAvroSchema(paths, conf);
|
194
|
+ }
|
195
|
+ }
|
196
|
FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
|
197
|
} else {
|
198
|
throw new IOException("Input path \'" + location + "\' is not found");
|
199
|
@@ -416,7 +427,6 @@ public class AvroStorage extends FileInp
|
200
|
*/
|
201
|
@SuppressWarnings("unchecked")
|
202
|
protected Map<String, Object> parseJsonString(String jsonString) throws ParseException {
|
203
|
-
|
204
|
/*parse the json object */
|
205
|
JSONParser parser = new JSONParser();
|
206
|
JSONObject obj = (JSONObject) parser.parse(jsonString);
|
207
|
@@ -450,7 +460,6 @@ public class AvroStorage extends FileInp
|
208
|
* @throws ParseException
|
209
|
*/
|
210
|
protected Map<String, Object> parseStringList(String[] parts) throws IOException {
|
211
|
-
|
212
|
Map<String, Object> map = new HashMap<String, Object>();
|
213
|
|
214
|
for (int i = 0; i < parts.length; ) {
|
215
|
@@ -477,6 +486,10 @@ public class AvroStorage extends FileInp
|
216
|
|| name.equalsIgnoreCase("same")
|
217
|
|| name.equalsIgnoreCase("schema")
|
218
|
|| name.equalsIgnoreCase("schema_file")
|
219
|
+ || name.equalsIgnoreCase("input_schema_file")
|
220
|
+ || name.equalsIgnoreCase("output_schema_file")
|
221
|
+ || name.equalsIgnoreCase("input_schema_class")
|
222
|
+ || name.equalsIgnoreCase("output_schema_class")
|
223
|
|| name.matches("field\\d+")) {
|
224
|
/* store value as string */
|
225
|
map.put(name, value);
|
226
|
@@ -496,8 +509,7 @@ public class AvroStorage extends FileInp
|
227
|
/**
|
228
|
* Initialize output avro schema using input property map
|
229
|
*/
|
230
|
- protected void init(Map<String, Object> inputs) throws IOException {
|
231
|
-
|
232
|
+ protected void init(Map<String, Object> inputs) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
|
233
|
/*used to store field schemas */
|
234
|
List<Field> fields = null;
|
235
|
|
236
|
@@ -517,11 +529,18 @@ public class AvroStorage extends FileInp
|
237
|
}
|
238
|
else if (inputs.containsKey("schema_file")) {
|
239
|
Path path = new Path((String) inputs.get("schema_file"));
|
240
|
+
|
241
|
AvroStorageLog.details("schemaFile path=" + path.toUri().toString());
|
242
|
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
|
243
|
Schema schema = getSchemaFromFile(path, fs);
|
244
|
schemaManager = new AvroSchemaManager(schema);
|
245
|
}
|
246
|
+ else if (inputs.containsKey("input_schema_file")) {
|
247
|
+ inputSchemaFile = (String) inputs.get("input_schema_file");
|
248
|
+ }
|
249
|
+ else if (inputs.containsKey("output_schema_file")) {
|
250
|
+ outputSchemaFile = (String) inputs.get("output_schema_file");
|
251
|
+ }
|
252
|
|
253
|
/* iterate input property map */
|
254
|
for (Entry<String, Object> entry : inputs.entrySet()) {
|
255
|
@@ -541,6 +560,10 @@ public class AvroStorage extends FileInp
|
256
|
nullable = (Boolean) value;
|
257
|
} else if (name.equalsIgnoreCase("schema")) {
|
258
|
outputAvroSchema = Schema.parse((String) value);
|
259
|
+ } else if (name.equalsIgnoreCase("input_schema_class")) {
|
260
|
+ inputAvroSchema = AvroUtils.toSchema((String) value);
|
261
|
+ } else if (name.equalsIgnoreCase("output_schema_class")) {
|
262
|
+ outputAvroSchema = AvroUtils.toSchema((String) value);
|
263
|
} else if (name.matches("field\\d+")) {
|
264
|
/*set schema of dth field */
|
265
|
if (fields == null)
|
266
|
@@ -579,6 +602,8 @@ public class AvroStorage extends FileInp
|
267
|
fields.add(field);
|
268
|
} else if (!name.equalsIgnoreCase("data")
|
269
|
&& !name.equalsIgnoreCase("schema_file")
|
270
|
+ && !name.equalsIgnoreCase("input_schema_file")
|
271
|
+ && !name.equalsIgnoreCase("output_schema_file")
|
272
|
&& !name.equalsIgnoreCase("debug")) {
|
273
|
throw new IOException("Invalid parameter:" + name);
|
274
|
}
|
275
|
@@ -599,7 +624,6 @@ public class AvroStorage extends FileInp
|
276
|
nullable = true;
|
277
|
}
|
278
|
}
|
279
|
-
|
280
|
}
|
281
|
|
282
|
@Override
|
283
|
@@ -609,6 +633,31 @@ public class AvroStorage extends FileInp
|
284
|
|
285
|
@Override
|
286
|
public void setStoreLocation(String location, Job job) throws IOException {
|
287
|
+ if (outputSchemaFile != null) {
|
288
|
+ FileSystem fs = FileSystem.get(job.getConfiguration());
|
289
|
+ Path path = fs.makeQualified(new Path(outputSchemaFile));
|
290
|
+
|
291
|
+ outputAvroSchema = getSchemaFromFile(path, fs);
|
292
|
+
|
293
|
+ UDFContext context = UDFContext.getUDFContext();
|
294
|
+ Properties property = context.getUDFProperties(ResourceSchema.class);
|
295
|
+
|
296
|
+ String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
|
297
|
+ String key = getSchemaKey();
|
298
|
+ Map<String, String> schemaMap = (prevSchemaStr != null)
|
299
|
+ ? parseSchemaMap(prevSchemaStr)
|
300
|
+ : new HashMap<String, String>();
|
301
|
+ schemaMap.put(key, outputAvroSchema.toString());
|
302
|
+
|
303
|
+ List<String> schemas = new ArrayList<String>();
|
304
|
+ for (Map.Entry<String, String> entry : schemaMap.entrySet()) {
|
305
|
+ schemas.add(entry.getKey() + AvroStorage.SCHEMA_KEYVALUE_DELIM + entry.getValue());
|
306
|
+ }
|
307
|
+
|
308
|
+ String newSchemaStr = StringUtils.join(schemas, AvroStorage.SCHEMA_DELIM);
|
309
|
+ property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
|
310
|
+ }
|
311
|
+
|
312
|
AvroStorageLog.details("output location=" + location);
|
313
|
FileOutputFormat.setOutputPath(job, new Path(location));
|
314
|
}
|
315
|
@@ -621,6 +670,7 @@ public class AvroStorage extends FileInp
|
316
|
AvroStorageLog.funcCall("Check schema");
|
317
|
UDFContext context = UDFContext.getUDFContext();
|
318
|
Properties property = context.getUDFProperties(ResourceSchema.class);
|
319
|
+
|
320
|
String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
|
321
|
AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
|
322
|
|
323
|
@@ -683,6 +733,7 @@ public class AvroStorage extends FileInp
|
324
|
UDFContext context = UDFContext.getUDFContext();
|
325
|
Properties property = context.getUDFProperties(ResourceSchema.class);
|
326
|
String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
|
327
|
+
|
328
|
Map<String, String> map = (allSchemaStr != null) ? parseSchemaMap(allSchemaStr) : null;
|
329
|
|
330
|
String key = getSchemaKey();
|
331
|
@@ -711,7 +762,7 @@ public class AvroStorage extends FileInp
|
332
|
StoreFunc.cleanupOnFailureImpl(location, job);
|
333
|
}
|
334
|
|
335
|
- @Override
|
336
|
+ //@Override
|
337
|
public void cleanupOnSuccess(String location, Job job) throws IOException {
|
338
|
// Nothing to do
|
339
|
}
|
340
|
@@ -719,7 +770,7 @@ public class AvroStorage extends FileInp
|
341
|
@Override
|
342
|
public void putNext(Tuple t) throws IOException {
|
343
|
try {
|
344
|
- this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
|
345
|
+ this.writer.write(NullWritable.get(), t);
|
346
|
} catch (InterruptedException e) {
|
347
|
e.printStackTrace();
|
348
|
}
|
349
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro
|
350
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 1970-01-01 01:00:00.000000000 +0100
|
351
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 2013-06-11 01:28:15.576901251 +0200
|
352
|
@@ -0,0 +1 @@
|
353
|
+Objavro.schema?{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]} ?w?(d`??????0?????????w?(d`??????
|
354
|
\ No newline at end of file
|
355
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro
|
356
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 1970-01-01 01:00:00.000000000 +0100
|
357
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 2013-06-11 01:16:41.648882666 +0200
|
358
|
@@ -0,0 +1 @@
|
359
|
+Objavro.schema?{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]} ?w?(d`??????0?????????w?(d`??????
|
360
|
\ No newline at end of file
|
361
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc
|
362
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 1970-01-01 01:00:00.000000000 +0100
|
363
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 2013-06-11 01:16:56.912883076 +0200
|
364
|
@@ -0,0 +1,8 @@
|
365
|
+{
|
366
|
+ "type": "record",
|
367
|
+ "name": "simple",
|
368
|
+ "fields": [
|
369
|
+ {"name": "member_id", "type": "int"},
|
370
|
+ {"name": "count", "type": "long"}
|
371
|
+ ]
|
372
|
+}
|
373
|
\ No newline at end of file
|
374
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
|
375
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:48:03.512933062 +0200
|
376
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:23:38.920893841 +0200
|
377
|
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
|
378
|
import org.apache.pig.impl.logicalLayer.FrontendException;
|
379
|
import org.apache.pig.piggybank.storage.avro.AvroStorage;
|
380
|
import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
|
381
|
-import org.apache.pig.test.Util;
|
382
|
import org.junit.AfterClass;
|
383
|
import org.junit.Assert;
|
384
|
import org.junit.BeforeClass;
|
385
|
@@ -81,7 +80,9 @@ public class TestAvroStorage {
|
386
|
final private String testNoMatchedFiles = getInputFile("test_dir{1,2}/file_that_does_not_exist*.avro");
|
387
|
final private String testArrayFile = getInputFile("test_array.avro");
|
388
|
final private String testRecordFile = getInputFile("test_record.avro");
|
389
|
+ final private String testSimpleFile = getInputFile("test_simple.avro");
|
390
|
final private String testRecordSchema = getInputFile("test_record.avsc");
|
391
|
+ final private String testSimpleRecordSchema = getInputFile("test_simple_record.avsc");
|
392
|
final private String testGenericUnionFile = getInputFile("test_generic_union.avro");
|
393
|
final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro");
|
394
|
final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro");
|
395
|
@@ -973,6 +974,76 @@ public class TestAvroStorage {
|
396
|
verifyResults(output, expected);
|
397
|
}
|
398
|
|
399
|
+ @Test
|
400
|
+ public void testOutputSchemaFile() throws IOException {
|
401
|
+ String output1= outbasedir + "testOutputSchemaFile";
|
402
|
+ String expected1 = basedir + "expected_testRecordSplit3.avro";
|
403
|
+ deleteDirectory(new File(output1));
|
404
|
+ String [] queries = {
|
405
|
+ " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
|
406
|
+ " groups = GROUP avro BY member_id;",
|
407
|
+ " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
|
408
|
+ " STORE sc INTO '" + output1 +
|
409
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
410
|
+ " 'no_schema_check'," +
|
411
|
+ " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
|
412
|
+ };
|
413
|
+ testAvroStorage( queries);
|
414
|
+ verifyResults(output1, expected1);
|
415
|
+ }
|
416
|
+
|
417
|
+ @Test
|
418
|
+ public void testInputSchemaFile() throws IOException {
|
419
|
+ String output1= outbasedir + "testInputSchemaFile";
|
420
|
+ String expected1 = basedir + "expected_testRecordSplit3.avro";
|
421
|
+ deleteDirectory(new File(output1));
|
422
|
+ String [] queries = {
|
423
|
+ " avro = LOAD '" + testSimpleFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('input_schema_file', '" + testSimpleRecordSchema + "');",
|
424
|
+ " STORE avro INTO '" + output1 +
|
425
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
426
|
+ " 'no_schema_check'," +
|
427
|
+ " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
|
428
|
+ };
|
429
|
+ testAvroStorage( queries);
|
430
|
+ verifyResults(output1, expected1);
|
431
|
+ }
|
432
|
+
|
433
|
+ @Test
|
434
|
+ public void testInputSchemaClass() throws IOException {
|
435
|
+ String output= outbasedir + "testInputSchemaClass";
|
436
|
+ String test = getInputFile("expected_testRecursiveRecordReference1.avro");
|
437
|
+ String expected = basedir + "expected_testRecursiveRecordReference1.avro";
|
438
|
+ deleteDirectory(new File(output));
|
439
|
+ String [] queries = {
|
440
|
+ " in = LOAD '" + test +
|
441
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
|
442
|
+ " 'input_schema_class', 'org.apache.avro.Schema.Type.INT' );",
|
443
|
+ " STORE in INTO '" + output +
|
444
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
445
|
+ " 'schema', '\"int\"' );"
|
446
|
+ };
|
447
|
+ testAvroStorage(queries);
|
448
|
+ verifyResults(output, expected);
|
449
|
+ }
|
450
|
+
|
451
|
+ @Test
|
452
|
+ public void testOutputSchemaClass() throws IOException {
|
453
|
+ String output= outbasedir + "testOutputSchemaClass";
|
454
|
+ String test = getInputFile("expected_testRecursiveRecordReference1.avro");
|
455
|
+ String expected = basedir + "expected_testRecursiveRecordReference1.avro";
|
456
|
+ deleteDirectory(new File(output));
|
457
|
+ String [] queries = {
|
458
|
+ " in = LOAD '" + test +
|
459
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
|
460
|
+ " 'schema', '\"int\"' );",
|
461
|
+ " STORE in INTO '" + output +
|
462
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
463
|
+ " 'output_schema_class', 'org.apache.avro.Schema.Type.INT' );"
|
464
|
+ };
|
465
|
+ testAvroStorage(queries);
|
466
|
+ verifyResults(output, expected);
|
467
|
+ }
|
468
|
+
|
469
|
private static void deleteDirectory (File path) {
|
470
|
if ( path.exists()) {
|
471
|
File [] files = path.listFiles();
|
472
|
@@ -1014,8 +1085,8 @@ public class TestAvroStorage {
|
473
|
|
474
|
private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
|
475
|
// Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
|
476
|
- if (Util.isHadoop23())
|
477
|
- return;
|
478
|
+// if (Util.isHadoop23())
|
479
|
+// return;
|
480
|
|
481
|
FileSystem fs = FileSystem.getLocal(new Configuration()) ;
|
482
|
|