diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/pom.xml trunk/pom.xml
--- 3rdparty-original/piggybank/java/pom.xml 1970-01-01 01:00:00.000000000 +0100
+++ trunk/pom.xml 2013-06-11 01:02:24.676859717 +0200
@@ -0,0 +1,121 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<groupId>eu.dnetlib</groupId>
+		<artifactId>dnet-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+	<artifactId>icm-iis-3rdparty-pig-avrostorage</artifactId>
+	<packaging>jar</packaging>
+
+	<properties>
+		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>org.apache.pig</groupId>
+			<artifactId>pig</artifactId>
+			<version>0.10.0-cdh4.1.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.avro</groupId>
+			<artifactId>avro</artifactId>
+			<version>1.7.4</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-common</artifactId>
+			<version>2.0.0-cdh4.1.2</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.hadoop</groupId>
+			<artifactId>hadoop-core</artifactId>
+			<version>2.0.0-mr1-cdh4.1.2</version>
+			<type>jar</type>
+		</dependency>
+		<dependency>
+			<groupId>com.googlecode.json-simple</groupId>
+			<artifactId>json-simple</artifactId>
+			<version>1.1</version>
+			<type>jar</type>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>4.10</version>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.antlr</groupId>
+			<artifactId>antlr-runtime</artifactId>
+			<version>3.4</version>
+		</dependency>
+		<dependency>
+			<groupId>eu.dnetlib</groupId>
+			<artifactId>icm-iis-core</artifactId>
+			<version>0.0.1-SNAPSHOT</version>
+			<type>jar</type>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-compiler-plugin</artifactId>
+				<configuration>
+					<source>1.6</source>
+					<target>1.6</target>
+				</configuration>
+			</plugin>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.4.3</version>
+				<configuration>
+					<redirectTestOutputToFile>true</redirectTestOutputToFile>
+					<includes>
+						<include>**/*Test.java</include>
+						<include>**/avro/Test*.java</include>
+					</includes>
+					<excludes>
+						<exclude>**/AllTests.java</exclude>
+						<exclude>**/Abstract*Test.java</exclude>
+						<exclude>**/*$*</exclude>
+					</excludes>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<repositories>
+		<repository>
+			<id>cloudera</id>
+			<name>Cloudera Repository</name>
+			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+		<repository>
+			<id>dnet-deps</id>
+			<name>dnet dependencies</name>
+			<url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet-deps</url>
+			<releases>
+				<enabled>true</enabled>
+			</releases>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+			<layout>default</layout>
+		</repository>
+	</repositories>
+
+</project>
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 01:48:03.516933061 +0200
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 00:11:55.000000000 +0200
@@ -102,6 +102,7 @@ public class AvroSchema2Pig {
tupleSchema.setFields(childFields);
fieldSchema.setSchema(tupleSchema);
+ visitedRecords.remove(in);
}
} else if (avroType.equals(Schema.Type.ARRAY)) {
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 01:48:03.516933061 +0200
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 00:12:54.000000000 +0200
@@ -17,6 +17,7 @@
package org.apache.pig.piggybank.storage.avro;
+import eu.dnetlib.iis.core.common.AvroUtils;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
@@ -32,6 +33,8 @@ import org.apache.avro.Schema;
import org.apache.avro.Schema.Field;
import org.apache.avro.file.DataFileStream;
import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.specific.SpecificRecordBase;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -105,6 +108,9 @@ public class AvroStorage extends FileInp
private boolean checkSchema = true; /*whether check schema of input directories*/
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
+ private String inputSchemaFile;
+ private String outputSchemaFile;
+
/**
* Empty constructor. Output schema is derived from pig schema.
*/
@@ -122,7 +128,7 @@ public class AvroStorage extends FileInp
* @throws IOException
* @throws ParseException
*/
- public AvroStorage(String[] parts) throws IOException, ParseException {
+ public AvroStorage(String[] parts) throws IOException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException {
outputAvroSchema = null;
nullable = true;
checkSchema = true;
@@ -147,13 +153,18 @@ public class AvroStorage extends FileInp
*/
@Override
public void setLocation(String location, Job job) throws IOException {
- if (inputAvroSchema != null) {
- return;
- }
Set<Path> paths = new HashSet<Path>();
Configuration conf = job.getConfiguration();
if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
- setInputAvroSchema(paths, conf);
+ if (inputAvroSchema == null) {
+ if (inputSchemaFile != null) {
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ Path path = fs.makeQualified(new Path(inputSchemaFile));
+ inputAvroSchema = getSchemaFromFile(path, fs);
+ } else {
+ setInputAvroSchema(paths, conf);
+ }
+ }
FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
} else {
throw new IOException("Input path \'" + location + "\' is not found");
@@ -416,7 +427,6 @@ public class AvroStorage extends FileInp
*/
@SuppressWarnings("unchecked")
protected Map<String, Object> parseJsonString(String jsonString) throws ParseException {
-
/*parse the json object */
JSONParser parser = new JSONParser();
JSONObject obj = (JSONObject) parser.parse(jsonString);
@@ -450,7 +460,6 @@ public class AvroStorage extends FileInp
* @throws ParseException
*/
protected Map<String, Object> parseStringList(String[] parts) throws IOException {
-
Map<String, Object> map = new HashMap<String, Object>();
for (int i = 0; i < parts.length; ) {
@@ -477,6 +486,10 @@ public class AvroStorage extends FileInp
|| name.equalsIgnoreCase("same")
|| name.equalsIgnoreCase("schema")
|| name.equalsIgnoreCase("schema_file")
+ || name.equalsIgnoreCase("input_schema_file")
+ || name.equalsIgnoreCase("output_schema_file")
+ || name.equalsIgnoreCase("input_schema_class")
+ || name.equalsIgnoreCase("output_schema_class")
|| name.matches("field\\d+")) {
/* store value as string */
map.put(name, value);
@@ -496,8 +509,7 @@ public class AvroStorage extends FileInp
/**
* Initialize output avro schema using input property map
*/
- protected void init(Map<String, Object> inputs) throws IOException {
-
+ protected void init(Map<String, Object> inputs) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
/*used to store field schemas */
List<Field> fields = null;
@@ -517,11 +529,18 @@ public class AvroStorage extends FileInp
}
else if (inputs.containsKey("schema_file")) {
Path path = new Path((String) inputs.get("schema_file"));
+
AvroStorageLog.details("schemaFile path=" + path.toUri().toString());
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
Schema schema = getSchemaFromFile(path, fs);
schemaManager = new AvroSchemaManager(schema);
}
+ else if (inputs.containsKey("input_schema_file")) {
+ inputSchemaFile = (String) inputs.get("input_schema_file");
+ }
+ else if (inputs.containsKey("output_schema_file")) {
+ outputSchemaFile = (String) inputs.get("output_schema_file");
+ }
/* iterate input property map */
for (Entry<String, Object> entry : inputs.entrySet()) {
@@ -541,6 +560,10 @@ public class AvroStorage extends FileInp
nullable = (Boolean) value;
} else if (name.equalsIgnoreCase("schema")) {
outputAvroSchema = Schema.parse((String) value);
+ } else if (name.equalsIgnoreCase("input_schema_class")) {
+ inputAvroSchema = AvroUtils.toSchema((String) value);
+ } else if (name.equalsIgnoreCase("output_schema_class")) {
+ outputAvroSchema = AvroUtils.toSchema((String) value);
} else if (name.matches("field\\d+")) {
/*set schema of dth field */
if (fields == null)
@@ -579,6 +602,8 @@ public class AvroStorage extends FileInp
fields.add(field);
} else if (!name.equalsIgnoreCase("data")
&& !name.equalsIgnoreCase("schema_file")
+ && !name.equalsIgnoreCase("input_schema_file")
+ && !name.equalsIgnoreCase("output_schema_file")
&& !name.equalsIgnoreCase("debug")) {
throw new IOException("Invalid parameter:" + name);
}
@@ -599,7 +624,6 @@ public class AvroStorage extends FileInp
nullable = true;
}
}
-
}
@Override
@@ -609,6 +633,31 @@ public class AvroStorage extends FileInp
@Override
public void setStoreLocation(String location, Job job) throws IOException {
+ if (outputSchemaFile != null) {
+ FileSystem fs = FileSystem.get(job.getConfiguration());
+ Path path = fs.makeQualified(new Path(outputSchemaFile));
+
+ outputAvroSchema = getSchemaFromFile(path, fs);
+
+ UDFContext context = UDFContext.getUDFContext();
+ Properties property = context.getUDFProperties(ResourceSchema.class);
+
+ String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+ String key = getSchemaKey();
+ Map<String, String> schemaMap = (prevSchemaStr != null)
+ ? parseSchemaMap(prevSchemaStr)
+ : new HashMap<String, String>();
+ schemaMap.put(key, outputAvroSchema.toString());
+
+ List<String> schemas = new ArrayList<String>();
+ for (Map.Entry<String, String> entry : schemaMap.entrySet()) {
+ schemas.add(entry.getKey() + AvroStorage.SCHEMA_KEYVALUE_DELIM + entry.getValue());
+ }
+
+ String newSchemaStr = StringUtils.join(schemas, AvroStorage.SCHEMA_DELIM);
+ property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
+ }
+
AvroStorageLog.details("output location=" + location);
FileOutputFormat.setOutputPath(job, new Path(location));
}
@@ -621,6 +670,7 @@ public class AvroStorage extends FileInp
AvroStorageLog.funcCall("Check schema");
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(ResourceSchema.class);
+
String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
@@ -683,6 +733,7 @@ public class AvroStorage extends FileInp
UDFContext context = UDFContext.getUDFContext();
Properties property = context.getUDFProperties(ResourceSchema.class);
String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
+
Map<String, String> map = (allSchemaStr != null) ? parseSchemaMap(allSchemaStr) : null;
String key = getSchemaKey();
@@ -711,7 +762,7 @@ public class AvroStorage extends FileInp
StoreFunc.cleanupOnFailureImpl(location, job);
}
- @Override
+ //@Override
public void cleanupOnSuccess(String location, Job job) throws IOException {
// Nothing to do
}
@@ -719,7 +770,7 @@ public class AvroStorage extends FileInp
@Override
public void putNext(Tuple t) throws IOException {
try {
- this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
+ this.writer.write(NullWritable.get(), t);
} catch (InterruptedException e) {
e.printStackTrace();
}
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 2013-06-11 01:28:15.576901251 +0200
@@ -0,0 +1 @@
+Objavro.schemaÚ{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]} »w„(d`çä×ô§¢0öüŠ¸¼ÊÐÒ»w„(d`çä×ô§¢
\ No newline at end of file
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 2013-06-11 01:16:41.648882666 +0200
@@ -0,0 +1 @@
+Objavro.schemaÚ{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]} »w„(d`çä×ô§¢0öüŠ¸¼ÊÐÒ»w„(d`çä×ô§¢
\ No newline at end of file
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 1970-01-01 01:00:00.000000000 +0100
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 2013-06-11 01:16:56.912883076 +0200
@@ -0,0 +1,8 @@
+{
+ "type": "record",
+ "name": "simple",
+ "fields": [
+ {"name": "member_id", "type": "int"},
+ {"name": "count", "type": "long"}
+ ]
+}
\ No newline at end of file
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:48:03.512933062 +0200
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:23:38.920893841 +0200
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.piggybank.storage.avro.AvroStorage;
import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
-import org.apache.pig.test.Util;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -81,7 +80,9 @@ public class TestAvroStorage {
final private String testNoMatchedFiles = getInputFile("test_dir{1,2}/file_that_does_not_exist*.avro");
final private String testArrayFile = getInputFile("test_array.avro");
final private String testRecordFile = getInputFile("test_record.avro");
+ final private String testSimpleFile = getInputFile("test_simple.avro");
final private String testRecordSchema = getInputFile("test_record.avsc");
+ final private String testSimpleRecordSchema = getInputFile("test_simple_record.avsc");
final private String testGenericUnionFile = getInputFile("test_generic_union.avro");
final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro");
final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro");
@@ -973,6 +974,76 @@ public class TestAvroStorage {
verifyResults(output, expected);
}
+ @Test
+ public void testOutputSchemaFile() throws IOException {
+ String output1= outbasedir + "testOutputSchemaFile";
+ String expected1 = basedir + "expected_testRecordSplit3.avro";
+ deleteDirectory(new File(output1));
+ String [] queries = {
+ " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
+ " groups = GROUP avro BY member_id;",
+ " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
+ " STORE sc INTO '" + output1 +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
+ };
+ testAvroStorage( queries);
+ verifyResults(output1, expected1);
+ }
+
+ @Test
+ public void testInputSchemaFile() throws IOException {
+ String output1= outbasedir + "testInputSchemaFile";
+ String expected1 = basedir + "expected_testRecordSplit3.avro";
+ deleteDirectory(new File(output1));
+ String [] queries = {
+ " avro = LOAD '" + testSimpleFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('input_schema_file', '" + testSimpleRecordSchema + "');",
+ " STORE avro INTO '" + output1 +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'no_schema_check'," +
+ " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
+ };
+ testAvroStorage( queries);
+ verifyResults(output1, expected1);
+ }
+
+ @Test
+ public void testInputSchemaClass() throws IOException {
+ String output= outbasedir + "testInputSchemaClass";
+ String test = getInputFile("expected_testRecursiveRecordReference1.avro");
+ String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + test +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
+ " 'input_schema_class', 'org.apache.avro.Schema.Type.INT' );",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'schema', '\"int\"' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
+ @Test
+ public void testOutputSchemaClass() throws IOException {
+ String output= outbasedir + "testOutputSchemaClass";
+ String test = getInputFile("expected_testRecursiveRecordReference1.avro");
+ String expected = basedir + "expected_testRecursiveRecordReference1.avro";
+ deleteDirectory(new File(output));
+ String [] queries = {
+ " in = LOAD '" + test +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
+ " 'schema', '\"int\"' );",
+ " STORE in INTO '" + output +
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
+ " 'output_schema_class', 'org.apache.avro.Schema.Type.INT' );"
+ };
+ testAvroStorage(queries);
+ verifyResults(output, expected);
+ }
+
private static void deleteDirectory (File path) {
if ( path.exists()) {
File [] files = path.listFiles();
@@ -1014,8 +1085,8 @@ public class TestAvroStorage {
private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
// Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
- if (Util.isHadoop23())
- return;
+// if (Util.isHadoop23())
+// return;
FileSystem fs = FileSystem.getLocal(new Configuration()) ;