Revision 47545
Added by Marek Horst almost 7 years ago
modules/icm-iis-3rdparty-avro-json/trunk/pom.xml.disabled | ||
---|---|---|
1 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|
2 |
<!-- |
|
3 |
Licensed to the Apache Software Foundation (ASF) under one |
|
4 |
or more contributor license agreements. See the NOTICE file |
|
5 |
distributed with this work for additional information |
|
6 |
regarding copyright ownership. The ASF licenses this file |
|
7 |
to you under the Apache License, Version 2.0 (the |
|
8 |
"License"); you may not use this file except in compliance |
|
9 |
with the License. You may obtain a copy of the License at |
|
10 |
|
|
11 |
http://www.apache.org/licenses/LICENSE-2.0 |
|
12 |
|
|
13 |
Unless required by applicable law or agreed to in writing, software |
|
14 |
distributed under the License is distributed on an "AS IS" BASIS, |
|
15 |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
16 |
See the License for the specific language governing permissions and |
|
17 |
limitations under the License. |
|
18 |
--> |
|
19 |
<parent> |
|
20 |
<groupId>eu.dnetlib</groupId> |
|
21 |
<artifactId>dnet-hadoop-parent</artifactId> |
|
22 |
<version>1.0.0</version> |
|
23 |
</parent> |
|
24 |
|
|
25 |
<modelVersion>4.0.0</modelVersion> |
|
26 |
|
|
27 |
<artifactId>icm-iis-3rdparty-avro-json</artifactId> |
|
28 |
<version>1.1-SNAPSHOT</version> |
|
29 |
<packaging>jar</packaging> |
|
30 |
|
|
31 |
<scm> |
|
32 |
<developerConnection> |
|
33 |
scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/icm-iis-3rdparty-avro-json/trunk |
|
34 |
</developerConnection> |
|
35 |
</scm> |
|
36 |
|
|
37 |
<name>Avro/JSON InputFormat/OutputFormats and SerDes</name> |
|
38 |
<url>http://www.cloudera.com</url> |
|
39 |
|
|
40 |
<properties> |
|
41 |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
|
42 |
<eclipse.output.directory>eclipse-classes</eclipse.output.directory> |
|
43 |
</properties> |
|
44 |
|
|
45 |
<build> |
|
46 |
<plugins> |
|
47 |
<plugin> |
|
48 |
<groupId>org.apache.maven.plugins</groupId> |
|
49 |
<artifactId>maven-eclipse-plugin</artifactId> |
|
50 |
<version>2.9</version> |
|
51 |
<configuration> |
|
52 |
<buildOutputDirectory>eclipse-classes</buildOutputDirectory> |
|
53 |
<downloadSources>true</downloadSources> |
|
54 |
<downloadJavadocs>false</downloadJavadocs> |
|
55 |
</configuration> |
|
56 |
</plugin> |
|
57 |
|
|
58 |
</plugins> |
|
59 |
</build> |
|
60 |
|
|
61 |
<dependencies> |
|
62 |
<dependency> |
|
63 |
<groupId>eu.dnetlib</groupId> |
|
64 |
<artifactId>icm-iis-core</artifactId> |
|
65 |
<version>[1.0.0,2.0.0)</version> |
|
66 |
</dependency> |
|
67 |
<dependency> |
|
68 |
<groupId>junit</groupId> |
|
69 |
<artifactId>junit</artifactId> |
|
70 |
<version>4.8.2</version> |
|
71 |
<scope>test</scope> |
|
72 |
</dependency> |
|
73 |
<dependency> |
|
74 |
<groupId>org.apache.avro</groupId> |
|
75 |
<artifactId>avro</artifactId> |
|
76 |
<version>${iis.avro.version}</version> |
|
77 |
</dependency> |
|
78 |
|
|
79 |
<dependency> |
|
80 |
<groupId>org.apache.avro</groupId> |
|
81 |
<artifactId>avro-mapred</artifactId> |
|
82 |
<version>${iis.avro.version}</version> |
|
83 |
<classifier>hadoop2</classifier> |
|
84 |
</dependency> |
|
85 |
|
|
86 |
<!-- Hadoop Dependencies --> |
|
87 |
<dependency> |
|
88 |
<groupId>org.apache.hive</groupId> |
|
89 |
<artifactId>hive-serde</artifactId> |
|
90 |
<version>${iis.hive.version}</version> |
|
91 |
<scope>provided</scope> |
|
92 |
</dependency> |
|
93 |
<dependency> |
|
94 |
<groupId>org.apache.hadoop</groupId> |
|
95 |
<artifactId>hadoop-client</artifactId> |
|
96 |
<version>${iis.hadoop.version}</version> |
|
97 |
<scope>provided</scope> |
|
98 |
</dependency> |
|
99 |
</dependencies> |
|
100 |
|
|
101 |
<repositories> |
|
102 |
<repository> |
|
103 |
<id>cloudera</id> |
|
104 |
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url> |
|
105 |
<releases> |
|
106 |
<enabled>true</enabled> |
|
107 |
</releases> |
|
108 |
<snapshots> |
|
109 |
<enabled>false</enabled> |
|
110 |
</snapshots> |
|
111 |
</repository> |
|
112 |
</repositories> |
|
113 |
</project> |
modules/icm-iis-3rdparty-avro-json/trunk/src/test/java/com/cloudera/science/avro/common/JsonConverterTest.java | ||
---|---|---|
1 |
/** |
|
2 |
* Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. |
|
3 |
* |
|
4 |
* Cloudera, Inc. licenses this file to you under the Apache License, |
|
5 |
* Version 2.0 (the "License"). You may not use this file except in |
|
6 |
* compliance with the License. You may obtain a copy of the License at |
|
7 |
* |
|
8 |
* http://www.apache.org/licenses/LICENSE-2.0 |
|
9 |
* |
|
10 |
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
|
11 |
* CONDITIONS OF ANY KIND, either express or implied. See the License for |
|
12 |
* the specific language governing permissions and limitations under the |
|
13 |
* License. |
|
14 |
*/ |
|
15 |
package com.cloudera.science.avro.common; |
|
16 |
|
|
17 |
import static org.junit.Assert.assertEquals; |
|
18 |
|
|
19 |
import java.io.IOException; |
|
20 |
import java.util.Arrays; |
|
21 |
|
|
22 |
import org.apache.avro.Schema; |
|
23 |
import org.apache.avro.Schema.Type; |
|
24 |
import org.apache.avro.generic.GenericRecord; |
|
25 |
import org.junit.Test; |
|
26 |
|
|
27 |
import com.google.common.collect.ImmutableList; |
|
28 |
|
|
29 |
public class JsonConverterTest { |
|
30 |
|
|
31 |
private static Schema.Field sf(String name, Schema schema) { |
|
32 |
return new Schema.Field(name, schema, "", null); |
|
33 |
} |
|
34 |
|
|
35 |
private static Schema.Field sf(String name, Schema.Type type) { |
|
36 |
return sf(name, sc(type)); |
|
37 |
} |
|
38 |
|
|
39 |
private static Schema sc(Schema.Type type) { |
|
40 |
return Schema.create(type); |
|
41 |
} |
|
42 |
|
|
43 |
private static Schema sr(Schema.Field... fields) { |
|
44 |
return Schema.createRecord(Arrays.asList(fields)); |
|
45 |
} |
|
46 |
|
|
47 |
Schema.Field f1 = sf("field1", Type.LONG); |
|
48 |
Schema.Field f2 = sf("field2", Schema.createArray(sc(Type.BOOLEAN))); |
|
49 |
Schema.Field f3Map = sf("field3", Schema.createMap(sc(Type.STRING))); |
|
50 |
Schema.Field f3Rec = sf("field3", sr(sf("key", Type.STRING))); |
|
51 |
|
|
52 |
@Test |
|
53 |
public void testBasicWithMap() throws Exception { |
|
54 |
JsonConverter jc = new JsonConverter(sr(f1, f2, f3Map)); |
|
55 |
String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}"; |
|
56 |
GenericRecord r = jc.convert(json); |
|
57 |
assertEquals(json, r.toString()); |
|
58 |
} |
|
59 |
|
|
60 |
@Test |
|
61 |
public void testBasicWithRecord() throws Exception { |
|
62 |
JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec)); |
|
63 |
String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}"; |
|
64 |
GenericRecord r = jc.convert(json); |
|
65 |
assertEquals(json, r.toString()); |
|
66 |
} |
|
67 |
|
|
68 |
@Test(expected=IllegalArgumentException.class) |
|
69 |
public void testMissingRequiredField() throws Exception { |
|
70 |
JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec)); |
|
71 |
String json = "{\"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}"; |
|
72 |
jc.convert(json); |
|
73 |
} |
|
74 |
|
|
75 |
@Test |
|
76 |
public void testMissingNullableField() throws Exception { |
|
77 |
Schema optional = Schema.createUnion( |
|
78 |
ImmutableList.of(Schema.create(Type.NULL), Schema.create(Type.DOUBLE))); |
|
79 |
Schema.Field f4 = sf("field4", optional); |
|
80 |
JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec, f4)); |
|
81 |
String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}"; |
|
82 |
GenericRecord r = jc.convert(json); |
|
83 |
String expect = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}, \"field4\": null}"; |
|
84 |
assertEquals(expect, r.toString()); |
|
85 |
} |
|
86 |
|
|
87 |
@Test |
|
88 |
public void testMissingNullableArrayField() throws Exception { |
|
89 |
Schema.Parser parser = new Schema.Parser(); |
|
90 |
Schema schemaOptionalArray = parser.parse("[\"null\", {\"type\": \"array\", \"items\": \"int\"}]"); |
|
91 |
Schema.Field f4 = sf("field4", schemaOptionalArray); |
|
92 |
JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec, f4)); |
|
93 |
String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}"; |
|
94 |
GenericRecord r = jc.convert(json); |
|
95 |
String expect = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}, \"field4\": null}"; |
|
96 |
assertEquals(expect, r.toString()); |
|
97 |
} |
|
98 |
} |
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/serde/AvroAsJSONSerde.java | ||
---|---|---|
1 |
/** |
|
2 |
* Copyright (c) 2012, Cloudera, Inc. All Rights Reserved. |
|
3 |
* |
|
4 |
* Cloudera, Inc. licenses this file to you under the Apache License, |
|
5 |
* Version 2.0 (the "License"). You may not use this file except in |
|
6 |
* compliance with the License. You may obtain a copy of the License at |
|
7 |
* |
|
8 |
* http://www.apache.org/licenses/LICENSE-2.0 |
|
9 |
* |
|
10 |
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
|
11 |
* CONDITIONS OF ANY KIND, either express or implied. See the License for |
|
12 |
* the specific language governing permissions and limitations under the |
|
13 |
* License. |
|
14 |
*/ |
|
15 |
package com.cloudera.science.avro.serde; |
|
16 |
|
|
17 |
import java.io.IOException; |
|
18 |
import java.util.List; |
|
19 |
import java.util.Properties; |
|
20 |
|
|
21 |
import org.apache.avro.Schema; |
|
22 |
import org.apache.hadoop.conf.Configuration; |
|
23 |
import org.apache.hadoop.hive.serde.Constants; |
|
24 |
import org.apache.hadoop.hive.serde2.SerDe; |
|
25 |
import org.apache.hadoop.hive.serde2.SerDeException; |
|
26 |
import org.apache.hadoop.hive.serde2.SerDeStats; |
|
27 |
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable; |
|
28 |
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; |
|
29 |
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; |
|
30 |
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; |
|
31 |
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; |
|
32 |
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; |
|
33 |
import org.apache.hadoop.io.Text; |
|
34 |
import org.apache.hadoop.io.Writable; |
|
35 |
|
|
36 |
import com.cloudera.science.avro.common.JsonConverter; |
|
37 |
import com.cloudera.science.avro.common.SchemaLoader; |
|
38 |
import com.google.common.collect.ImmutableList; |
|
39 |
import com.google.common.collect.Lists; |
|
40 |
|
|
41 |
public class AvroAsJSONSerde implements SerDe { |
|
42 |
// Follow convention of AvroSerDe |
|
43 |
public static final String SCHEMA_LITERAL = "avro.schema.literal"; |
|
44 |
public static final String SCHEMA_URL = "avro.schema.url"; |
|
45 |
public static final String SCHEMA_TYPE_NAME = "avro.schema.class"; |
|
46 |
|
|
47 |
private Schema schema; |
|
48 |
private JsonConverter converter; |
|
49 |
private AvroGenericRecordWritable agrw = new AvroGenericRecordWritable(); |
|
50 |
private List<Object> row = Lists.newArrayList(); |
|
51 |
private ObjectInspector oi; |
|
52 |
|
|
53 |
@Override |
|
54 |
public void initialize(Configuration conf, Properties tbl) throws SerDeException { |
|
55 |
SchemaLoader loader = new SchemaLoader(conf); |
|
56 |
try { |
|
57 |
this.schema = loader.load(tbl.getProperty(SCHEMA_LITERAL), |
|
58 |
tbl.getProperty(SCHEMA_URL), tbl.getProperty(SCHEMA_TYPE_NAME)); |
|
59 |
} catch (IOException e) { |
|
60 |
throw new SerDeException(e); |
|
61 |
} |
|
62 |
this.converter = new JsonConverter(schema); |
|
63 |
row.add(""); |
|
64 |
|
|
65 |
String colName = tbl.getProperty(Constants.LIST_COLUMNS); |
|
66 |
if (colName == null || colName.isEmpty()) { |
|
67 |
colName = "json"; // use a default |
|
68 |
} |
|
69 |
|
|
70 |
this.oi = ObjectInspectorFactory.getStandardStructObjectInspector( |
|
71 |
ImmutableList.of(colName), |
|
72 |
ImmutableList.<ObjectInspector>of(PrimitiveObjectInspectorFactory.javaStringObjectInspector)); |
|
73 |
} |
|
74 |
|
|
75 |
@Override |
|
76 |
public Object deserialize(Writable blob) throws SerDeException { |
|
77 |
row.set(0, ((AvroGenericRecordWritable) blob).getRecord().toString()); |
|
78 |
return row; |
|
79 |
} |
|
80 |
|
|
81 |
@Override |
|
82 |
public ObjectInspector getObjectInspector() throws SerDeException { |
|
83 |
return oi; |
|
84 |
} |
|
85 |
|
|
86 |
@Override |
|
87 |
public SerDeStats getSerDeStats() { |
|
88 |
return null; |
|
89 |
} |
|
90 |
|
|
91 |
@Override |
|
92 |
public Class<? extends Writable> getSerializedClass() { |
|
93 |
return Text.class; |
|
94 |
} |
|
95 |
|
|
96 |
@Override |
|
97 |
public Writable serialize(Object obj, ObjectInspector oi) throws SerDeException { |
|
98 |
StructObjectInspector soi = (StructObjectInspector) oi; |
|
99 |
List<Object> data = soi.getStructFieldsDataAsList(soi); |
|
100 |
StringObjectInspector foi = (StringObjectInspector) |
|
101 |
soi.getAllStructFieldRefs().get(0).getFieldObjectInspector(); |
|
102 |
try { |
|
103 |
agrw.setRecord(converter.convert(foi.getPrimitiveJavaObject(data.get(0)))); |
|
104 |
} catch (IOException e) { |
|
105 |
throw new SerDeException(e); |
|
106 |
} |
|
107 |
return agrw; |
|
108 |
} |
|
109 |
} |
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/common/JsonConverter.java | ||
---|---|---|
1 |
/**
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
 *
 * Cloudera, Inc. licenses this file to you under the Apache License,
 * Version 2.0 (the "License"). You may not use this file except in
 * compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
 * the specific language governing permissions and limitations under the
 * License.
 */
package com.cloudera.science.avro.common;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;

import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;

/**
 * Converts JSON text into Avro {@link GenericRecord}s for a fixed record
 * schema. The schema is validated up front ({@link #checkSchema}); conversion
 * coerces JSON values to the schema's types, fills in field defaults, sets
 * absent nullable fields to null, and logs (rather than fails on) JSON fields
 * that have no counterpart in the schema.
 *
 * NOTE(review): not thread-safe — the shared ObjectMapper is, but
 * {@code logMessageCounter} is mutated without synchronization.
 */
public class JsonConverter {
  private static final Log LOG = LogFactory.getLog(JsonConverter.class);
  // The only Avro types this converter knows how to populate from JSON.
  // Notably absent: BYTES, FIXED, ENUM, NULL and general UNION (only the
  // two-branch nullable union is handled, see isNullableSchema).
  private static final Set<Schema.Type> SUPPORTED_TYPES = ImmutableSet.of(
      Schema.Type.RECORD, Schema.Type.ARRAY, Schema.Type.MAP,
      Schema.Type.INT, Schema.Type.LONG, Schema.Type.BOOLEAN,
      Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING);

  private final ObjectMapper mapper = new ObjectMapper();
  private final Schema baseSchema;
  // Counts records that carried unused JSON fields; used to rate-limit the
  // warning below to every 1000th occurrence.
  private int logMessageCounter = 0;

  /**
   * @param schema the target record schema; validated eagerly so unsupported
   *     types fail at construction rather than on the first record
   * @throws IllegalArgumentException if the schema uses an unsupported type
   */
  public JsonConverter(Schema schema) {
    this.baseSchema = checkSchema(schema, true);
  }

  /**
   * Recursively verifies that every type reachable from {@code schema} is in
   * SUPPORTED_TYPES (nullable unions are unwrapped first). Returns the schema
   * unchanged so it can be used inline by the constructor.
   *
   * @param mustBeRecord when true, {@code schema} itself is assumed to be a
   *     record and only its fields are checked; when false, non-record types
   *     are accepted and returned directly
   */
  private Schema checkSchema(Schema schema, boolean mustBeRecord) {
    if (!mustBeRecord) {
      if (!SUPPORTED_TYPES.contains(schema.getType())) {
        throw new IllegalArgumentException("Unsupported type: " + schema.getType());
      }
      if (schema.getType() != Schema.Type.RECORD) {
        return schema;
      }
    }
    for (Schema.Field f : schema.getFields()) {
      Schema fs = f.schema();
      if (isNullableSchema(fs)) {
        // Validate the non-null branch of a ["null", X] union.
        fs = getNonNull(fs);
      }
      Schema.Type st = fs.getType();
      if (!SUPPORTED_TYPES.contains(st)) {
        throw new IllegalArgumentException(String.format(
            "Unsupported type '%s' for field '%s'", st.toString(), f.name()));
      }
      switch (st) {
      case RECORD:
        checkSchema(fs, true);
        break;
      case MAP:
        checkSchema(fs.getValueType(), false);
        break;
      case ARRAY:
        checkSchema(fs.getElementType(), false);
        // NOTE(review): no 'break' here — control falls through to 'default',
        // which is a no-op, so behavior is unaffected; presumably intentional
        // but worth confirming against the upstream Cloudera source.
      default:
        break; // No need to check primitives
      }
    }
    return schema;
  }

  /**
   * Parses {@code json} (which must be a JSON object) and converts it into a
   * GenericRecord of the base schema.
   *
   * @throws IOException if the JSON cannot be parsed
   * @throws IllegalArgumentException if a required field is missing or a value
   *     cannot be coerced (see JsonConversionException)
   */
  public GenericRecord convert(String json) throws IOException {
    return convert(mapper.readValue(json, Map.class), baseSchema);
  }

  /**
   * Converts one parsed JSON object into a record of {@code schema}.
   * For each schema field: present in the JSON -> type-convert it; absent with
   * a default -> materialize the default; absent and nullable -> null; absent,
   * no default, not nullable -> IllegalArgumentException.
   */
  private GenericRecord convert(Map<String, Object> raw, Schema schema)
      throws IOException {
    GenericRecord result = new GenericData.Record(schema);
    Set<String> usedFields = Sets.newHashSet();
    for (Schema.Field f : schema.getFields()) {
      String name = f.name();
      if (raw.containsKey(name)) {
        result.put(f.pos(), typeConvert(raw.get(name), name, f.schema()));
        usedFields.add(name);
      } else {
        JsonNode defaultValue = f.defaultValue();
        if (defaultValue == null) {
          if (isNullableSchema(f.schema())) {
            result.put(f.pos(), null);
          } else {
            throw new IllegalArgumentException(
                "No default value provided for non-nullable field: " + f.name());
          }
        } else {
          Schema fieldSchema = f.schema();
          if (isNullableSchema(fieldSchema)) {
            fieldSchema = getNonNull(fieldSchema);
          }
          // Materialize the schema-declared default according to the field's
          // (non-null) type. Complex defaults are stored as JSON text and
          // re-parsed here.
          Object value = null;
          switch (fieldSchema.getType()) {
          case BOOLEAN:
            value = defaultValue.getValueAsBoolean();
            break;
          case DOUBLE:
            value = defaultValue.getValueAsDouble();
            break;
          case FLOAT:
            value = (float) defaultValue.getValueAsDouble();
            break;
          case INT:
            value = defaultValue.getValueAsInt();
            break;
          case LONG:
            value = defaultValue.getValueAsLong();
            break;
          case STRING:
            value = defaultValue.getValueAsText();
            break;
          case MAP:
            Map<String, Object> fieldMap = mapper.readValue(
                defaultValue.getValueAsText(), Map.class);
            Map<String, Object> mvalue = Maps.newHashMap();
            for (Map.Entry<String, Object> e : fieldMap.entrySet()) {
              mvalue.put(e.getKey(),
                  typeConvert(e.getValue(), name, fieldSchema.getValueType()));
            }
            value = mvalue;
            break;
          case ARRAY:
            List fieldArray = mapper.readValue(
                defaultValue.getValueAsText(), List.class);
            List lvalue = Lists.newArrayList();
            for (Object elem : fieldArray) {
              lvalue.add(typeConvert(elem, name, fieldSchema.getElementType()));
            }
            value = lvalue;
            break;
          case RECORD:
            Map<String, Object> fieldRec = mapper.readValue(
                defaultValue.getValueAsText(), Map.class);
            value = convert(fieldRec, fieldSchema);
            break;
          default:
            throw new IllegalArgumentException(
                "JsonConverter cannot handle type: " + fieldSchema.getType());
          }
          result.put(f.pos(), value);
        }
      }
    }

    if (usedFields.size() < raw.size()) {
      // Log a notification about unused fields
      if (logMessageCounter % 1000 == 0) {
        LOG.warn("Ignoring unused JSON fields: " + Sets.difference(raw.keySet(), usedFields));
      }
      logMessageCounter++;
    }

    return result;
  }

  /**
   * Coerces a single JSON-decoded value to {@code schema}'s type, recursing
   * into records, arrays and maps. Numeric and boolean targets also accept
   * their String representations. Falls off the end of the switch (and throws
   * JsonConversionException) when the runtime value does not match any of the
   * accepted shapes for the target type.
   */
  private Object typeConvert(Object value, String name, Schema schema) throws IOException {
    if (isNullableSchema(schema)) {
      if (value == null) {
        return null;
      } else {
        schema = getNonNull(schema);
      }
    } else if (value == null) {
      // Always fail on null for non-nullable schemas
      throw new JsonConversionException(value, name, schema);
    }

    switch (schema.getType()) {
    case BOOLEAN:
      if (value instanceof Boolean) {
        return (Boolean) value;
      } else if (value instanceof String) {
        return Boolean.valueOf((String) value);
      } else if (value instanceof Number) {
        // C-style truthiness: any non-zero number is true.
        return ((Number) value).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
      }
      break;
    case DOUBLE:
      if (value instanceof Number) {
        return ((Number) value).doubleValue();
      } else if (value instanceof String) {
        return Double.valueOf((String) value);
      }
      break;
    case FLOAT:
      if (value instanceof Number) {
        return ((Number) value).floatValue();
      } else if (value instanceof String) {
        return Float.valueOf((String) value);
      }
      break;
    case INT:
      if (value instanceof Number) {
        return ((Number) value).intValue();
      } else if (value instanceof String) {
        return Integer.valueOf((String) value);
      }
      break;
    case LONG:
      if (value instanceof Number) {
        return ((Number) value).longValue();
      } else if (value instanceof String) {
        return Long.valueOf((String) value);
      }
      break;
    case STRING:
      // Any JSON value stringifies; numbers/booleans become their text form.
      return value.toString();
    case RECORD:
      return convert((Map<String, Object>) value, schema);
    case ARRAY:
      Schema elementSchema = schema.getElementType();
      List listRes = new ArrayList();
      for (Object v : (List) value) {
        listRes.add(typeConvert(v, name, elementSchema));
      }
      return listRes;
    case MAP:
      Schema valueSchema = schema.getValueType();
      Map<String, Object> mapRes = new HashMap<String, Object>();
      for (Map.Entry<String, Object> v : ((Map<String, Object>) value).entrySet()) {
        mapRes.put(v.getKey(), typeConvert(v.getValue(), name, valueSchema));
      }
      return mapRes;
    default:
      throw new IllegalArgumentException(
          "JsonConverter cannot handle type: " + schema.getType());
    }
    // Reached when the value's runtime class did not match the target type.
    throw new JsonConversionException(value, name, schema);
  }

  /**
   * True only for the exact two-branch union shape ["null", X] / [X, "null"];
   * wider unions are not treated as nullable.
   */
  private boolean isNullableSchema(Schema schema) {
    return schema.getType().equals(Schema.Type.UNION) &&
        schema.getTypes().size() == 2 &&
        (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) ||
            schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
  }

  /** Returns the non-null branch of a nullable union (see isNullableSchema). */
  private Schema getNonNull(Schema schema) {
    List<Schema> types = schema.getTypes();
    return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
  }

  /**
   * Thrown when a JSON value cannot be coerced to its field's Avro type.
   * NOTE(review): only toString() is overridden — getMessage() still returns
   * null, so log frameworks that print only the message show nothing useful.
   */
  private static class JsonConversionException extends RuntimeException {

    private Object value;
    private String fieldName;
    private Schema schema;

    public JsonConversionException(Object value, String fieldName, Schema schema) {
      this.value = value;
      this.fieldName = fieldName;
      this.schema = schema;
    }

    @Override
    public String toString() {
      return String.format("Type conversion error for field %s, %s for %s",
          fieldName, value, schema);
    }
  }
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/common/SchemaLoader.java | ||
---|---|---|
1 |
/** |
|
2 |
* Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. |
|
3 |
* |
|
4 |
* Cloudera, Inc. licenses this file to you under the Apache License, |
|
5 |
* Version 2.0 (the "License"). You may not use this file except in |
|
6 |
* compliance with the License. You may obtain a copy of the License at |
|
7 |
* |
|
8 |
* http://www.apache.org/licenses/LICENSE-2.0 |
|
9 |
* |
|
10 |
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
|
11 |
* CONDITIONS OF ANY KIND, either express or implied. See the License for |
|
12 |
* the specific language governing permissions and limitations under the |
|
13 |
* License. |
|
14 |
*/ |
|
15 |
package com.cloudera.science.avro.common; |
|
16 |
|
|
17 |
import java.io.IOException; |
|
18 |
import java.io.InputStream; |
|
19 |
import java.net.URL; |
|
20 |
|
|
21 |
import org.apache.avro.Schema; |
|
22 |
import org.apache.hadoop.conf.Configuration; |
|
23 |
import org.apache.hadoop.fs.FSDataInputStream; |
|
24 |
import org.apache.hadoop.fs.FileSystem; |
|
25 |
import org.apache.hadoop.fs.Path; |
|
26 |
|
|
27 |
|
|
28 |
import eu.dnetlib.iis.core.common.AvroUtils; |
|
29 |
/** |
|
30 |
* Some changes introduced in the original code by Mateusz Kobos |
|
31 |
*/ |
|
32 |
public class SchemaLoader { |
|
33 |
|
|
34 |
private final Configuration conf; |
|
35 |
private final Schema.Parser parser = new Schema.Parser(); |
|
36 |
|
|
37 |
public SchemaLoader(Configuration conf) { |
|
38 |
this.conf = conf; |
|
39 |
} |
|
40 |
|
|
41 |
public Schema load(String schemaJson, String schemaUrl, String typeName) throws IOException { |
|
42 |
if (schemaJson != null && !"none".equals(schemaJson)) { |
|
43 |
return loadLiteral(schemaJson); |
|
44 |
} else if (schemaUrl != null && !"none".equals(schemaUrl)) { |
|
45 |
return loadFromUrl(schemaUrl); |
|
46 |
} else if (typeName != null && !"none".equals(typeName)) { |
|
47 |
return loadFromTypeName(typeName); |
|
48 |
} else { |
|
49 |
throw new IllegalArgumentException("No valid schema information provided"); |
|
50 |
} |
|
51 |
} |
|
52 |
|
|
53 |
public Schema loadLiteral(String schemaJson) throws IOException { |
|
54 |
return parser.parse(schemaJson); |
|
55 |
} |
|
56 |
|
|
57 |
public Schema loadFromUrl(String schemaUrl) throws IOException { |
|
58 |
if (schemaUrl.toLowerCase().startsWith("hdfs://")) { |
|
59 |
FileSystem fs = FileSystem.get(conf); |
|
60 |
FSDataInputStream input = null; |
|
61 |
try { |
|
62 |
input = fs.open(new Path(schemaUrl)); |
|
63 |
return parser.parse(input); |
|
64 |
} finally { |
|
65 |
if (input != null) { |
|
66 |
input.close(); |
|
67 |
} |
|
68 |
} |
|
69 |
} else { |
|
70 |
InputStream is = null; |
|
71 |
try { |
|
72 |
is = new URL(schemaUrl).openStream(); |
|
73 |
return parser.parse(is); |
|
74 |
} finally { |
|
75 |
if (is != null) { |
|
76 |
is.close(); |
|
77 |
} |
|
78 |
} |
|
79 |
} |
|
80 |
} |
|
81 |
|
|
82 |
public Schema loadFromTypeName(String typeName) { |
|
83 |
return AvroUtils.toSchema(typeName); |
|
84 |
} |
|
85 |
} |
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONOutputFormat.java | ||
---|---|---|
1 |
/** |
|
2 |
* Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. |
|
3 |
* |
|
4 |
* Cloudera, Inc. licenses this file to you under the Apache License, |
|
5 |
* Version 2.0 (the "License"). You may not use this file except in |
|
6 |
* compliance with the License. You may obtain a copy of the License at |
|
7 |
* |
|
8 |
* http://www.apache.org/licenses/LICENSE-2.0 |
|
9 |
* |
|
10 |
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
|
11 |
* CONDITIONS OF ANY KIND, either express or implied. See the License for |
|
12 |
* the specific language governing permissions and limitations under the |
|
13 |
* License. |
|
14 |
*/ |
|
15 |
package com.cloudera.science.avro.streaming; |
|
16 |
|
|
17 |
import java.io.IOException; |
|
18 |
|
|
19 |
import org.apache.avro.Schema; |
|
20 |
import org.apache.avro.file.CodecFactory; |
|
21 |
import org.apache.avro.file.DataFileConstants; |
|
22 |
import org.apache.avro.file.DataFileWriter; |
|
23 |
import org.apache.avro.generic.GenericDatumWriter; |
|
24 |
import org.apache.avro.generic.GenericRecord; |
|
25 |
import org.apache.avro.mapred.AvroOutputFormat; |
|
26 |
import org.apache.avro.mapreduce.AvroJob; |
|
27 |
import org.apache.hadoop.fs.FileSystem; |
|
28 |
import org.apache.hadoop.fs.Path; |
|
29 |
import org.apache.hadoop.io.Text; |
|
30 |
import org.apache.hadoop.mapred.FileOutputFormat; |
|
31 |
import org.apache.hadoop.mapred.JobConf; |
|
32 |
import org.apache.hadoop.mapred.RecordWriter; |
|
33 |
import org.apache.hadoop.util.Progressable; |
|
34 |
|
|
35 |
import com.cloudera.science.avro.common.JsonConverter; |
|
36 |
import com.cloudera.science.avro.common.SchemaLoader; |
|
37 |
|
|
38 |
/** |
|
39 |
* Some changes introduced in the original code by Mateusz Kobos |
|
40 |
*/ |
|
41 |
public class AvroAsJSONOutputFormat extends FileOutputFormat<Text, Text> { |
|
42 |
public static final String SCHEMA_LITERAL = "output.schema.literal"; |
|
43 |
public static final String SCHEMA_URL = "output.schema.url"; |
|
44 |
public static final String SCHEMA_TYPE_NAME = "eu.dnetlib.iis.avro.output.class"; |
|
45 |
public static final String READ_KEY = "output.read.key"; |
|
46 |
|
|
47 |
private Schema schema; |
|
48 |
private JsonConverter converter; |
|
49 |
private boolean readKey = true; |
|
50 |
|
|
51 |
@Override |
|
52 |
public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job, String name, |
|
53 |
Progressable progress) throws IOException { |
|
54 |
if (schema == null) { |
|
55 |
SchemaLoader loader = new SchemaLoader(job); |
|
56 |
this.schema = loader.load( |
|
57 |
job.get(SCHEMA_LITERAL), job.get(SCHEMA_URL), job.get(SCHEMA_TYPE_NAME)); |
|
58 |
this.converter = new JsonConverter(schema); |
|
59 |
this.readKey = job.getBoolean(READ_KEY, true); |
|
60 |
} |
|
61 |
|
|
62 |
DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>( |
|
63 |
new GenericDatumWriter<GenericRecord>(schema)); |
|
64 |
if (getCompressOutput(job)) { |
|
65 |
int level = job.getInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, AvroOutputFormat.DEFAULT_DEFLATE_LEVEL); |
|
66 |
String codecName = job.get(AvroJob.CONF_OUTPUT_CODEC, |
|
67 |
org.apache.avro.file.DataFileConstants.DEFLATE_CODEC); |
|
68 |
CodecFactory codec = codecName.equals(DataFileConstants.DEFLATE_CODEC) |
|
69 |
? CodecFactory.deflateCodec(level) |
|
70 |
: CodecFactory.fromString(codecName); |
|
71 |
writer.setCodec(codec); |
|
72 |
} |
|
73 |
writer.setSyncInterval(job.getInt(AvroOutputFormat.SYNC_INTERVAL_KEY, |
|
74 |
DataFileConstants.DEFAULT_SYNC_INTERVAL)); |
|
75 |
|
|
76 |
Path path = FileOutputFormat.getTaskOutputPath(job, name + AvroOutputFormat.EXT); |
|
77 |
writer.create(schema, path.getFileSystem(job).create(path)); |
|
78 |
|
|
79 |
return new AvroAsJSONRecordWriter(writer, converter, readKey); |
|
80 |
} |
|
81 |
} |
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordWriter.java | ||
---|---|---|
1 |
/** |
|
2 |
* Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. |
|
3 |
* |
|
4 |
* Cloudera, Inc. licenses this file to you under the Apache License, |
|
5 |
* Version 2.0 (the "License"). You may not use this file except in |
|
6 |
* compliance with the License. You may obtain a copy of the License at |
|
7 |
* |
|
8 |
* http://www.apache.org/licenses/LICENSE-2.0 |
|
9 |
* |
|
10 |
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
|
11 |
* CONDITIONS OF ANY KIND, either express or implied. See the License for |
|
12 |
* the specific language governing permissions and limitations under the |
|
13 |
* License. |
|
14 |
*/ |
|
15 |
package com.cloudera.science.avro.streaming; |
|
16 |
|
|
17 |
import java.io.IOException; |
|
18 |
|
|
19 |
import org.apache.avro.file.DataFileWriter; |
|
20 |
import org.apache.avro.generic.GenericRecord; |
|
21 |
import org.apache.hadoop.io.Text; |
|
22 |
import org.apache.hadoop.mapred.RecordWriter; |
|
23 |
import org.apache.hadoop.mapred.Reporter; |
|
24 |
|
|
25 |
import com.cloudera.science.avro.common.JsonConverter; |
|
26 |
|
|
27 |
public class AvroAsJSONRecordWriter implements RecordWriter<Text, Text> { |
|
28 |
|
|
29 |
private final DataFileWriter<GenericRecord> writer; |
|
30 |
private final JsonConverter converter; |
|
31 |
private final boolean readKey; |
|
32 |
|
|
33 |
public AvroAsJSONRecordWriter(DataFileWriter<GenericRecord> writer, JsonConverter converter, boolean readKey) { |
|
34 |
this.writer = writer; |
|
35 |
this.converter = converter; |
|
36 |
this.readKey = readKey; |
|
37 |
} |
|
38 |
|
|
39 |
@Override |
|
40 |
public void write(Text key, Text value) throws IOException { |
|
41 |
writer.append(converter.convert(readKey ? key.toString() : value.toString())); |
|
42 |
} |
|
43 |
|
|
44 |
@Override |
|
45 |
public void close(Reporter reporter) throws IOException { |
|
46 |
writer.close(); |
|
47 |
} |
|
48 |
} |
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordReader.java | ||
---|---|---|
1 |
/** |
|
2 |
* Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. |
|
3 |
* |
|
4 |
* Cloudera, Inc. licenses this file to you under the Apache License, |
|
5 |
* Version 2.0 (the "License"). You may not use this file except in |
|
6 |
* compliance with the License. You may obtain a copy of the License at |
|
7 |
* |
|
8 |
* http://www.apache.org/licenses/LICENSE-2.0 |
|
9 |
* |
|
10 |
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
|
11 |
* CONDITIONS OF ANY KIND, either express or implied. See the License for |
|
12 |
* the specific language governing permissions and limitations under the |
|
13 |
* License. |
|
14 |
*/ |
|
15 |
package com.cloudera.science.avro.streaming; |
|
16 |
|
|
17 |
import java.io.IOException; |
|
18 |
|
|
19 |
import org.apache.avro.Schema; |
|
20 |
import org.apache.avro.file.DataFileReader; |
|
21 |
import org.apache.avro.file.FileReader; |
|
22 |
import org.apache.avro.generic.GenericDatumReader; |
|
23 |
import org.apache.avro.generic.GenericRecord; |
|
24 |
import org.apache.avro.mapred.FsInput; |
|
25 |
import org.apache.hadoop.io.Text; |
|
26 |
import org.apache.hadoop.mapred.FileSplit; |
|
27 |
import org.apache.hadoop.mapred.JobConf; |
|
28 |
import org.apache.hadoop.mapred.RecordReader; |
|
29 |
|
|
30 |
public class AvroAsJSONRecordReader implements RecordReader<Text, Text> { |
|
31 |
|
|
32 |
private FileReader<GenericRecord> reader; |
|
33 |
private GenericRecord datum; |
|
34 |
private long start; |
|
35 |
private long end; |
|
36 |
|
|
37 |
public AvroAsJSONRecordReader(Schema schema, JobConf job, FileSplit split) throws IOException { |
|
38 |
this(DataFileReader.openReader(new FsInput(split.getPath(), job), |
|
39 |
new GenericDatumReader<GenericRecord>(schema)), split); |
|
40 |
} |
|
41 |
|
|
42 |
protected AvroAsJSONRecordReader(FileReader<GenericRecord> reader, FileSplit split) |
|
43 |
throws IOException { |
|
44 |
this.reader = reader; |
|
45 |
reader.sync(split.getStart()); |
|
46 |
this.start = reader.tell(); |
|
47 |
this.end = split.getStart() + split.getLength(); |
|
48 |
} |
|
49 |
|
|
50 |
@Override |
|
51 |
public Text createKey() { |
|
52 |
return new Text(); |
|
53 |
} |
|
54 |
|
|
55 |
@Override |
|
56 |
public Text createValue() { |
|
57 |
return new Text(); |
|
58 |
} |
|
59 |
|
|
60 |
@Override |
|
61 |
public long getPos() throws IOException { |
|
62 |
return reader.tell(); |
|
63 |
} |
|
64 |
|
|
65 |
@Override |
|
66 |
public float getProgress() throws IOException { |
|
67 |
if (end == start) { |
|
68 |
return 0.0f; |
|
69 |
} else { |
|
70 |
return Math.min(1.0f, (getPos() - start) / (float)(end - start)); |
|
71 |
} |
|
72 |
} |
|
73 |
|
|
74 |
@Override |
|
75 |
public void close() throws IOException { |
|
76 |
reader.close(); |
|
77 |
} |
|
78 |
|
|
79 |
@Override |
|
80 |
public boolean next(Text key, Text value) throws IOException { |
|
81 |
if (!reader.hasNext() || reader.pastSync(end)) { |
|
82 |
return false; |
|
83 |
} |
|
84 |
datum = reader.next(datum); |
|
85 |
key.set(datum.toString()); |
|
86 |
return true; |
|
87 |
} |
|
88 |
} |
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONInputFormat.java | ||
---|---|---|
1 |
/** |
|
2 |
* Copyright (c) 2013, Cloudera, Inc. All Rights Reserved. |
|
3 |
* |
|
4 |
* Cloudera, Inc. licenses this file to you under the Apache License, |
|
5 |
* Version 2.0 (the "License"). You may not use this file except in |
|
6 |
* compliance with the License. You may obtain a copy of the License at |
|
7 |
* |
|
8 |
* http://www.apache.org/licenses/LICENSE-2.0 |
|
9 |
* |
|
10 |
* This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR |
|
11 |
* CONDITIONS OF ANY KIND, either express or implied. See the License for |
|
12 |
* the specific language governing permissions and limitations under the |
|
13 |
* License. |
|
14 |
*/ |
|
15 |
package com.cloudera.science.avro.streaming; |
|
16 |
|
|
17 |
import java.io.IOException; |
|
18 |
import java.util.List; |
|
19 |
|
|
20 |
import org.apache.avro.Schema; |
|
21 |
import org.apache.hadoop.fs.Path; |
|
22 |
import org.apache.hadoop.io.Text; |
|
23 |
import org.apache.hadoop.mapred.FileInputFormat; |
|
24 |
import org.apache.hadoop.mapred.FileSplit; |
|
25 |
import org.apache.hadoop.mapred.InputSplit; |
|
26 |
import org.apache.hadoop.mapred.JobConf; |
|
27 |
import org.apache.hadoop.mapred.RecordReader; |
|
28 |
import org.apache.hadoop.mapred.Reporter; |
|
29 |
|
|
30 |
import com.cloudera.science.avro.common.SchemaLoader; |
|
31 |
import com.google.common.base.Strings; |
|
32 |
import com.google.common.collect.Lists; |
|
33 |
|
|
34 |
/** |
|
35 |
* Some changes introduced in the original code by Mateusz Kobos |
|
36 |
*/ |
|
37 |
/**
 * InputFormat that presents Avro container files to Hadoop Streaming jobs as
 * JSON text records (see {@link AvroAsJSONRecordReader}). The reader schema
 * can be supplied in three ways, checked in this order of precedence:
 * a schema literal, one schema URL per input path, or one fully qualified
 * Avro-generated class name per input path (the IIS patch addition).
 */
public class AvroAsJSONInputFormat extends FileInputFormat<Text, Text> {
  /** Conf key: full Avro schema as a JSON literal (single-schema mode). */
  public static final String SCHEMA_LITERAL = "input.schema.literal";
  /** Conf key: comma-separated URLs of schema files, one per input path. */
  public static final String SCHEMA_URL = "input.schema.url";
  /** Conf key: comma-separated Avro-generated class names, one per input path. */
  public static final String SCHEMA_TYPE_NAME = "eu.dnetlib.iis.avro.input.class";

  // Lazily populated by loadSchemas() on the first getRecordReader() call.
  private List<Schema> schemas;
  // Set only in multi-schema mode; parallel to 'schemas', used to map a
  // split's path back to the schema of the input directory it came from.
  private String[] inputPaths;

  @Override
  public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
      throws IOException {
    if (schemas == null) {
      loadSchemas(job);
    }
    FileSplit fs = (FileSplit) split;
    Schema schema = null;
    if (schemas.size() == 1) {
      schema = schemas.get(0);
    } else {
      // Need to figure out which schema we're loading: pick the configured
      // input path sharing the longest common prefix with the split's path.
      String current = fs.getPath().toString();
      int index = -1;
      int bestMatchLength = -1;
      for (int i = 0; i < inputPaths.length; i++) {
        int match = Strings.commonPrefix(current, inputPaths[i]).length();
        if (match > bestMatchLength) {
          bestMatchLength = match;
          index = i;
        }
      }
      schema = schemas.get(index);
    }
    return new AvroAsJSONRecordReader(schema, job, fs);
  }

  /**
   * Resolves the reader schema(s) from the job configuration.
   * Precedence: SCHEMA_LITERAL (always single-schema, returns early), then
   * SCHEMA_URL, then SCHEMA_TYPE_NAME; with no source configured an
   * IllegalArgumentException is thrown. In multi-schema mode the number of
   * input paths must equal the number of schemas, and the paths are recorded
   * for the prefix matching done in getRecordReader().
   */
  private void loadSchemas(JobConf job) throws IOException {
    this.schemas = Lists.newArrayList();
    SchemaLoader loader = new SchemaLoader(job);
    String schemaLiteral = job.get(SCHEMA_LITERAL);
    if (schemaLiteral != null) {
      schemas.add(loader.loadLiteral(schemaLiteral));
      return;
    } else {
      String[] schemaUrls = job.getStrings(SCHEMA_URL);
      String[] typeNames = job.getStrings(SCHEMA_TYPE_NAME);
      if (schemaUrls != null) {
        for (String schemaUrl : schemaUrls) {
          schemas.add(loader.loadFromUrl(schemaUrl));
        }
      } else if (typeNames != null) {
        for (String typeName : typeNames) {
          schemas.add(loader.loadFromTypeName(typeName));
        }
      } else {
        throw new IllegalArgumentException("No schema information provided");
      }

      if (schemas.size() > 1) {
        // Need to track input paths so splits can be matched to schemas.
        Path[] inputs = FileInputFormat.getInputPaths(job);
        if (inputs.length != schemas.size()) {
          throw new IllegalArgumentException(String.format(
              "Number of input paths (%d) does not match number of schemas specified (%d)",
              inputs.length, schemas.size()));
        }
        this.inputPaths = new String[inputs.length];
        for (int i = 0; i < inputs.length; i++) {
          inputPaths[i] = inputs[i].toString();
        }
      }
    }
  }
}
modules/icm-iis-3rdparty-avro-json/trunk/pom.xml | ||
---|---|---|
1 |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> |
|
2 |
<!-- |
|
3 |
Licensed to the Apache Software Foundation (ASF) under one |
|
4 |
or more contributor license agreements. See the NOTICE file |
|
5 |
distributed with this work for additional information |
|
6 |
regarding copyright ownership. The ASF licenses this file |
|
7 |
to you under the Apache License, Version 2.0 (the |
|
8 |
"License"); you may not use this file except in compliance |
|
9 |
with the License. You may obtain a copy of the License at |
|
10 |
|
|
11 |
http://www.apache.org/licenses/LICENSE-2.0 |
|
12 |
|
|
13 |
Unless required by applicable law or agreed to in writing, software |
|
14 |
distributed under the License is distributed on an "AS IS" BASIS, |
|
15 |
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
16 |
See the License for the specific language governing permissions and |
|
17 |
limitations under the License. |
|
18 |
--> |
|
19 |
<parent> |
|
20 |
<groupId>eu.dnetlib</groupId> |
|
21 |
<artifactId>dnet-parent</artifactId> |
|
22 |
<version>1.0.0</version> |
|
23 |
</parent> |
|
24 |
|
|
25 |
<modelVersion>4.0.0</modelVersion> |
|
26 |
|
|
27 |
<artifactId>icm-iis-3rdparty-avro-json</artifactId> |
|
28 |
<version>1.2-SNAPSHOT</version> |
|
29 |
<packaging>jar</packaging> |
|
30 |
|
|
31 |
<scm> |
|
32 |
<developerConnection> |
|
33 |
scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/icm-iis-3rdparty-avro-json/trunk |
|
34 |
</developerConnection> |
|
35 |
</scm> |
|
36 |
|
|
37 |
<name>Avro/JSON InputFormat/OutputFormats and SerDes</name> |
|
38 |
<url>http://www.cloudera.com</url> |
|
39 |
|
|
40 |
<properties> |
|
41 |
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> |
|
42 |
<eclipse.output.directory>eclipse-classes</eclipse.output.directory> |
|
43 |
</properties> |
|
44 |
|
|
45 |
<build> |
|
46 |
<plugins> |
|
47 |
<plugin> |
|
48 |
<groupId>org.apache.maven.plugins</groupId> |
|
49 |
<artifactId>maven-eclipse-plugin</artifactId> |
|
50 |
<version>2.9</version> |
|
51 |
<configuration> |
|
52 |
<buildOutputDirectory>eclipse-classes</buildOutputDirectory> |
|
53 |
<downloadSources>true</downloadSources> |
|
54 |
<downloadJavadocs>false</downloadJavadocs> |
|
55 |
</configuration> |
|
56 |
</plugin> |
|
57 |
|
|
58 |
</plugins> |
|
59 |
</build> |
|
60 |
|
|
61 |
<dependencies> |
|
62 |
<dependency> |
|
63 |
<groupId>eu.dnetlib</groupId> |
|
64 |
<artifactId>icm-iis-core</artifactId> |
|
65 |
<version>[1.0.0,2.0.0)</version> |
|
66 |
</dependency> |
|
67 |
<dependency> |
|
68 |
<groupId>junit</groupId> |
|
69 |
<artifactId>junit</artifactId> |
|
70 |
<version>4.8.2</version> |
|
71 |
<scope>test</scope> |
|
72 |
</dependency> |
|
73 |
<dependency> |
|
74 |
<groupId>org.apache.avro</groupId> |
|
75 |
<artifactId>avro</artifactId> |
|
76 |
<version>${iis.avro.version}</version> |
|
77 |
</dependency> |
|
78 |
|
|
79 |
<dependency> |
|
80 |
<groupId>org.apache.avro</groupId> |
|
81 |
<artifactId>avro-mapred</artifactId> |
|
82 |
<version>${iis.avro.version}</version> |
|
83 |
<classifier>hadoop2</classifier> |
|
84 |
</dependency> |
|
85 |
|
|
86 |
<!-- Hadoop Dependencies --> |
|
87 |
<dependency> |
|
88 |
<groupId>org.apache.hive</groupId> |
|
89 |
<artifactId>hive-serde</artifactId> |
|
90 |
<version>${iis.hive.version}</version> |
|
91 |
<scope>provided</scope> |
|
92 |
</dependency> |
|
93 |
<dependency> |
|
94 |
<groupId>org.apache.hadoop</groupId> |
|
95 |
<artifactId>hadoop-client</artifactId> |
|
96 |
<version>${iis.hadoop.version}</version> |
|
97 |
<scope>provided</scope> |
|
98 |
</dependency> |
|
99 |
</dependencies> |
|
100 |
|
|
101 |
<repositories> |
|
102 |
<repository> |
|
103 |
<id>cloudera</id> |
|
104 |
<url>https://repository.cloudera.com/artifactory/cloudera-repos</url> |
|
105 |
<releases> |
|
106 |
<enabled>true</enabled> |
|
107 |
</releases> |
|
108 |
<snapshots> |
|
109 |
<enabled>false</enabled> |
|
110 |
</snapshots> |
|
111 |
</repository> |
|
112 |
</repositories> |
|
113 |
</project> |
modules/icm-iis-3rdparty-avro-json/trunk/deploy.info.disabled | ||
---|---|---|
1 |
{ |
|
2 |
"type_source": "SVN", |
|
3 |
"goal": "package -U -T 4C source:jar", |
|
4 |
"url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/icm-iis-3rdparty-avro-json/trunk/", |
|
5 |
"deploy_repository": "dnet4-snapshots", |
|
6 |
"version": "4", |
|
7 |
"mail": "m.horst@icm.edu.pl,m.kobos@icm.edu.pl", |
|
8 |
"deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", |
|
9 |
"name": "icm-iis-3rdparty-avro-json" |
|
10 |
} |
modules/icm-iis-3rdparty-avro-json/trunk/README.markdown | ||
---|---|---|
1 |
This project is a patched version of |
|
2 |
|
|
3 |
**Cloudera's Avro-JSON proxy for Hadoop Streaming** |
|
4 |
|
|
5 |
The goal of this patch is to introduce a way of passing fully qualified names of the classes that define the input and output schemas in the Oozie workflow description. The default approach of this library is to accept either a full schema given in an XML property or a path to a schema file stored somewhere in the filesystem.
|
6 |
|
|
7 |
See the parent directory of this directory for: |
|
8 |
|
|
9 |
- the original source, |
|
10 |
- scripts for downloading the original source, and
|
11 |
- scripts for applying the patch in order to create this project. |
Also available in: Unified diff
introducing dnet45 version of IIS CDH4 legacy libs