diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/pom.xml trunk/pom.xml
--- 3rdparty-original/piggybank/java/pom.xml	1970-01-01 01:00:00.000000000 +0100
+++ trunk/pom.xml	2013-06-11 01:02:24.676859717 +0200
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+	<parent>
+		<groupId>eu.dnetlib</groupId>
+		<artifactId>dnet-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>icm-iis-3rdparty-pig-avrostorage</artifactId>
+	<packaging>jar</packaging>
+	<!-- <version>0.0.1-SNAPSHOT</version> -->
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+	<dependencies>
+        <dependency>
+			<groupId>org.apache.pig</groupId>
+			<artifactId>pig</artifactId>
+			<version>0.10.0-cdh4.1.2</version>
+		</dependency>
+        <dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>1.7.4</version>
+		</dependency>
+        <dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>2.0.0-cdh4.1.2</version>
+		</dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+            <version>2.0.0-mr1-cdh4.1.2</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+            <groupId>com.googlecode.json-simple</groupId>
+            <artifactId>json-simple</artifactId>
+            <version>1.1</version>
+            <type>jar</type>
+        </dependency>
+        <dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.10</version>
+			<scope>test</scope>
+		</dependency>
+        <dependency>
+			<groupId>org.antlr</groupId>
+			<artifactId>antlr-runtime</artifactId>
+			<version>3.4</version>
+		</dependency>
+        <dependency>
+            <groupId>eu.dnetlib</groupId>
+            <artifactId>icm-iis-core</artifactId>
+            <version>0.0.1-SNAPSHOT</version>
+            <type>jar</type>
+        </dependency>
+	</dependencies>
+	<build>
+        <plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.4.3</version>
+                <configuration>
+                  <redirectTestOutputToFile>true</redirectTestOutputToFile>
+                    <includes>
+                        <include>**/*Test.java</include>
+                        <include>**/avro/Test*.java</include>
+                    </includes>
+                    <excludes>
+                        <exclude>**/AllTests.java</exclude>
+                        <exclude>**/Abstract*Test.java</exclude>
+                        <exclude>**/*$*</exclude>
+                    </excludes>
+                </configuration>
+            </plugin>
+		</plugins>
+	</build>
+	<repositories>
+        <repository>
+			<id>cloudera</id>
+			<name>Cloudera Repository</name>
+			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+	    <!-- This repository contains our patched 
+	    version of "avro" and "avro-mapred" modules (see the dependencies section)
+	    This entry might be removed when the patch to these modules becomes 
+	    a part of the official Avro release.-->
+	    <repository>
+			<id>dnet-deps</id>
+			<name>dnet dependencies</name>
+			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet-deps</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+			<layout>default</layout>
+		</repository>
+	</repositories>
+</project>
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java	2013-06-11 01:48:03.516933061 +0200
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java	2013-06-11 00:11:55.000000000 +0200
@@ -102,6 +102,7 @@ public class AvroSchema2Pig {
 
                 tupleSchema.setFields(childFields);
                 fieldSchema.setSchema(tupleSchema);
+                visitedRecords.remove(in);
             }
 
         } else if (avroType.equals(Schema.Type.ARRAY)) {
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java	2013-06-11 01:48:03.516933061 +0200
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java	2013-06-11 00:12:54.000000000 +0200
@@ -17,6 +17,7 @@
 
 package org.apache.pig.piggybank.storage.avro;
 
+import eu.dnetlib.iis.core.common.AvroUtils;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
@@ -32,6 +33,8 @@ import org.apache.avro.Schema;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.file.DataFileStream;
 import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -105,6 +108,9 @@ public class AvroStorage extends FileInp
     private boolean checkSchema = true; /*whether check schema of input directories*/
     private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
 
+    private String inputSchemaFile;
+    private String outputSchemaFile;
+    
     /**
      * Empty constructor. Output schema is derived from pig schema.
      */
@@ -122,7 +128,7 @@ public class AvroStorage extends FileInp
      * @throws IOException
      * @throws ParseException
      */
-    public AvroStorage(String[] parts) throws IOException, ParseException {
+    public AvroStorage(String[] parts) throws IOException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException {
         outputAvroSchema = null;
         nullable = true;
         checkSchema = true;
@@ -147,13 +153,18 @@ public class AvroStorage extends FileInp
      */
     @Override
     public void setLocation(String location, Job job) throws IOException {
-        if (inputAvroSchema != null) {
-            return;
-        }
         Set<Path> paths = new HashSet<Path>();
         Configuration conf = job.getConfiguration();
         if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
-            setInputAvroSchema(paths, conf);
+             if (inputAvroSchema == null) {
+                 if (inputSchemaFile != null) {
+                    FileSystem fs = FileSystem.get(job.getConfiguration());
+                    Path path = fs.makeQualified(new Path(inputSchemaFile));
+                    inputAvroSchema = getSchemaFromFile(path, fs);
+                } else {
+                    setInputAvroSchema(paths, conf);
+                }
+            }
             FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
         } else {
             throw new IOException("Input path \'" + location + "\' is not found");
@@ -416,7 +427,6 @@ public class AvroStorage extends FileInp
      */
     @SuppressWarnings("unchecked")
     protected Map<String, Object> parseJsonString(String jsonString) throws ParseException {
-
         /*parse the json object */
         JSONParser parser = new JSONParser();
         JSONObject obj = (JSONObject) parser.parse(jsonString);
@@ -450,7 +460,6 @@ public class AvroStorage extends FileInp
      * @throws ParseException
      */
     protected Map<String, Object> parseStringList(String[] parts) throws IOException {
-
         Map<String, Object> map = new HashMap<String, Object>();
 
         for (int i = 0; i < parts.length; ) {
@@ -477,6 +486,10 @@ public class AvroStorage extends FileInp
                              || name.equalsIgnoreCase("same")
                              || name.equalsIgnoreCase("schema")
                              || name.equalsIgnoreCase("schema_file")
+                             || name.equalsIgnoreCase("input_schema_file")
+                             || name.equalsIgnoreCase("output_schema_file")
+                             || name.equalsIgnoreCase("input_schema_class")
+                             || name.equalsIgnoreCase("output_schema_class")
                              || name.matches("field\\d+")) {
                     /* store value as string */
                     map.put(name, value);
@@ -496,8 +509,7 @@ public class AvroStorage extends FileInp
     /**
      * Initialize output avro schema using input property map
      */
-    protected void init(Map<String, Object> inputs) throws IOException {
-
+    protected void init(Map<String, Object> inputs) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
         /*used to store field schemas */
         List<Field> fields = null;
 
@@ -517,11 +529,18 @@ public class AvroStorage extends FileInp
         }
         else if (inputs.containsKey("schema_file")) {
             Path path = new Path((String) inputs.get("schema_file"));
+            
             AvroStorageLog.details("schemaFile path=" + path.toUri().toString());
             FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
             Schema schema = getSchemaFromFile(path, fs);
             schemaManager = new AvroSchemaManager(schema);
         }
+        else if (inputs.containsKey("input_schema_file")) {
+            inputSchemaFile = (String) inputs.get("input_schema_file");
+        }
+        else if (inputs.containsKey("output_schema_file")) {
+            outputSchemaFile = (String) inputs.get("output_schema_file");
+        }
 
         /* iterate input property map */
         for (Entry<String, Object> entry : inputs.entrySet()) {
@@ -541,6 +560,10 @@ public class AvroStorage extends FileInp
                 nullable = (Boolean) value;
             } else if (name.equalsIgnoreCase("schema")) {
                 outputAvroSchema = Schema.parse((String) value);
+            } else if (name.equalsIgnoreCase("input_schema_class")) {
+                inputAvroSchema = AvroUtils.toSchema((String) value);
+            } else if (name.equalsIgnoreCase("output_schema_class")) {
+                outputAvroSchema = AvroUtils.toSchema((String) value);
             } else if (name.matches("field\\d+")) {
                 /*set schema of dth field */
                 if (fields == null)
@@ -579,6 +602,8 @@ public class AvroStorage extends FileInp
                 fields.add(field);
             } else if (!name.equalsIgnoreCase("data")
                                   && !name.equalsIgnoreCase("schema_file")
+                                  && !name.equalsIgnoreCase("input_schema_file")
+                                  && !name.equalsIgnoreCase("output_schema_file")
                                   && !name.equalsIgnoreCase("debug")) {
                 throw new IOException("Invalid parameter:" + name);
             }
@@ -599,7 +624,6 @@ public class AvroStorage extends FileInp
                 nullable = true;
             }
         }
-
     }
 
     @Override
@@ -609,6 +633,31 @@ public class AvroStorage extends FileInp
 
     @Override
     public void setStoreLocation(String location, Job job) throws IOException {
+        if (outputSchemaFile != null) {
+            FileSystem fs = FileSystem.get(job.getConfiguration());
+            Path path = fs.makeQualified(new Path(outputSchemaFile));
+
+            outputAvroSchema = getSchemaFromFile(path, fs);
+
+            UDFContext context = UDFContext.getUDFContext();
+            Properties property = context.getUDFProperties(ResourceSchema.class);
+        
+            String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+            String key = getSchemaKey();
+            Map<String, String> schemaMap = (prevSchemaStr != null)
+                                                                ? parseSchemaMap(prevSchemaStr)
+                                                                : new HashMap<String, String>();
+            schemaMap.put(key, outputAvroSchema.toString());
+            
+            List<String> schemas = new ArrayList<String>();
+            for (Map.Entry<String, String> entry : schemaMap.entrySet()) {
+                schemas.add(entry.getKey() + AvroStorage.SCHEMA_KEYVALUE_DELIM + entry.getValue());
+            }
+            
+            String newSchemaStr = StringUtils.join(schemas, AvroStorage.SCHEMA_DELIM);
+            property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
+        }
+                
         AvroStorageLog.details("output location=" + location);
         FileOutputFormat.setOutputPath(job, new Path(location));
     }
@@ -621,6 +670,7 @@ public class AvroStorage extends FileInp
         AvroStorageLog.funcCall("Check schema");
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(ResourceSchema.class);
+
         String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
         AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
 
@@ -683,6 +733,7 @@ public class AvroStorage extends FileInp
         UDFContext context = UDFContext.getUDFContext();
         Properties property = context.getUDFProperties(ResourceSchema.class);
         String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+      
         Map<String, String> map = (allSchemaStr != null)  ? parseSchemaMap(allSchemaStr) : null;
 
         String key = getSchemaKey();
@@ -711,7 +762,7 @@ public class AvroStorage extends FileInp
         StoreFunc.cleanupOnFailureImpl(location, job);
     }
 
-    @Override
+    //@Override
     public void cleanupOnSuccess(String location, Job job) throws IOException {
         // Nothing to do
     }
@@ -719,7 +770,7 @@ public class AvroStorage extends FileInp
     @Override
     public void putNext(Tuple t) throws IOException {
         try {
-            this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
+            this.writer.write(NullWritable.get(), t);
         } catch (InterruptedException e) {
             e.printStackTrace();
         }
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro	1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro	2013-06-11 01:28:15.576901251 +0200
@@ -0,0 +1 @@
+Objavro.schema?{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]}?w?(d`??????0?????????w?(d`??????
\ No newline at end of file
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro	1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro	2013-06-11 01:16:41.648882666 +0200
@@ -0,0 +1 @@
+Objavro.schema?{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]}?w?(d`??????0?????????w?(d`??????
\ No newline at end of file
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc	1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc	2013-06-11 01:16:56.912883076 +0200
@@ -0,0 +1,8 @@
+{
+    "type": "record",
+    "name": "simple",
+    "fields": [ 
+        {"name": "member_id", "type": "int"},
+        {"name": "count", "type": "long"}
+        ]
+}
\ No newline at end of file
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java	2013-06-11 01:48:03.512933062 +0200
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java	2013-06-11 01:23:38.920893841 +0200
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.piggybank.storage.avro.AvroStorage;
 import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-import org.apache.pig.test.Util;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -81,7 +80,9 @@ public class TestAvroStorage {
     final private String testNoMatchedFiles = getInputFile("test_dir{1,2}/file_that_does_not_exist*.avro");
     final private String testArrayFile = getInputFile("test_array.avro");
     final private String testRecordFile = getInputFile("test_record.avro");
+    final private String testSimpleFile = getInputFile("test_simple.avro");
     final private String testRecordSchema = getInputFile("test_record.avsc");
+    final private String testSimpleRecordSchema = getInputFile("test_simple_record.avsc");
     final private String testGenericUnionFile = getInputFile("test_generic_union.avro");
     final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro");
     final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro");
@@ -973,6 +974,76 @@ public class TestAvroStorage {
         verifyResults(output, expected);
     }
 
+    @Test
+    public void testOutputSchemaFile() throws IOException {
+        String output1= outbasedir + "testOutputSchemaFile";
+        String expected1 = basedir + "expected_testRecordSplit3.avro";
+        deleteDirectory(new File(output1));
+        String [] queries = {
+           " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+           " groups = GROUP avro BY member_id;",
+           " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+           " STORE sc INTO '" + output1 +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
+            };
+        testAvroStorage( queries);
+        verifyResults(output1, expected1);
+    }
+    
+    @Test
+    public void testInputSchemaFile() throws IOException {
+        String output1= outbasedir + "testInputSchemaFile";
+        String expected1 = basedir + "expected_testRecordSplit3.avro";
+        deleteDirectory(new File(output1));
+        String [] queries = {
+           " avro = LOAD '" + testSimpleFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('input_schema_file', '" + testSimpleRecordSchema + "');",
+           " STORE avro INTO '" + output1 +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'no_schema_check'," +
+              " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
+            };
+        testAvroStorage( queries);
+        verifyResults(output1, expected1);
+    }
+  
+    @Test
+    public void testInputSchemaClass() throws IOException {
+        String output= outbasedir + "testInputSchemaClass";
+        String test = getInputFile("expected_testRecursiveRecordReference1.avro");
+        String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + test +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
+                " 'input_schema_class', 'org.apache.avro.Schema.Type.INT' );",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'schema', '\"int\"' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+    
+    @Test
+    public void testOutputSchemaClass() throws IOException {
+        String output= outbasedir + "testOutputSchemaClass";
+        String test = getInputFile("expected_testRecursiveRecordReference1.avro");
+        String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+        deleteDirectory(new File(output));
+        String [] queries = {
+          " in = LOAD '" + test +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
+                " 'schema', '\"int\"' );",
+          " STORE in INTO '" + output +
+              "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+              " 'output_schema_class', 'org.apache.avro.Schema.Type.INT' );"
+           };
+        testAvroStorage(queries);
+        verifyResults(output, expected);
+    }
+    
     private static void deleteDirectory (File path) {
         if ( path.exists()) {
             File [] files = path.listFiles();
@@ -1014,8 +1085,8 @@ public class TestAvroStorage {
 
     private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
         // Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
-        if (Util.isHadoop23())
-            return;
+//        if (Util.isHadoop23())
+//            return;
 
         FileSystem fs = FileSystem.getLocal(new Configuration()) ;
 
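
Usage sketch: the Pig Latin below shows how the new AvroStorage parameters introduced by this patch ('input_schema_file', 'output_schema_file', 'input_schema_class', 'output_schema_class') are meant to be invoked, mirroring the new test cases above. The jar name, directories and schema paths are placeholder assumptions, not values taken from the patch.

REGISTER icm-iis-3rdparty-pig-avrostorage-0.0.1-SNAPSHOT.jar;  -- assumed artifact name built from the pom.xml above

-- Load Avro data with an explicit reader schema read from an .avsc file (placeholder paths):
records = LOAD 'input_dir'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        'input_schema_file', 'schemas/simple.avsc');

-- Store with an explicit writer schema; 'no_schema_check' as in testOutputSchemaFile:
STORE records INTO 'output_dir'
    USING org.apache.pig.piggybank.storage.avro.AvroStorage(
        'no_schema_check',
        'output_schema_file', 'schemas/simple.avsc');

-- Alternatively, 'input_schema_class' / 'output_schema_class' take a name that
-- eu.dnetlib.iis.core.common.AvroUtils.toSchema() resolves to an Avro schema,
-- e.g. 'org.apache.avro.Schema.Type.INT' as exercised by testInputSchemaClass.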