1 |
18910
|
dominika.t
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/pom.xml trunk/pom.xml
|
2 |
|
|
--- 3rdparty-original/piggybank/java/pom.xml 1970-01-01 01:00:00.000000000 +0100
|
3 |
|
|
+++ trunk/pom.xml 2013-06-11 01:02:24.676859717 +0200
|
4 |
|
|
@@ -0,0 +1,121 @@
|
5 |
|
|
+<?xml version="1.0" encoding="UTF-8"?>
|
6 |
|
|
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
7 |
|
|
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
|
8 |
|
|
+
|
9 |
|
|
+ <parent>
|
10 |
|
|
+ <groupId>eu.dnetlib</groupId>
|
11 |
|
|
+ <artifactId>dnet-parent</artifactId>
|
12 |
|
|
+ <version>0.0.1-SNAPSHOT</version>
|
13 |
|
|
+ </parent>
|
14 |
|
|
+ <modelVersion>4.0.0</modelVersion>
|
15 |
|
|
+ <artifactId>icm-iis-3rdparty-pig-avrostorage</artifactId>
|
16 |
|
|
+ <packaging>jar</packaging>
|
17 |
|
|
+ <!-- <version>0.0.1-SNAPSHOT</version> -->
|
18 |
|
|
+ <properties>
|
19 |
|
|
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
|
20 |
|
|
+ </properties>
|
21 |
|
|
+ <dependencies>
|
22 |
|
|
+ <dependency>
|
23 |
|
|
+ <groupId>org.apache.pig</groupId>
|
24 |
|
|
+ <artifactId>pig</artifactId>
|
25 |
|
|
+ <version>0.10.0-cdh4.1.2</version>
|
26 |
|
|
+ </dependency>
|
27 |
|
|
+ <dependency>
|
28 |
|
|
+ <groupId>org.apache.avro</groupId>
|
29 |
|
|
+ <artifactId>avro</artifactId>
|
30 |
|
|
+ <version>1.7.4</version>
|
31 |
|
|
+ </dependency>
|
32 |
|
|
+ <dependency>
|
33 |
|
|
+ <groupId>org.apache.hadoop</groupId>
|
34 |
|
|
+ <artifactId>hadoop-common</artifactId>
|
35 |
|
|
+ <version>2.0.0-cdh4.1.2</version>
|
36 |
|
|
+ </dependency>
|
37 |
|
|
+ <dependency>
|
38 |
|
|
+ <groupId>org.apache.hadoop</groupId>
|
39 |
|
|
+ <artifactId>hadoop-core</artifactId>
|
40 |
|
|
+ <version>2.0.0-mr1-cdh4.1.2</version>
|
41 |
|
|
+ <type>jar</type>
|
42 |
|
|
+ </dependency>
|
43 |
|
|
+ <dependency>
|
44 |
|
|
+ <groupId>com.googlecode.json-simple</groupId>
|
45 |
|
|
+ <artifactId>json-simple</artifactId>
|
46 |
|
|
+ <version>1.1</version>
|
47 |
|
|
+ <type>jar</type>
|
48 |
|
|
+ </dependency>
|
49 |
|
|
+ <dependency>
|
50 |
|
|
+ <groupId>junit</groupId>
|
51 |
|
|
+ <artifactId>junit</artifactId>
|
52 |
|
|
+ <version>4.10</version>
|
53 |
|
|
+ <scope>test</scope>
|
54 |
|
|
+ </dependency>
|
55 |
|
|
+ <dependency>
|
56 |
|
|
+ <groupId>org.antlr</groupId>
|
57 |
|
|
+ <artifactId>antlr-runtime</artifactId>
|
58 |
|
|
+ <version>3.4</version>
|
59 |
|
|
+ </dependency>
|
60 |
|
|
+ <dependency>
|
61 |
|
|
+ <groupId>eu.dnetlib</groupId>
|
62 |
|
|
+ <artifactId>icm-iis-core</artifactId>
|
63 |
|
|
+ <version>0.0.1-SNAPSHOT</version>
|
64 |
|
|
+ <type>jar</type>
|
65 |
|
|
+ </dependency>
|
66 |
|
|
+ </dependencies>
|
67 |
|
|
+ <build>
|
68 |
|
|
+ <plugins>
|
69 |
|
|
+ <plugin>
|
70 |
|
|
+ <groupId>org.apache.maven.plugins</groupId>
|
71 |
|
|
+ <artifactId>maven-compiler-plugin</artifactId>
|
72 |
|
|
+ <configuration>
|
73 |
|
|
+ <source>1.6</source>
|
74 |
|
|
+ <target>1.6</target>
|
75 |
|
|
+ </configuration>
|
76 |
|
|
+ </plugin>
|
77 |
|
|
+ <plugin>
|
78 |
|
|
+ <groupId>org.apache.maven.plugins</groupId>
|
79 |
|
|
+ <artifactId>maven-surefire-plugin</artifactId>
|
80 |
|
|
+ <version>2.4.3</version>
|
81 |
|
|
+ <configuration>
|
82 |
|
|
+ <redirectTestOutputToFile>true</redirectTestOutputToFile>
|
83 |
|
|
+ <includes>
|
84 |
|
|
+ <include>**/*Test.java</include>
|
85 |
|
|
+ <include>**/avro/Test*.java</include>
|
86 |
|
|
+ </includes>
|
87 |
|
|
+ <excludes>
|
88 |
|
|
+ <exclude>**/AllTests.java</exclude>
|
89 |
|
|
+ <exclude>**/Abstract*Test.java</exclude>
|
90 |
|
|
+ <exclude>**/*$*</exclude>
|
91 |
|
|
+ </excludes>
|
92 |
|
|
+ </configuration>
|
93 |
|
|
+ </plugin>
|
94 |
|
|
+ </plugins>
|
95 |
|
|
+ </build>
|
96 |
|
|
+ <repositories>
|
97 |
|
|
+ <repository>
|
98 |
|
|
+ <id>cloudera</id>
|
99 |
|
|
+ <name>Cloudera Repository</name>
|
100 |
|
|
+ <url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
|
101 |
|
|
+ <releases>
|
102 |
|
|
+ <enabled>true</enabled>
|
103 |
|
|
+ </releases>
|
104 |
|
|
+ <snapshots>
|
105 |
|
|
+ <enabled>false</enabled>
|
106 |
|
|
+ </snapshots>
|
107 |
|
|
+ </repository>
|
108 |
|
|
+ <!-- This repository contains our patched
|
109 |
|
|
+ version of "avro" and "avro-mapred" modules (see the dependencies section).
|
110 |
|
|
+ This entry might be removed when the patch to these modules becomes
|
111 |
|
|
+ a part of the official Avro release.-->
|
112 |
|
|
+ <repository>
|
113 |
|
|
+ <id>dnet-deps</id>
|
114 |
|
|
+ <name>dnet dependencies</name>
|
115 |
|
|
+ <url>http://maven.research-infrastructures.eu/nexus/content/repositories/dnet-deps</url>
|
116 |
|
|
+ <releases>
|
117 |
|
|
+ <enabled>true</enabled>
|
118 |
|
|
+ </releases>
|
119 |
|
|
+ <snapshots>
|
120 |
|
|
+ <enabled>false</enabled>
|
121 |
|
|
+ </snapshots>
|
122 |
|
|
+ <layout>default</layout>
|
123 |
|
|
+ </repository>
|
124 |
|
|
+ </repositories>
|
125 |
|
|
+</project>
|
126 |
|
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java
|
127 |
|
|
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 01:48:03.516933061 +0200
|
128 |
|
|
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroSchema2Pig.java 2013-06-11 00:11:55.000000000 +0200
|
129 |
|
|
@@ -102,6 +102,7 @@ public class AvroSchema2Pig {
|
130 |
|
|
|
131 |
|
|
tupleSchema.setFields(childFields);
|
132 |
|
|
fieldSchema.setSchema(tupleSchema);
|
133 |
|
|
+ visitedRecords.remove(in);
|
134 |
|
|
}
|
135 |
|
|
|
136 |
|
|
} else if (avroType.equals(Schema.Type.ARRAY)) {
|
137 |
|
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java
|
138 |
|
|
--- 3rdparty-original/piggybank/java/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 01:48:03.516933061 +0200
|
139 |
|
|
+++ trunk/src/main/java/org/apache/pig/piggybank/storage/avro/AvroStorage.java 2013-06-11 00:12:54.000000000 +0200
|
140 |
|
|
@@ -17,6 +17,7 @@
|
141 |
|
|
|
142 |
|
|
package org.apache.pig.piggybank.storage.avro;
|
143 |
|
|
|
144 |
|
|
+import eu.dnetlib.iis.core.common.AvroUtils;
|
145 |
|
|
import java.io.IOException;
|
146 |
|
|
import java.io.InputStream;
|
147 |
|
|
import java.util.ArrayList;
|
148 |
|
|
@@ -32,6 +33,8 @@ import org.apache.avro.Schema;
|
149 |
|
|
import org.apache.avro.Schema.Field;
|
150 |
|
|
import org.apache.avro.file.DataFileStream;
|
151 |
|
|
import org.apache.avro.generic.GenericDatumReader;
|
152 |
|
|
+import org.apache.avro.specific.SpecificRecordBase;
|
153 |
|
|
+import org.apache.commons.lang.StringUtils;
|
154 |
|
|
import org.apache.hadoop.conf.Configuration;
|
155 |
|
|
import org.apache.hadoop.fs.FileStatus;
|
156 |
|
|
import org.apache.hadoop.fs.FileSystem;
|
157 |
|
|
@@ -105,6 +108,9 @@ public class AvroStorage extends FileInp
|
158 |
|
|
private boolean checkSchema = true; /*whether check schema of input directories*/
|
159 |
|
|
private boolean ignoreBadFiles = false; /* whether ignore corrupted files during load */
|
160 |
|
|
|
161 |
|
|
+ private String inputSchemaFile;
|
162 |
|
|
+ private String outputSchemaFile;
|
163 |
|
|
+
|
164 |
|
|
/**
|
165 |
|
|
* Empty constructor. Output schema is derived from pig schema.
|
166 |
|
|
*/
|
167 |
|
|
@@ -122,7 +128,7 @@ public class AvroStorage extends FileInp
|
168 |
|
|
* @throws IOException
|
169 |
|
|
* @throws ParseException
|
170 |
|
|
*/
|
171 |
|
|
- public AvroStorage(String[] parts) throws IOException, ParseException {
|
172 |
|
|
+ public AvroStorage(String[] parts) throws IOException, ParseException, ClassNotFoundException, InstantiationException, IllegalAccessException {
|
173 |
|
|
outputAvroSchema = null;
|
174 |
|
|
nullable = true;
|
175 |
|
|
checkSchema = true;
|
176 |
|
|
@@ -147,13 +153,18 @@ public class AvroStorage extends FileInp
|
177 |
|
|
*/
|
178 |
|
|
@Override
|
179 |
|
|
public void setLocation(String location, Job job) throws IOException {
|
180 |
|
|
- if (inputAvroSchema != null) {
|
181 |
|
|
- return;
|
182 |
|
|
- }
|
183 |
|
|
Set<Path> paths = new HashSet<Path>();
|
184 |
|
|
Configuration conf = job.getConfiguration();
|
185 |
|
|
if (AvroStorageUtils.getAllSubDirs(new Path(location), conf, paths)) {
|
186 |
|
|
- setInputAvroSchema(paths, conf);
|
187 |
|
|
+ if (inputAvroSchema == null) {
|
188 |
|
|
+ if (inputSchemaFile != null) {
|
189 |
|
|
+ FileSystem fs = FileSystem.get(job.getConfiguration());
|
190 |
|
|
+ Path path = fs.makeQualified(new Path(inputSchemaFile));
|
191 |
|
|
+ inputAvroSchema = getSchemaFromFile(path, fs);
|
192 |
|
|
+ } else {
|
193 |
|
|
+ setInputAvroSchema(paths, conf);
|
194 |
|
|
+ }
|
195 |
|
|
+ }
|
196 |
|
|
FileInputFormat.setInputPaths(job, paths.toArray(new Path[0]));
|
197 |
|
|
} else {
|
198 |
|
|
throw new IOException("Input path \'" + location + "\' is not found");
|
199 |
|
|
@@ -416,7 +427,6 @@ public class AvroStorage extends FileInp
|
200 |
|
|
*/
|
201 |
|
|
@SuppressWarnings("unchecked")
|
202 |
|
|
protected Map<String, Object> parseJsonString(String jsonString) throws ParseException {
|
203 |
|
|
-
|
204 |
|
|
/*parse the json object */
|
205 |
|
|
JSONParser parser = new JSONParser();
|
206 |
|
|
JSONObject obj = (JSONObject) parser.parse(jsonString);
|
207 |
|
|
@@ -450,7 +460,6 @@ public class AvroStorage extends FileInp
|
208 |
|
|
* @throws ParseException
|
209 |
|
|
*/
|
210 |
|
|
protected Map<String, Object> parseStringList(String[] parts) throws IOException {
|
211 |
|
|
-
|
212 |
|
|
Map<String, Object> map = new HashMap<String, Object>();
|
213 |
|
|
|
214 |
|
|
for (int i = 0; i < parts.length; ) {
|
215 |
|
|
@@ -477,6 +486,10 @@ public class AvroStorage extends FileInp
|
216 |
|
|
|| name.equalsIgnoreCase("same")
|
217 |
|
|
|| name.equalsIgnoreCase("schema")
|
218 |
|
|
|| name.equalsIgnoreCase("schema_file")
|
219 |
|
|
+ || name.equalsIgnoreCase("input_schema_file")
|
220 |
|
|
+ || name.equalsIgnoreCase("output_schema_file")
|
221 |
|
|
+ || name.equalsIgnoreCase("input_schema_class")
|
222 |
|
|
+ || name.equalsIgnoreCase("output_schema_class")
|
223 |
|
|
|| name.matches("field\\d+")) {
|
224 |
|
|
/* store value as string */
|
225 |
|
|
map.put(name, value);
|
226 |
|
|
@@ -496,8 +509,7 @@ public class AvroStorage extends FileInp
|
227 |
|
|
/**
|
228 |
|
|
* Initialize output avro schema using input property map
|
229 |
|
|
*/
|
230 |
|
|
- protected void init(Map<String, Object> inputs) throws IOException {
|
231 |
|
|
-
|
232 |
|
|
+ protected void init(Map<String, Object> inputs) throws IOException, ClassNotFoundException, InstantiationException, IllegalAccessException {
|
233 |
|
|
/*used to store field schemas */
|
234 |
|
|
List<Field> fields = null;
|
235 |
|
|
|
236 |
|
|
@@ -517,11 +529,18 @@ public class AvroStorage extends FileInp
|
237 |
|
|
}
|
238 |
|
|
else if (inputs.containsKey("schema_file")) {
|
239 |
|
|
Path path = new Path((String) inputs.get("schema_file"));
|
240 |
|
|
+
|
241 |
|
|
AvroStorageLog.details("schemaFile path=" + path.toUri().toString());
|
242 |
|
|
FileSystem fs = FileSystem.get(path.toUri(), new Configuration());
|
243 |
|
|
Schema schema = getSchemaFromFile(path, fs);
|
244 |
|
|
schemaManager = new AvroSchemaManager(schema);
|
245 |
|
|
}
|
246 |
|
|
+ else if (inputs.containsKey("input_schema_file")) {
|
247 |
|
|
+ inputSchemaFile = (String) inputs.get("input_schema_file");
|
248 |
|
|
+ }
|
249 |
|
|
+ else if (inputs.containsKey("output_schema_file")) {
|
250 |
|
|
+ outputSchemaFile = (String) inputs.get("output_schema_file");
|
251 |
|
|
+ }
|
252 |
|
|
|
253 |
|
|
/* iterate input property map */
|
254 |
|
|
for (Entry<String, Object> entry : inputs.entrySet()) {
|
255 |
|
|
@@ -541,6 +560,10 @@ public class AvroStorage extends FileInp
|
256 |
|
|
nullable = (Boolean) value;
|
257 |
|
|
} else if (name.equalsIgnoreCase("schema")) {
|
258 |
|
|
outputAvroSchema = Schema.parse((String) value);
|
259 |
|
|
+ } else if (name.equalsIgnoreCase("input_schema_class")) {
|
260 |
|
|
+ inputAvroSchema = AvroUtils.toSchema((String) value);
|
261 |
|
|
+ } else if (name.equalsIgnoreCase("output_schema_class")) {
|
262 |
|
|
+ outputAvroSchema = AvroUtils.toSchema((String) value);
|
263 |
|
|
} else if (name.matches("field\\d+")) {
|
264 |
|
|
/*set schema of dth field */
|
265 |
|
|
if (fields == null)
|
266 |
|
|
@@ -579,6 +602,8 @@ public class AvroStorage extends FileInp
|
267 |
|
|
fields.add(field);
|
268 |
|
|
} else if (!name.equalsIgnoreCase("data")
|
269 |
|
|
&& !name.equalsIgnoreCase("schema_file")
|
270 |
|
|
+ && !name.equalsIgnoreCase("input_schema_file")
|
271 |
|
|
+ && !name.equalsIgnoreCase("output_schema_file")
|
272 |
|
|
&& !name.equalsIgnoreCase("debug")) {
|
273 |
|
|
throw new IOException("Invalid parameter:" + name);
|
274 |
|
|
}
|
275 |
|
|
@@ -599,7 +624,6 @@ public class AvroStorage extends FileInp
|
276 |
|
|
nullable = true;
|
277 |
|
|
}
|
278 |
|
|
}
|
279 |
|
|
-
|
280 |
|
|
}
|
281 |
|
|
|
282 |
|
|
@Override
|
283 |
|
|
@@ -609,6 +633,31 @@ public class AvroStorage extends FileInp
|
284 |
|
|
|
285 |
|
|
@Override
|
286 |
|
|
public void setStoreLocation(String location, Job job) throws IOException {
|
287 |
|
|
+ if (outputSchemaFile != null) {
|
288 |
|
|
+ FileSystem fs = FileSystem.get(job.getConfiguration());
|
289 |
|
|
+ Path path = fs.makeQualified(new Path(outputSchemaFile));
|
290 |
|
|
+
|
291 |
|
|
+ outputAvroSchema = getSchemaFromFile(path, fs);
|
292 |
|
|
+
|
293 |
|
|
+ UDFContext context = UDFContext.getUDFContext();
|
294 |
|
|
+ Properties property = context.getUDFProperties(ResourceSchema.class);
|
295 |
|
|
+
|
296 |
|
|
+ String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
|
297 |
|
|
+ String key = getSchemaKey();
|
298 |
|
|
+ Map<String, String> schemaMap = (prevSchemaStr != null)
|
299 |
|
|
+ ? parseSchemaMap(prevSchemaStr)
|
300 |
|
|
+ : new HashMap<String, String>();
|
301 |
|
|
+ schemaMap.put(key, outputAvroSchema.toString());
|
302 |
|
|
+
|
303 |
|
|
+ List<String> schemas = new ArrayList<String>();
|
304 |
|
|
+ for (Map.Entry<String, String> entry : schemaMap.entrySet()) {
|
305 |
|
|
+ schemas.add(entry.getKey() + AvroStorage.SCHEMA_KEYVALUE_DELIM + entry.getValue());
|
306 |
|
|
+ }
|
307 |
|
|
+
|
308 |
|
|
+ String newSchemaStr = StringUtils.join(schemas, AvroStorage.SCHEMA_DELIM);
|
309 |
|
|
+ property.setProperty(AVRO_OUTPUT_SCHEMA_PROPERTY, newSchemaStr);
|
310 |
|
|
+ }
|
311 |
|
|
+
|
312 |
|
|
AvroStorageLog.details("output location=" + location);
|
313 |
|
|
FileOutputFormat.setOutputPath(job, new Path(location));
|
314 |
|
|
}
|
315 |
|
|
@@ -621,6 +670,7 @@ public class AvroStorage extends FileInp
|
316 |
|
|
AvroStorageLog.funcCall("Check schema");
|
317 |
|
|
UDFContext context = UDFContext.getUDFContext();
|
318 |
|
|
Properties property = context.getUDFProperties(ResourceSchema.class);
|
319 |
|
|
+
|
320 |
|
|
String prevSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
|
321 |
|
|
AvroStorageLog.details("Previously defined schemas=" + prevSchemaStr);
|
322 |
|
|
|
323 |
|
|
@@ -683,6 +733,7 @@ public class AvroStorage extends FileInp
|
324 |
|
|
UDFContext context = UDFContext.getUDFContext();
|
325 |
|
|
Properties property = context.getUDFProperties(ResourceSchema.class);
|
326 |
|
|
String allSchemaStr = property.getProperty(AVRO_OUTPUT_SCHEMA_PROPERTY);
|
327 |
|
|
+
|
328 |
|
|
Map<String, String> map = (allSchemaStr != null) ? parseSchemaMap(allSchemaStr) : null;
|
329 |
|
|
|
330 |
|
|
String key = getSchemaKey();
|
331 |
|
|
@@ -711,7 +762,7 @@ public class AvroStorage extends FileInp
|
332 |
|
|
StoreFunc.cleanupOnFailureImpl(location, job);
|
333 |
|
|
}
|
334 |
|
|
|
335 |
|
|
- @Override
|
336 |
|
|
+ //@Override
|
337 |
|
|
public void cleanupOnSuccess(String location, Job job) throws IOException {
|
338 |
|
|
// Nothing to do
|
339 |
|
|
}
|
340 |
|
|
@@ -719,7 +770,7 @@ public class AvroStorage extends FileInp
|
341 |
|
|
@Override
|
342 |
|
|
public void putNext(Tuple t) throws IOException {
|
343 |
|
|
try {
|
344 |
|
|
- this.writer.write(NullWritable.get(), t.getAll().size() == 1 ? t.get(0) : t);
|
345 |
|
|
+ this.writer.write(NullWritable.get(), t);
|
346 |
|
|
} catch (InterruptedException e) {
|
347 |
|
|
e.printStackTrace();
|
348 |
|
|
}
|
349 |
|
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro
|
350 |
|
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 1970-01-01 01:00:00.000000000 +0100
|
351 |
|
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/expected_testRecordSplit3.avro 2013-06-11 01:28:15.576901251 +0200
|
352 |
|
|
@@ -0,0 +1 @@
|
353 |
|
|
+Objavro.schema?{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]}
|
354 |
|
|
\ No newline at end of file
|
355 |
|
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro
|
356 |
|
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 1970-01-01 01:00:00.000000000 +0100
|
357 |
|
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple.avro 2013-06-11 01:16:41.648882666 +0200
|
358 |
|
|
@@ -0,0 +1 @@
|
359 |
|
|
+Objavro.schema?{"type":"record","name":"simple","fields":[{"name":"member_id","type":"int"},{"name":"count","type":"long"}]}
|
360 |
|
|
\ No newline at end of file
|
361 |
|
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc
|
362 |
|
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 1970-01-01 01:00:00.000000000 +0100
|
363 |
|
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/avro_test_files/test_simple_record.avsc 2013-06-11 01:16:56.912883076 +0200
|
364 |
|
|
@@ -0,0 +1,8 @@
|
365 |
|
|
+{
|
366 |
|
|
+ "type": "record",
|
367 |
|
|
+ "name": "simple",
|
368 |
|
|
+ "fields": [
|
369 |
|
|
+ {"name": "member_id", "type": "int"},
|
370 |
|
|
+ {"name": "count", "type": "long"}
|
371 |
|
|
+ ]
|
372 |
|
|
+}
|
373 |
|
|
\ No newline at end of file
|
374 |
|
|
diff -rupNa '--exclude=.svn' '--exclude=target' 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java
|
375 |
|
|
--- 3rdparty-original/piggybank/java/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:48:03.512933062 +0200
|
376 |
|
|
+++ trunk/src/test/java/org/apache/pig/piggybank/test/storage/avro/TestAvroStorage.java 2013-06-11 01:23:38.920893841 +0200
|
377 |
|
|
@@ -35,7 +35,6 @@ import org.apache.pig.backend.hadoop.exe
|
378 |
|
|
import org.apache.pig.impl.logicalLayer.FrontendException;
|
379 |
|
|
import org.apache.pig.piggybank.storage.avro.AvroStorage;
|
380 |
|
|
import org.apache.pig.piggybank.storage.avro.PigSchema2Avro;
|
381 |
|
|
-import org.apache.pig.test.Util;
|
382 |
|
|
import org.junit.AfterClass;
|
383 |
|
|
import org.junit.Assert;
|
384 |
|
|
import org.junit.BeforeClass;
|
385 |
|
|
@@ -81,7 +80,9 @@ public class TestAvroStorage {
|
386 |
|
|
final private String testNoMatchedFiles = getInputFile("test_dir{1,2}/file_that_does_not_exist*.avro");
|
387 |
|
|
final private String testArrayFile = getInputFile("test_array.avro");
|
388 |
|
|
final private String testRecordFile = getInputFile("test_record.avro");
|
389 |
|
|
+ final private String testSimpleFile = getInputFile("test_simple.avro");
|
390 |
|
|
final private String testRecordSchema = getInputFile("test_record.avsc");
|
391 |
|
|
+ final private String testSimpleRecordSchema = getInputFile("test_simple_record.avsc");
|
392 |
|
|
final private String testGenericUnionFile = getInputFile("test_generic_union.avro");
|
393 |
|
|
final private String testRecursiveRecordInMap = getInputFile("test_recursive_record_in_map.avro");
|
394 |
|
|
final private String testRecursiveRecordInArray = getInputFile("test_recursive_record_in_array.avro");
|
395 |
|
|
@@ -973,6 +974,76 @@ public class TestAvroStorage {
|
396 |
|
|
verifyResults(output, expected);
|
397 |
|
|
}
|
398 |
|
|
|
399 |
|
|
+ @Test
|
400 |
|
|
+ public void testOutputSchemaFile() throws IOException {
|
401 |
|
|
+ String output1= outbasedir + "testOutputSchemaFile";
|
402 |
|
|
+ String expected1 = basedir + "expected_testRecordSplit3.avro";
|
403 |
|
|
+ deleteDirectory(new File(output1));
|
404 |
|
|
+ String [] queries = {
|
405 |
|
|
+ " avro = LOAD '" + testRecordFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ();",
|
406 |
|
|
+ " groups = GROUP avro BY member_id;",
|
407 |
|
|
+ " sc = FOREACH groups GENERATE group AS key, COUNT(avro) AS cnt;",
|
408 |
|
|
+ " STORE sc INTO '" + output1 +
|
409 |
|
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
410 |
|
|
+ " 'no_schema_check'," +
|
411 |
|
|
+ " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
|
412 |
|
|
+ };
|
413 |
|
|
+ testAvroStorage( queries);
|
414 |
|
|
+ verifyResults(output1, expected1);
|
415 |
|
|
+ }
|
416 |
|
|
+
|
417 |
|
|
+ @Test
|
418 |
|
|
+ public void testInputSchemaFile() throws IOException {
|
419 |
|
|
+ String output1= outbasedir + "testInputSchemaFile";
|
420 |
|
|
+ String expected1 = basedir + "expected_testRecordSplit3.avro";
|
421 |
|
|
+ deleteDirectory(new File(output1));
|
422 |
|
|
+ String [] queries = {
|
423 |
|
|
+ " avro = LOAD '" + testSimpleFile + " ' USING org.apache.pig.piggybank.storage.avro.AvroStorage ('input_schema_file', '" + testSimpleRecordSchema + "');",
|
424 |
|
|
+ " STORE avro INTO '" + output1 +
|
425 |
|
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
426 |
|
|
+ " 'no_schema_check'," +
|
427 |
|
|
+ " 'output_schema_file', '" + testSimpleRecordSchema + "' );"
|
428 |
|
|
+ };
|
429 |
|
|
+ testAvroStorage( queries);
|
430 |
|
|
+ verifyResults(output1, expected1);
|
431 |
|
|
+ }
|
432 |
|
|
+
|
433 |
|
|
+ @Test
|
434 |
|
|
+ public void testInputSchemaClass() throws IOException {
|
435 |
|
|
+ String output= outbasedir + "testInputSchemaClass";
|
436 |
|
|
+ String test = getInputFile("expected_testRecursiveRecordReference1.avro");
|
437 |
|
|
+ String expected = basedir + "expected_testRecursiveRecordReference1.avro";
|
438 |
|
|
+ deleteDirectory(new File(output));
|
439 |
|
|
+ String [] queries = {
|
440 |
|
|
+ " in = LOAD '" + test +
|
441 |
|
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
|
442 |
|
|
+ " 'input_schema_class', 'org.apache.avro.Schema.Type.INT' );",
|
443 |
|
|
+ " STORE in INTO '" + output +
|
444 |
|
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
445 |
|
|
+ " 'schema', '\"int\"' );"
|
446 |
|
|
+ };
|
447 |
|
|
+ testAvroStorage(queries);
|
448 |
|
|
+ verifyResults(output, expected);
|
449 |
|
|
+ }
|
450 |
|
|
+
|
451 |
|
|
+ @Test
|
452 |
|
|
+ public void testOutputSchemaClass() throws IOException {
|
453 |
|
|
+ String output= outbasedir + "testOutputSchemaClass";
|
454 |
|
|
+ String test = getInputFile("expected_testRecursiveRecordReference1.avro");
|
455 |
|
|
+ String expected = basedir + "expected_testRecursiveRecordReference1.avro";
|
456 |
|
|
+ deleteDirectory(new File(output));
|
457 |
|
|
+ String [] queries = {
|
458 |
|
|
+ " in = LOAD '" + test +
|
459 |
|
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage ("+
|
460 |
|
|
+ " 'schema', '\"int\"' );",
|
461 |
|
|
+ " STORE in INTO '" + output +
|
462 |
|
|
+ "' USING org.apache.pig.piggybank.storage.avro.AvroStorage (" +
|
463 |
|
|
+ " 'output_schema_class', 'org.apache.avro.Schema.Type.INT' );"
|
464 |
|
|
+ };
|
465 |
|
|
+ testAvroStorage(queries);
|
466 |
|
|
+ verifyResults(output, expected);
|
467 |
|
|
+ }
|
468 |
|
|
+
|
469 |
|
|
private static void deleteDirectory (File path) {
|
470 |
|
|
if ( path.exists()) {
|
471 |
|
|
File [] files = path.listFiles();
|
472 |
|
|
@@ -1014,8 +1085,8 @@ public class TestAvroStorage {
|
473 |
|
|
|
474 |
|
|
private void verifyResults(String outPath, String expectedOutpath, String expectedCodec) throws IOException {
|
475 |
|
|
// Seems compress for Avro is broken in 23. Skip this test and open Jira PIG-
|
476 |
|
|
- if (Util.isHadoop23())
|
477 |
|
|
- return;
|
478 |
|
|
+// if (Util.isHadoop23())
|
479 |
|
|
+// return;
|
480 |
|
|
|
481 |
|
|
FileSystem fs = FileSystem.getLocal(new Configuration()) ;
|