diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/pom.xml trunk/pom.xml --- 3rdparty-original/piggybank/java/pom.xml 1970-01-01 01:00:00.000000000 +0100 +++ trunk/pom.xml 2013-06-11 01:02:24.676859717 +0200 @@ -0,0 +1,121 @@ + + + + + eu.dnetlib + dnet-parent + 0.0.1-SNAPSHOT + + 4.0.0 + icm-iis-3rdparty-pig-avrostorage + jar + + + UTF-8 + + + + org.apache.pig + pig + 0.10.0-cdh4.1.2 + + + org.apache.avro + avro + 1.7.4 + + + org.apache.hadoop + hadoop-common + 2.0.0-cdh4.1.2 + + + org.apache.hadoop + hadoop-core + 2.0.0-mr1-cdh4.1.2 + jar + + + com.googlecode.json-simple + json-simple + 1.1 + jar + + + junit + junit + 4.10 + test + + + org.antlr + antlr-runtime + 3.4 + + + eu.dnetlib + icm-iis-core + 0.0.1-SNAPSHOT + jar + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + 1.6 + 1.6 + + + + org.apache.maven.plugins + maven-surefire-plugin + 2.4.3 + + true + + **/*Test.java + **/avro/Test*.java + + + **/AllTests.java + **/Abstract*Test.java + **/*$* + + + + + + + + cloudera + Cloudera Repository + https://repository.cloudera.com/artifactory/cloudera-repos + + true + + + false + + + + + dnet-deps + dnet dependencies + http://maven.research-infrastructures.eu/nexus/content/repositories/dnet-deps + + true + + + false + + default + + + diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java --- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 01:48:03.516933061 +0200 +++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 00:11:55.000000000 +0200 @@ -102,6 +102,7 @@ public class AvroSchema2Pig { tupleSchema.setFields(childFields); fieldSchema.setSchema(tupleSchema); + visitedRecords.remove(in); } } else if (avroType.equals(Schema.Type.ARRAY)) { diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java --- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 01:48:03.516933061 +0200 +++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 00:12:54.000000000 +0200 @@ -17,6 +17,7 @@ package org.apache.pig.piggybank.storage.avro; +import eu.dnetlib.iis.core.common.AvroUtils; import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; @@ -32,6 +33,8 @@ import org.apache.avro.Schema; import org.apache.avro.Schema.Field; import org.apache.avro.file.DataFileStream; import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.specific.SpecificRecordBase; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -105,6 +108,9 @@ public class AvroStorage extends FileInp private boolean checkSchema = true; /*whether check schema of input directories*/ private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */ + private String inputSchemaFile; + private String outputSchemaFile; + /** * Empty constructor. Output schema is derived from pig schema. */ @@ -122,7 +128,7 @@ public class AvroStorage extends FileInp * @throws IOException * @throws ParseException */ - public AvroStorage(String[] parts) throws IOException, ParseException { + public AvroStorage(String[] parts) throws IOException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException { outputAvroSchema = null; nullable = true; checkSchema = true; @@ -147,13 +153,18 @@ public class AvroStorage extends FileInp */ @Override public void setLocation(String location, Job job) throws IOException { - if (inputAvroSchema != null) { - return; - } Set paths = new HashSet(); Configuration conf = job.getConfiguration(); if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) { - setInputAvroSchema(paths, conf); + if (inputAvroSchema == null) { + if (inputSchemaFile != null) { + FileSystem fs = FileSystem.get(job.getConfiguration()); + Path path = fs.makeQualified(new Path(inputSchemaFile)); + inputAvroSchema = getSchemaFromFile(path, fs); + } else { + setInputAvroSchema(paths, conf); + } + } FileInputFormat.setInputPaths(job, paths.toArray(new Path[0])); } else { throw new IOException("Input path \'" + location + "\' is not found"); @@ -416,7 +427,6 @@ public class AvroStorage extends FileInp */ @SuppressWarnings("unchecked") protected Map parseJsonString(String jsonString) throws ParseException { - /*parse the json object */ JSONParser parser = new JSONParser(); JSONObject obj = (JSONObject) parser.parse(jsonString); @@ -450,7 +460,6 @@ public class AvroStorage extends FileInp * @throws ParseException */ protected Map parseStringList(String[] parts) throws IOException { - Map map = new HashMap(); for (int i = 0; i < parts.length; ) { @@ -477,6 +486,10 @@ public class AvroStorage extends FileInp || name.equalsIgnoreCase("same") || name.equalsIgnoreCase("schema") || name.equalsIgnoreCase("schema_file") + || name.equalsIgnoreCase("input_schema_file") + || name.equalsIgnoreCase("output_schema_file") + || name.equalsIgnoreCase("input_schema_class") + || name.equalsIgnoreCase("output_schema_class") || name.matches("field\\d+")) { /* store value as string */ map.put(name, value); @@ -496,8 +509,7 @@ public class AvroStorage extends FileInp /** * Initialize output avro schema using input property map */ - protected void init(Map inputs) throws IOException { - + protected void init(Map inputs) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException { /*used to store field schemas */ List fields = null; @@ -517,11 +529,18 @@ public class AvroStorage extends FileInp } else if (inputs.containsKey("schema_file")) { Path path = new Path((String) inputs.get("schema_file")); + AvroStorageLog.details("schemaFile path=" + path.toUri().toString()); FileSystem fs = FileSystem.get(path.toUri(), new Configuration()); Schema schema = getSchemaFromFile(path, fs); schemaManager = new AvroSchemaManager(schema); } + else if (inputs.containsKey("input_schema_file")) { + inputSchemaFile = (String) inputs.get("input_schema_file"); + } + else if (inputs.containsKey("output_schema_file")) { + outputSchemaFile = (String) inputs.get("output_schema_file"); + } /* iterate input property map */ for (Entry entry : inputs.entrySet()) { @@ -541,6 +560,10 @@ public class AvroStorage extends FileInp nullable = (Boolean) value; } else if (name.equalsIgnoreCase("schema")) { outputAvroSchema = Schema.parse((String) value); + } else if (name.equalsIgnoreCase("input_schema_class")) { + inputAvroSchema = AvroUtils.toSchema((String) value); + } else if (name.equalsIgnoreCase("output_schema_class")) { + outputAvroSchema = AvroUtils.toSchema((String) value); } else if (name.matches("field\\d+")) { /*set schema of dth field */ if (fields == null) @@ -579,6 +602,8 @@ public class AvroStorage extends FileInp fields.add(field); } else if (!name.equalsIgnoreCase("data") && !name.equalsIgnoreCase("schema_file") + && !name.equalsIgnoreCase("input_schema_file") + && !name.equalsIgnoreCase("output_schema_file") && !name.equalsIgnoreCase("debug")) { throw new IOException("Invalid parameter:" + name); } @@ -599,7 +624,6 @@ public class AvroStorage extends FileInp nullable = true; } } - } @Override @@ -609,6 +633,31 @@ public class AvroStorage extends FileInp @Override public void setStoreLocation(String location, Job job) throws IOException { + if (outputSchemaFile != null) { + FileSystem fs = FileSystem.get(job.getConfiguration()); + Path path = fs.makeQualified(new Path(outputSchemaFile)); + + outputAvroSchema = getSchemaFromFile(path, fs); + + UDFContext context = UDFContext.getUDFContext(); + Properties property = context.getUDFProperties(ResourceSchema.class); + + String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY); + String key = getSchemaKey(); + Map schemaMap = (prevSchemaStr != null) + ? parseSchemaMap(prevSchemaStr) + : new HashMap(); + schemaMap.put(key, outputAvroSchema.toString()); + + List schemas = new ArrayList(); + for (Map.Entry entry : schemaMap.entrySet()) { + schemas.add(entry.getKey() + AvroStorage.SCHEMA_KEYVALUE_DELIM + entry.getValue()); + } + + String newSchemaStr = StringUtils.join(schemas, AvroStorage.SCHEMA_DELIM); + property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr); + } + AvroStorageLog.details("output location=" + location); FileOutputFormat.setOutputPath(job, new Path(location)); } @@ -621,6 +670,7 @@ public class AvroStorage extends FileInp AvroStorageLog.funcCall("Check schema"); UDFContext context = UDFContext.getUDFContext(); Properties property = context.getUDFProperties(ResourceSchema.class); + String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY); AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr); @@ -683,6 +733,7 @@ public class AvroStorage extends FileInp UDFContext context = UDFContext.getUDFContext(); Properties property = context.getUDFProperties(ResourceSchema.class); String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY); + Map map = (allSchemaStr != null) ? parseSchemaMap(allSchemaStr) : null; String key = getSchemaKey(); @@ -711,7 +762,7 @@ public class AvroStorage extends FileInp StoreFunc.cleanupOnFailureImpl(location, job); } - @Override + //@Override public void cleanupOnSuccess(String location, Job job) throws IOException { // Nothing to do } @@ -719,7 +770,7 @@ public class AvroStorage extends FileInp @Override public void putNext(Tuple t) throws IOException { try { - this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t); + this.writer.write(NullWritable.get(), t); } catch (InterruptedException e) { e.printStackTrace(); } diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro --- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 1970-01-01 01:00:00.000000000 +0100 +++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 2013-06-11 01:28:15.576901251 +0200 @@ -0,0 +1 @@ +Objavro.schemaÚ{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]}»w„(d`çä×ô§¢0öüŠ¸¼ÊÐÒ»w„(d`çä×ô§¢ \ Brak znaku nowej linii na koÅ„cu pliku diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro --- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 1970-01-01 01:00:00.000000000 +0100 +++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 2013-06-11 01:16:41.648882666 +0200 @@ -0,0 +1 @@ +Objavro.schemaÚ{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]}»w„(d`çä×ô§¢0öüŠ¸¼ÊÐÒ»w„(d`çä×ô§¢ \ Brak znaku nowej linii na koÅ„cu pliku diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc --- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 1970-01-01 01:00:00.000000000 +0100 +++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 2013-06-11 01:16:56.912883076 +0200 @@ -0,0 +1,8 @@ +{ + "type": "record", + "name": "simple", + "fields": [ + {"name": "member_id", "type": "int"}, + {"name": "count", "type": "long"} + ] +} \ Brak znaku nowej linii na koÅ„cu pliku diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java --- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:48:03.512933062 +0200 +++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:23:38.920893841 +0200 @@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe import org.apache.pig.impl.logicalLayer.FrontendException; import org.apache.pig.piggybank.storage.avro.AvroStorage; import org.apache.pig.piggybank.storage.avro.PigSchema2Avro; -import org.apache.pig.test.Util; import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; @@ -81,7 +80,9 @@ public class TestAvroStorage { final private String testNoMatchedFiles = getInputFile("test_dir{1,2}/file_that_does_not_exist*.avro"); final private String testArrayFile = getInputFile("test_array.avro"); final private String testRecordFile = getInputFile("test_record.avro"); + final private String testSimpleFile = getInputFile("test_simple.avro"); final private String testRecordSchema = getInputFile("test_record.avsc"); + final private String testSimpleRecordSchema = getInputFile("test_simple_record.avsc"); final private String testGenericUnionFile = getInputFile("test_generic_union.avro"); final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro"); final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro"); @@ -973,6 +974,76 @@ public class TestAvroStorage { verifyResults(output, expected); } + @Test + public void testOutputSchemaFile() throws IOException { + String output1= outbasedir + "testOutputSchemaFile"; + String expected1 = basedir + "expected_testRecordSplit3.avro"; + deleteDirectory(new File(output1)); + String [] queries = { + " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();", + " groups = GROUP avro BY member_id;", + " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;", + " STORE sc INTO '" + output1 + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + + " 'no_schema_check'," + + " 'output_schema_file', '" + testSimpleRecordSchema + "' );" + }; + testAvroStorage( queries); + verifyResults(output1, expected1); + } + + @Test + public void testInputSchemaFile() throws IOException { + String output1= outbasedir + "testInputSchemaFile"; + String expected1 = basedir + "expected_testRecordSplit3.avro"; + deleteDirectory(new File(output1)); + String [] queries = { + " avro = LOAD '" + testSimpleFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('input_schema_file', '" + testSimpleRecordSchema + "');", + " STORE avro INTO '" + output1 + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + + " 'no_schema_check'," + + " 'output_schema_file', '" + testSimpleRecordSchema + "' );" + }; + testAvroStorage( queries); + verifyResults(output1, expected1); + } + + @Test + public void testInputSchemaClass() throws IOException { + String output= outbasedir + "testInputSchemaClass"; + String test = getInputFile("expected_testRecursiveRecordReference1.avro"); + String expected = basedir + "expected_testRecursiveRecordReference1.avro"; + deleteDirectory(new File(output)); + String [] queries = { + " in = LOAD '" + test + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+ + " 'input_schema_class', 'org.apache.avro.Schema.Type.INT' );", + " STORE in INTO '" + output + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + + " 'schema', '\"int\"' );" + }; + testAvroStorage(queries); + verifyResults(output, expected); + } + + @Test + public void testOutputSchemaClass() throws IOException { + String output= outbasedir + "testOutputSchemaClass"; + String test = getInputFile("expected_testRecursiveRecordReference1.avro"); + String expected = basedir + "expected_testRecursiveRecordReference1.avro"; + deleteDirectory(new File(output)); + String [] queries = { + " in = LOAD '" + test + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+ + " 'schema', '\"int\"' );", + " STORE in INTO '" + output + + "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" + + " 'output_schema_class', 'org.apache.avro.Schema.Type.INT' );" + }; + testAvroStorage(queries); + verifyResults(output, expected); + } + private static void deleteDirectory (File path) { if ( path.exists()) { File [] files = path.listFiles(); @@ -1014,8 +1085,8 @@ public class TestAvroStorage { private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException { // Seems compress for Avro is broken in 23. Skip this test and open Jira PIG- - if (Util.isHadoop23()) - return; +// if (Util.isHadoop23()) +// return; FileSystem fs = FileSystem.getLocal(new Configuration()) ;