Project

General

Profile

« Previous | Next » 

Revision 47545

introducing dnet45 version of IIS CDH4 legacy libs

View differences:

modules/icm-iis-3rdparty-avro-json/trunk/pom.xml.disabled
1
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2
  <!--
3
  Licensed to the Apache Software Foundation (ASF) under one
4
  or more contributor license agreements.  See the NOTICE file
5
  distributed with this work for additional information
6
  regarding copyright ownership.  The ASF licenses this file
7
  to you under the Apache License, Version 2.0 (the
8
  "License"); you may not use this file except in compliance
9
  with the License.  You may obtain a copy of the License at
10

  
11
       http://www.apache.org/licenses/LICENSE-2.0
12

  
13
  Unless required by applicable law or agreed to in writing, software
14
  distributed under the License is distributed on an "AS IS" BASIS,
15
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
  See the License for the specific language governing permissions and
17
  limitations under the License.
18
  -->
19
	<parent>
20
    		<groupId>eu.dnetlib</groupId>
21
	        <artifactId>dnet-hadoop-parent</artifactId>
22
            <version>1.0.0</version>
23
	</parent>
24
  
25
  <modelVersion>4.0.0</modelVersion>
26

  
27
  <artifactId>icm-iis-3rdparty-avro-json</artifactId>
28
  <version>1.1-SNAPSHOT</version>
29
  <packaging>jar</packaging>
30

  
31
  <scm>
32
    <developerConnection>
33
      scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/icm-iis-3rdparty-avro-json/trunk
34
    </developerConnection>
35
  </scm>
36

  
37
  <name>Avro/JSON InputFormat/OutputFormats and SerDes</name>
38
  <url>http://www.cloudera.com</url>
39

  
40
  <properties>
41
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
42
    <eclipse.output.directory>eclipse-classes</eclipse.output.directory>
43
  </properties>
44

  
45
  <build>
46
    <plugins>
47
      <plugin>
48
        <groupId>org.apache.maven.plugins</groupId>
49
        <artifactId>maven-eclipse-plugin</artifactId>
50
        <version>2.9</version>
51
        <configuration>
52
          <buildOutputDirectory>eclipse-classes</buildOutputDirectory>
53
          <downloadSources>true</downloadSources>
54
          <downloadJavadocs>false</downloadJavadocs>
55
        </configuration>
56
      </plugin>
57

  
58
    </plugins>
59
  </build>
60

  
61
  <dependencies>
62
    <dependency>
63
      <groupId>eu.dnetlib</groupId>
64
      <artifactId>icm-iis-core</artifactId>
65
      <version>[1.0.0,2.0.0)</version>
66
    </dependency>
67
    <dependency>
68
      <groupId>junit</groupId>
69
      <artifactId>junit</artifactId>
70
      <version>4.8.2</version>
71
      <scope>test</scope>      
72
    </dependency>
73
    <dependency>
74
      <groupId>org.apache.avro</groupId>
75
      <artifactId>avro</artifactId>
76
      <version>${iis.avro.version}</version>
77
    </dependency>
78

  
79
    <dependency>
80
      <groupId>org.apache.avro</groupId>
81
      <artifactId>avro-mapred</artifactId>
82
      <version>${iis.avro.version}</version>
83
      <classifier>hadoop2</classifier>
84
    </dependency>
85

  
86
    <!-- Hadoop Dependencies -->
87
    <dependency>
88
      <groupId>org.apache.hive</groupId>
89
      <artifactId>hive-serde</artifactId>
90
      <version>${iis.hive.version}</version>
91
      <scope>provided</scope>
92
    </dependency>
93
    <dependency>
94
      <groupId>org.apache.hadoop</groupId>
95
      <artifactId>hadoop-client</artifactId>
96
      <version>${iis.hadoop.version}</version>
97
      <scope>provided</scope>
98
    </dependency>
99
  </dependencies>
100

  
101
	<repositories>
102
		<repository>
103
			<id>cloudera</id>
104
			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
105
			<releases>
106
				<enabled>true</enabled>
107
			</releases>
108
			<snapshots>
109
				<enabled>false</enabled>
110
			</snapshots>
111
		</repository>
112
  </repositories>
113
</project>
modules/icm-iis-3rdparty-avro-json/trunk/src/test/java/com/cloudera/science/avro/common/JsonConverterTest.java
1
/**
2
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.common;
16

  
17
import static org.junit.Assert.assertEquals;
18

  
19
import java.io.IOException;
20
import java.util.Arrays;
21

  
22
import org.apache.avro.Schema;
23
import org.apache.avro.Schema.Type;
24
import org.apache.avro.generic.GenericRecord;
25
import org.junit.Test;
26

  
27
import com.google.common.collect.ImmutableList;
28

  
29
public class JsonConverterTest {
30

  
31
  private static Schema.Field sf(String name, Schema schema) {
32
    return new Schema.Field(name, schema, "", null);
33
  }
34
  
35
  private static Schema.Field sf(String name, Schema.Type type) {
36
    return sf(name, sc(type));
37
  }
38
  
39
  private static Schema sc(Schema.Type type) {
40
    return Schema.create(type);
41
  }
42
  
43
  private static Schema sr(Schema.Field... fields) {
44
    return Schema.createRecord(Arrays.asList(fields));
45
  }
46
  
47
  Schema.Field f1 = sf("field1", Type.LONG);
48
  Schema.Field f2 = sf("field2", Schema.createArray(sc(Type.BOOLEAN)));
49
  Schema.Field f3Map = sf("field3", Schema.createMap(sc(Type.STRING)));
50
  Schema.Field f3Rec = sf("field3", sr(sf("key", Type.STRING)));
51
  
52
  @Test
53
  public void testBasicWithMap() throws Exception {
54
    JsonConverter jc = new JsonConverter(sr(f1, f2, f3Map));
55
    String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}";
56
    GenericRecord r = jc.convert(json);
57
    assertEquals(json, r.toString());
58
  }
59
  
60
  @Test
61
  public void testBasicWithRecord() throws Exception {
62
    JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec));
63
    String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}";
64
    GenericRecord r = jc.convert(json);
65
    assertEquals(json, r.toString());
66
  }
67
  
68
  @Test(expected=IllegalArgumentException.class)
69
  public void testMissingRequiredField() throws Exception {
70
    JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec));
71
    String json = "{\"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}";
72
    jc.convert(json);
73
  }
74
  
75
  @Test
76
  public void testMissingNullableField() throws Exception {
77
    Schema optional = Schema.createUnion(
78
        ImmutableList.of(Schema.create(Type.NULL), Schema.create(Type.DOUBLE)));
79
    Schema.Field f4 = sf("field4", optional);
80
    JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec, f4));
81
    String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}";
82
    GenericRecord r = jc.convert(json);
83
    String expect = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}, \"field4\": null}";
84
    assertEquals(expect, r.toString());
85
  }
86
  
87
  @Test
88
  public void testMissingNullableArrayField() throws Exception {
89
	  Schema.Parser parser = new Schema.Parser();
90
      Schema schemaOptionalArray = parser.parse("[\"null\", {\"type\": \"array\", \"items\": \"int\"}]");
91
      Schema.Field f4 = sf("field4", schemaOptionalArray);
92
      JsonConverter jc = new JsonConverter(sr(f1, f2, f3Rec, f4));
93
      String json = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}}";
94
      GenericRecord r = jc.convert(json);
95
      String expect = "{\"field1\": 1729, \"field2\": [true, true, false], \"field3\": {\"key\": \"value\"}, \"field4\": null}";
96
      assertEquals(expect, r.toString());	  
97
  }
98
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/serde/AvroAsJSONSerde.java
1
/**
2
 * Copyright (c) 2012, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.serde;
16

  
17
import java.io.IOException;
18
import java.util.List;
19
import java.util.Properties;
20

  
21
import org.apache.avro.Schema;
22
import org.apache.hadoop.conf.Configuration;
23
import org.apache.hadoop.hive.serde.Constants;
24
import org.apache.hadoop.hive.serde2.SerDe;
25
import org.apache.hadoop.hive.serde2.SerDeException;
26
import org.apache.hadoop.hive.serde2.SerDeStats;
27
import org.apache.hadoop.hive.serde2.avro.AvroGenericRecordWritable;
28
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
29
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
30
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
31
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
32
import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
33
import org.apache.hadoop.io.Text;
34
import org.apache.hadoop.io.Writable;
35

  
36
import com.cloudera.science.avro.common.JsonConverter;
37
import com.cloudera.science.avro.common.SchemaLoader;
38
import com.google.common.collect.ImmutableList;
39
import com.google.common.collect.Lists;
40

  
41
public class AvroAsJSONSerde implements SerDe {
42
  // Follow convention of AvroSerDe
43
  public static final String SCHEMA_LITERAL = "avro.schema.literal";
44
  public static final String SCHEMA_URL = "avro.schema.url";
45
  public static final String SCHEMA_TYPE_NAME = "avro.schema.class";
46
  
47
  private Schema schema;
48
  private JsonConverter converter;
49
  private AvroGenericRecordWritable agrw = new AvroGenericRecordWritable();
50
  private List<Object> row = Lists.newArrayList();
51
  private ObjectInspector oi;
52
  
53
  @Override
54
  public void initialize(Configuration conf, Properties tbl) throws SerDeException {
55
    SchemaLoader loader = new SchemaLoader(conf);
56
    try {
57
      this.schema = loader.load(tbl.getProperty(SCHEMA_LITERAL), 
58
        tbl.getProperty(SCHEMA_URL), tbl.getProperty(SCHEMA_TYPE_NAME));
59
    } catch (IOException e) {
60
      throw new SerDeException(e);
61
    }
62
    this.converter = new JsonConverter(schema);
63
    row.add("");
64
    
65
    String colName = tbl.getProperty(Constants.LIST_COLUMNS);
66
    if (colName == null || colName.isEmpty()) {
67
      colName = "json"; // use a default
68
    }
69
    
70
    this.oi = ObjectInspectorFactory.getStandardStructObjectInspector(
71
        ImmutableList.of(colName),
72
        ImmutableList.<ObjectInspector>of(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
73
  }
74

  
75
  @Override
76
  public Object deserialize(Writable blob) throws SerDeException {
77
    row.set(0, ((AvroGenericRecordWritable) blob).getRecord().toString());
78
    return row;
79
  }
80

  
81
  @Override
82
  public ObjectInspector getObjectInspector() throws SerDeException {
83
    return oi;
84
  }
85

  
86
  @Override
87
  public SerDeStats getSerDeStats() {
88
    return null;
89
  }
90

  
91
  @Override
92
  public Class<? extends Writable> getSerializedClass() {
93
    return Text.class;
94
  }
95

  
96
  @Override
97
  public Writable serialize(Object obj, ObjectInspector oi) throws SerDeException {
98
    StructObjectInspector soi = (StructObjectInspector) oi;
99
    List<Object> data = soi.getStructFieldsDataAsList(soi);
100
    StringObjectInspector foi = (StringObjectInspector) 
101
        soi.getAllStructFieldRefs().get(0).getFieldObjectInspector();
102
      try {
103
        agrw.setRecord(converter.convert(foi.getPrimitiveJavaObject(data.get(0))));
104
      } catch (IOException e) {
105
      throw new SerDeException(e);
106
    }
107
    return agrw;
108
  }
109
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/common/JsonConverter.java
1
/**
2
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.common;
16

  
17
import java.io.IOException;
18
import java.util.ArrayList;
19
import java.util.HashMap;
20
import java.util.List;
21
import java.util.Map;
22
import java.util.Set;
23

  
24
import org.apache.avro.Schema;
25
import org.apache.avro.generic.GenericData;
26
import org.apache.avro.generic.GenericRecord;
27
import org.apache.commons.logging.Log;
28
import org.apache.commons.logging.LogFactory;
29
import org.codehaus.jackson.JsonNode;
30
import org.codehaus.jackson.map.ObjectMapper;
31

  
32
import com.google.common.collect.ImmutableSet;
33
import com.google.common.collect.Lists;
34
import com.google.common.collect.Maps;
35
import com.google.common.collect.Sets;
36

  
37
/**
38
 *
39
 */
40
public class JsonConverter {
41
  private static final Log LOG = LogFactory.getLog(JsonConverter.class);
42
  private static final Set<Schema.Type> SUPPORTED_TYPES = ImmutableSet.of(
43
      Schema.Type.RECORD, Schema.Type.ARRAY, Schema.Type.MAP,
44
      Schema.Type.INT, Schema.Type.LONG, Schema.Type.BOOLEAN,
45
      Schema.Type.FLOAT, Schema.Type.DOUBLE, Schema.Type.STRING);
46
  
47
  private final ObjectMapper mapper = new ObjectMapper();
48
  private final Schema baseSchema;
49
  private int logMessageCounter = 0;
50
  
51
  public JsonConverter(Schema schema) {
52
    this.baseSchema = checkSchema(schema, true);
53
  }
54
  
55
  private Schema checkSchema(Schema schema, boolean mustBeRecord) {
56
    if (!mustBeRecord) {
57
      if (!SUPPORTED_TYPES.contains(schema.getType())) {
58
        throw new IllegalArgumentException("Unsupported type: " + schema.getType());
59
      }
60
      if (schema.getType() != Schema.Type.RECORD) {
61
        return schema;
62
      }
63
    }
64
    for (Schema.Field f : schema.getFields()) {
65
      Schema fs = f.schema();
66
      if (isNullableSchema(fs)) {
67
        fs = getNonNull(fs);
68
      }
69
      Schema.Type st = fs.getType();
70
      if (!SUPPORTED_TYPES.contains(st)) {
71
        throw new IllegalArgumentException(String.format(
72
            "Unsupported type '%s' for field '%s'", st.toString(), f.name()));
73
      }
74
      switch (st) {
75
      case RECORD:
76
        checkSchema(fs, true);
77
        break;
78
      case MAP:
79
        checkSchema(fs.getValueType(), false);
80
        break;
81
      case ARRAY:
82
        checkSchema(fs.getElementType(), false);
83
        default:
84
          break; // No need to check primitives
85
      }
86
    }
87
    return schema;
88
  }
89
  
90
  public GenericRecord convert(String json) throws IOException {
91
    return convert(mapper.readValue(json, Map.class), baseSchema);
92
  }
93
  
94
  private GenericRecord convert(Map<String, Object> raw, Schema schema)
95
      throws IOException {
96
    GenericRecord result = new GenericData.Record(schema);
97
    Set<String> usedFields = Sets.newHashSet();
98
    for (Schema.Field f : schema.getFields()) {
99
      String name = f.name();
100
      if (raw.containsKey(name)) {
101
        result.put(f.pos(), typeConvert(raw.get(name), name, f.schema()));
102
        usedFields.add(name);
103
      } else {
104
        JsonNode defaultValue = f.defaultValue();
105
        if (defaultValue == null) {
106
          if (isNullableSchema(f.schema())) {
107
            result.put(f.pos(), null);
108
          } else {
109
            throw new IllegalArgumentException(
110
                "No default value provided for non-nullable field: " + f.name());
111
          }
112
        } else {
113
          Schema fieldSchema = f.schema();
114
          if (isNullableSchema(fieldSchema)) {
115
            fieldSchema = getNonNull(fieldSchema);
116
          }
117
          Object value = null;
118
          switch (fieldSchema.getType()) {
119
          case BOOLEAN:
120
            value = defaultValue.getValueAsBoolean();
121
            break;
122
          case DOUBLE:
123
            value = defaultValue.getValueAsDouble();
124
            break;
125
          case FLOAT:
126
            value = (float) defaultValue.getValueAsDouble();
127
            break;
128
          case INT:
129
            value = defaultValue.getValueAsInt();
130
            break;
131
          case LONG:
132
            value = defaultValue.getValueAsLong();
133
            break;
134
          case STRING:
135
            value = defaultValue.getValueAsText();
136
            break;
137
          case MAP:
138
            Map<String, Object> fieldMap = mapper.readValue(
139
                defaultValue.getValueAsText(), Map.class);
140
            Map<String, Object> mvalue = Maps.newHashMap();
141
            for (Map.Entry<String, Object> e : fieldMap.entrySet()) {
142
              mvalue.put(e.getKey(),
143
                  typeConvert(e.getValue(), name, fieldSchema.getValueType()));
144
            }
145
            value = mvalue;
146
            break;
147
          case ARRAY:
148
            List fieldArray = mapper.readValue(
149
                defaultValue.getValueAsText(), List.class);
150
            List lvalue = Lists.newArrayList();
151
            for (Object elem : fieldArray) {
152
              lvalue.add(typeConvert(elem, name, fieldSchema.getElementType()));
153
            }
154
            value = lvalue;
155
            break;
156
          case RECORD:
157
            Map<String, Object> fieldRec = mapper.readValue(
158
                defaultValue.getValueAsText(), Map.class);
159
            value = convert(fieldRec, fieldSchema);
160
            break;
161
            default:
162
              throw new IllegalArgumentException(
163
                  "JsonConverter cannot handle type: " + fieldSchema.getType());
164
          }
165
          result.put(f.pos(), value);
166
        }
167
      }
168
    }
169
    
170
    if (usedFields.size() < raw.size()) {
171
      // Log a notification about unused fields
172
      if (logMessageCounter % 1000 == 0) {
173
        LOG.warn("Ignoring unused JSON fields: " + Sets.difference(raw.keySet(), usedFields));
174
      }
175
      logMessageCounter++;
176
    }
177
    
178
    return result;
179
  }
180
  
181
  private Object typeConvert(Object value, String name, Schema schema) throws IOException {
182
    if (isNullableSchema(schema)) {
183
      if (value == null) {
184
        return null;
185
      } else {
186
        schema = getNonNull(schema);
187
      }
188
    } else if (value == null) {
189
      // Always fail on null for non-nullable schemas
190
      throw new JsonConversionException(value, name, schema);
191
    }
192
    
193
    switch (schema.getType()) {
194
    case BOOLEAN:
195
      if (value instanceof Boolean) {
196
        return (Boolean) value;
197
      } else if (value instanceof String) {
198
        return Boolean.valueOf((String) value);
199
      } else if (value instanceof Number) {
200
        return ((Number) value).intValue() == 0 ? Boolean.FALSE : Boolean.TRUE;
201
      }
202
      break;
203
    case DOUBLE:
204
      if (value instanceof Number) {
205
        return ((Number) value).doubleValue();
206
      } else if (value instanceof String) {
207
        return Double.valueOf((String) value);
208
      }
209
      break;
210
    case FLOAT:
211
      if (value instanceof Number) {
212
        return ((Number) value).floatValue();
213
      } else if (value instanceof String) {
214
        return Float.valueOf((String) value);
215
      }
216
      break;
217
    case INT:
218
      if (value instanceof Number) {
219
        return ((Number) value).intValue();
220
      } else if (value instanceof String) {
221
        return Integer.valueOf((String) value);
222
      }
223
      break;
224
    case LONG:
225
      if (value instanceof Number) {
226
        return ((Number) value).longValue();
227
      } else if (value instanceof String) {
228
        return Long.valueOf((String) value);
229
      }
230
      break;
231
    case STRING:
232
      return value.toString();
233
    case RECORD:
234
      return convert((Map<String, Object>) value, schema);
235
    case ARRAY:
236
      Schema elementSchema = schema.getElementType();
237
      List listRes = new ArrayList();
238
      for (Object v : (List) value) {
239
        listRes.add(typeConvert(v, name, elementSchema));
240
      }
241
      return listRes;
242
    case MAP:
243
      Schema valueSchema = schema.getValueType();
244
      Map<String, Object> mapRes = new HashMap<String, Object>();
245
      for (Map.Entry<String, Object> v : ((Map<String, Object>) value).entrySet()) {
246
        mapRes.put(v.getKey(), typeConvert(v.getValue(), name, valueSchema));
247
      }
248
      return mapRes;
249
      default:
250
        throw new IllegalArgumentException(
251
            "JsonConverter cannot handle type: " + schema.getType());
252
    }
253
    throw new JsonConversionException(value, name, schema);
254
  }
255
  
256
  private boolean isNullableSchema(Schema schema) {
257
    return schema.getType().equals(Schema.Type.UNION) &&
258
        schema.getTypes().size() == 2 &&
259
        (schema.getTypes().get(0).getType().equals(Schema.Type.NULL) ||
260
         schema.getTypes().get(1).getType().equals(Schema.Type.NULL));
261
  }
262
  
263
  private Schema getNonNull(Schema schema) {
264
    List<Schema> types = schema.getTypes();
265
    return types.get(0).getType().equals(Schema.Type.NULL) ? types.get(1) : types.get(0);
266
  }
267
  
268
  private static class JsonConversionException extends RuntimeException {
269

  
270
    private Object value;
271
    private String fieldName;
272
    private Schema schema;
273
    
274
    public JsonConversionException(Object value, String fieldName, Schema schema) {
275
      this.value = value;
276
      this.fieldName = fieldName;
277
      this.schema = schema;
278
    }
279
    
280
    @Override
281
    public String toString() {
282
      return String.format("Type conversion error for field %s, %s for %s",
283
          fieldName, value, schema);
284
    }
285
  }
286
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/common/SchemaLoader.java
1
/**
2
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.common;
16

  
17
import java.io.IOException;
18
import java.io.InputStream;
19
import java.net.URL;
20

  
21
import org.apache.avro.Schema;
22
import org.apache.hadoop.conf.Configuration;
23
import org.apache.hadoop.fs.FSDataInputStream;
24
import org.apache.hadoop.fs.FileSystem;
25
import org.apache.hadoop.fs.Path;
26

  
27

  
28
import eu.dnetlib.iis.core.common.AvroUtils;
29
/**
30
 * Some changes introduced in the original code by Mateusz Kobos
31
 */
32
public class SchemaLoader {
33
  
34
  private final Configuration conf;
35
  private final Schema.Parser parser = new Schema.Parser();
36
  
37
  public SchemaLoader(Configuration conf) {
38
    this.conf = conf;
39
  }
40
  
41
  public Schema load(String schemaJson, String schemaUrl, String typeName) throws IOException {
42
    if (schemaJson != null && !"none".equals(schemaJson)) {
43
      return loadLiteral(schemaJson);
44
    } else if (schemaUrl != null && !"none".equals(schemaUrl)) {
45
      return loadFromUrl(schemaUrl);
46
    } else if (typeName != null && !"none".equals(typeName)) {
47
      return loadFromTypeName(typeName);
48
    } else {
49
      throw new IllegalArgumentException("No valid schema information provided");
50
    }
51
  }
52
  
53
  public Schema loadLiteral(String schemaJson) throws IOException {
54
    return parser.parse(schemaJson);
55
  }
56
  
57
  public Schema loadFromUrl(String schemaUrl) throws IOException {
58
    if (schemaUrl.toLowerCase().startsWith("hdfs://")) {
59
      FileSystem fs = FileSystem.get(conf);
60
      FSDataInputStream input = null;
61
      try {
62
        input = fs.open(new Path(schemaUrl));
63
        return parser.parse(input);
64
      } finally {
65
        if (input != null) {
66
          input.close();
67
        }
68
      }
69
    } else {
70
      InputStream is = null;
71
      try {
72
        is = new URL(schemaUrl).openStream();
73
        return parser.parse(is);
74
      } finally {
75
        if (is != null) {
76
          is.close();
77
        }
78
      }
79
    }
80
  }
81
  
82
  public Schema loadFromTypeName(String typeName) {
83
    return AvroUtils.toSchema(typeName);
84
  }
85
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONOutputFormat.java
1
/**
2
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.streaming;
16

  
17
import java.io.IOException;
18

  
19
import org.apache.avro.Schema;
20
import org.apache.avro.file.CodecFactory;
21
import org.apache.avro.file.DataFileConstants;
22
import org.apache.avro.file.DataFileWriter;
23
import org.apache.avro.generic.GenericDatumWriter;
24
import org.apache.avro.generic.GenericRecord;
25
import org.apache.avro.mapred.AvroOutputFormat;
26
import org.apache.avro.mapreduce.AvroJob;
27
import org.apache.hadoop.fs.FileSystem;
28
import org.apache.hadoop.fs.Path;
29
import org.apache.hadoop.io.Text;
30
import org.apache.hadoop.mapred.FileOutputFormat;
31
import org.apache.hadoop.mapred.JobConf;
32
import org.apache.hadoop.mapred.RecordWriter;
33
import org.apache.hadoop.util.Progressable;
34

  
35
import com.cloudera.science.avro.common.JsonConverter;
36
import com.cloudera.science.avro.common.SchemaLoader;
37

  
38
/**
39
 * Some changes introduced in the original code by Mateusz Kobos
40
 */
41
public class AvroAsJSONOutputFormat extends FileOutputFormat<Text, Text> {
42
  public static final String SCHEMA_LITERAL = "output.schema.literal";
43
  public static final String SCHEMA_URL = "output.schema.url";
44
  public static final String SCHEMA_TYPE_NAME = "eu.dnetlib.iis.avro.output.class";
45
  public static final String READ_KEY = "output.read.key";
46
  
47
  private Schema schema;
48
  private JsonConverter converter;
49
  private boolean readKey = true;
50
  
51
  @Override
52
  public RecordWriter<Text, Text> getRecordWriter(FileSystem ignored, JobConf job, String name,
53
      Progressable progress) throws IOException {
54
    if (schema == null) {
55
      SchemaLoader loader = new SchemaLoader(job);
56
      this.schema = loader.load(
57
        job.get(SCHEMA_LITERAL), job.get(SCHEMA_URL), job.get(SCHEMA_TYPE_NAME));
58
      this.converter = new JsonConverter(schema);
59
      this.readKey = job.getBoolean(READ_KEY, true);
60
    }
61
    
62
    DataFileWriter<GenericRecord> writer = new DataFileWriter<GenericRecord>(
63
        new GenericDatumWriter<GenericRecord>(schema));
64
    if (getCompressOutput(job)) {
65
      int level = job.getInt(AvroOutputFormat.DEFLATE_LEVEL_KEY, AvroOutputFormat.DEFAULT_DEFLATE_LEVEL);
66
      String codecName = job.get(AvroJob.CONF_OUTPUT_CODEC, 
67
          org.apache.avro.file.DataFileConstants.DEFLATE_CODEC);
68
      CodecFactory codec = codecName.equals(DataFileConstants.DEFLATE_CODEC)
69
          ? CodecFactory.deflateCodec(level)
70
          : CodecFactory.fromString(codecName);
71
      writer.setCodec(codec);
72
    }
73
    writer.setSyncInterval(job.getInt(AvroOutputFormat.SYNC_INTERVAL_KEY,
74
        DataFileConstants.DEFAULT_SYNC_INTERVAL));
75
    
76
    Path path = FileOutputFormat.getTaskOutputPath(job, name + AvroOutputFormat.EXT);
77
    writer.create(schema, path.getFileSystem(job).create(path));
78
    
79
    return new AvroAsJSONRecordWriter(writer, converter, readKey);
80
  }
81
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordWriter.java
1
/**
2
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.streaming;
16

  
17
import java.io.IOException;
18

  
19
import org.apache.avro.file.DataFileWriter;
20
import org.apache.avro.generic.GenericRecord;
21
import org.apache.hadoop.io.Text;
22
import org.apache.hadoop.mapred.RecordWriter;
23
import org.apache.hadoop.mapred.Reporter;
24

  
25
import com.cloudera.science.avro.common.JsonConverter;
26

  
27
public class AvroAsJSONRecordWriter implements RecordWriter<Text, Text> {
28

  
29
  private final DataFileWriter<GenericRecord> writer;
30
  private final JsonConverter converter;
31
  private final boolean readKey;
32
  
33
  public AvroAsJSONRecordWriter(DataFileWriter<GenericRecord> writer, JsonConverter converter, boolean readKey) {
34
    this.writer = writer;
35
    this.converter = converter;
36
    this.readKey = readKey;
37
  }
38
  
39
  @Override
40
  public void write(Text key, Text value) throws IOException {
41
    writer.append(converter.convert(readKey ? key.toString() : value.toString()));
42
  }
43

  
44
  @Override
45
  public void close(Reporter reporter) throws IOException {
46
    writer.close();
47
  }
48
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONRecordReader.java
1
/**
2
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.streaming;
16

  
17
import java.io.IOException;
18

  
19
import org.apache.avro.Schema;
20
import org.apache.avro.file.DataFileReader;
21
import org.apache.avro.file.FileReader;
22
import org.apache.avro.generic.GenericDatumReader;
23
import org.apache.avro.generic.GenericRecord;
24
import org.apache.avro.mapred.FsInput;
25
import org.apache.hadoop.io.Text;
26
import org.apache.hadoop.mapred.FileSplit;
27
import org.apache.hadoop.mapred.JobConf;
28
import org.apache.hadoop.mapred.RecordReader;
29

  
30
public class AvroAsJSONRecordReader implements RecordReader<Text, Text> {
31

  
32
  private FileReader<GenericRecord> reader;
33
  private GenericRecord datum;
34
  private long start;
35
  private long end;
36
  
37
  public AvroAsJSONRecordReader(Schema schema, JobConf job, FileSplit split) throws IOException {
38
    this(DataFileReader.openReader(new FsInput(split.getPath(), job),
39
        new GenericDatumReader<GenericRecord>(schema)), split);
40
  }
41

  
42
  protected AvroAsJSONRecordReader(FileReader<GenericRecord> reader, FileSplit split)
43
      throws IOException {
44
    this.reader = reader;
45
    reader.sync(split.getStart());
46
    this.start = reader.tell();
47
    this.end = split.getStart() + split.getLength();
48
  }
49
    
50
  @Override
51
  public Text createKey() {
52
    return new Text();
53
  }
54

  
55
  @Override
56
  public Text createValue() {
57
    return new Text();
58
  }
59

  
60
  @Override
61
  public long getPos() throws IOException {
62
    return reader.tell();
63
  }
64

  
65
  @Override
66
  public float getProgress() throws IOException {
67
    if (end == start) {
68
      return 0.0f;
69
    } else {
70
      return Math.min(1.0f, (getPos() - start) / (float)(end - start));
71
    }
72
  }
73

  
74
  @Override
75
  public void close() throws IOException {
76
    reader.close();
77
  }
78

  
79
  @Override
80
  public boolean next(Text key, Text value) throws IOException {
81
    if (!reader.hasNext() || reader.pastSync(end)) {
82
      return false;
83
    }
84
    datum = reader.next(datum);
85
    key.set(datum.toString());
86
    return true;
87
  }
88
}
modules/icm-iis-3rdparty-avro-json/trunk/src/main/java/com/cloudera/science/avro/streaming/AvroAsJSONInputFormat.java
1
/**
2
 * Copyright (c) 2013, Cloudera, Inc. All Rights Reserved.
3
 *
4
 * Cloudera, Inc. licenses this file to you under the Apache License,
5
 * Version 2.0 (the "License"). You may not use this file except in
6
 * compliance with the License. You may obtain a copy of the License at
7
 *
8
 *     http://www.apache.org/licenses/LICENSE-2.0
9
 *
10
 * This software is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR
11
 * CONDITIONS OF ANY KIND, either express or implied. See the License for
12
 * the specific language governing permissions and limitations under the
13
 * License.
14
 */
15
package com.cloudera.science.avro.streaming;
16

  
17
import java.io.IOException;
18
import java.util.List;
19

  
20
import org.apache.avro.Schema;
21
import org.apache.hadoop.fs.Path;
22
import org.apache.hadoop.io.Text;
23
import org.apache.hadoop.mapred.FileInputFormat;
24
import org.apache.hadoop.mapred.FileSplit;
25
import org.apache.hadoop.mapred.InputSplit;
26
import org.apache.hadoop.mapred.JobConf;
27
import org.apache.hadoop.mapred.RecordReader;
28
import org.apache.hadoop.mapred.Reporter;
29

  
30
import com.cloudera.science.avro.common.SchemaLoader;
31
import com.google.common.base.Strings;
32
import com.google.common.collect.Lists;
33

  
34
/**
35
 * Some changes introduced in the original code by Mateusz Kobos
36
 */
37
public class AvroAsJSONInputFormat extends FileInputFormat<Text, Text> {
38
  public static final String SCHEMA_LITERAL = "input.schema.literal";
39
  public static final String SCHEMA_URL = "input.schema.url";
40
  public static final String SCHEMA_TYPE_NAME = "eu.dnetlib.iis.avro.input.class";
41
  
42
  private List<Schema> schemas;
43
  private String[] inputPaths;
44
  
45
  @Override
46
  public RecordReader<Text, Text> getRecordReader(InputSplit split, JobConf job, Reporter reporter)
47
      throws IOException {
48
    if (schemas == null) {
49
      loadSchemas(job);
50
    }
51
    FileSplit fs = (FileSplit) split;
52
    Schema schema = null;
53
    if (schemas.size() == 1) {
54
      schema = schemas.get(0);
55
    } else {
56
      // Need to figure out which schema we're loading
57
      String current = fs.getPath().toString();
58
      int index = -1;
59
      int bestMatchLength = -1;
60
      for (int i = 0; i < inputPaths.length; i++) {
61
        int match = Strings.commonPrefix(current, inputPaths[i]).length();
62
        if (match > bestMatchLength) {
63
          bestMatchLength = match;
64
          index = i;
65
        }
66
      }
67
      schema = schemas.get(index);
68
    }
69
    return new AvroAsJSONRecordReader(schema, job, fs);
70
  }
71
  
72
  private void loadSchemas(JobConf job) throws IOException {
73
    this.schemas = Lists.newArrayList();
74
    SchemaLoader loader = new SchemaLoader(job);
75
    String schemaLiteral = job.get(SCHEMA_LITERAL);
76
    if (schemaLiteral != null) {
77
      schemas.add(loader.loadLiteral(schemaLiteral));
78
      return;
79
    } else {
80
      String[] schemaUrls = job.getStrings(SCHEMA_URL);
81
      String[] typeNames = job.getStrings(SCHEMA_TYPE_NAME);
82
      if (schemaUrls != null) {
83
        for (String schemaUrl : schemaUrls) {
84
          schemas.add(loader.loadFromUrl(schemaUrl));
85
        }
86
      } else if (typeNames != null) {
87
        for (String typeName : typeNames) {
88
          schemas.add(loader.loadFromTypeName(typeName));
89
        }    	
90
      } else {
91
        throw new IllegalArgumentException("No schema information provided");
92
      }
93

  
94
      if (schemas.size() > 1) {
95
        // Need to track input paths
96
        Path[] inputs = FileInputFormat.getInputPaths(job);
97
        if (inputs.length != schemas.size()) {
98
          throw new IllegalArgumentException(String.format(
99
              "Number of input paths (%d) does not match number of schemas specified (%d)",
100
              inputs.length, schemas.size()));
101
        }
102
        this.inputPaths = new String[inputs.length];
103
        for (int i = 0; i < inputs.length; i++) {
104
          inputPaths[i] = inputs[i].toString();
105
        }
106
      }
107
    }
108
  }
109
}
modules/icm-iis-3rdparty-avro-json/trunk/pom.xml
1
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
2
  <!--
3
  Licensed to the Apache Software Foundation (ASF) under one
4
  or more contributor license agreements.  See the NOTICE file
5
  distributed with this work for additional information
6
  regarding copyright ownership.  The ASF licenses this file
7
  to you under the Apache License, Version 2.0 (the
8
  "License"); you may not use this file except in compliance
9
  with the License.  You may obtain a copy of the License at
10

  
11
       http://www.apache.org/licenses/LICENSE-2.0
12

  
13
  Unless required by applicable law or agreed to in writing, software
14
  distributed under the License is distributed on an "AS IS" BASIS,
15
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
16
  See the License for the specific language governing permissions and
17
  limitations under the License.
18
  -->
19
	<parent>
20
    		<groupId>eu.dnetlib</groupId>
21
	        <artifactId>dnet-parent</artifactId>
22
            <version>1.0.0</version>
23
	</parent>
24
  
25
  <modelVersion>4.0.0</modelVersion>
26

  
27
  <artifactId>icm-iis-3rdparty-avro-json</artifactId>
28
  <version>1.2-SNAPSHOT</version>
29
  <packaging>jar</packaging>
30

  
31
  <scm>
32
    <developerConnection>
33
      scm:svn:https://svn.driver.research-infrastructures.eu/driver/dnet40/modules/icm-iis-3rdparty-avro-json/trunk
34
    </developerConnection>
35
  </scm>
36

  
37
  <name>Avro/JSON InputFormat/OutputFormats and SerDes</name>
38
  <url>http://www.cloudera.com</url>
39

  
40
  <properties>
41
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
42
    <eclipse.output.directory>eclipse-classes</eclipse.output.directory>
43
  </properties>
44

  
45
  <build>
46
    <plugins>
47
      <plugin>
48
        <groupId>org.apache.maven.plugins</groupId>
49
        <artifactId>maven-eclipse-plugin</artifactId>
50
        <version>2.9</version>
51
        <configuration>
52
          <buildOutputDirectory>eclipse-classes</buildOutputDirectory>
53
          <downloadSources>true</downloadSources>
54
          <downloadJavadocs>false</downloadJavadocs>
55
        </configuration>
56
      </plugin>
57

  
58
    </plugins>
59
  </build>
60

  
61
  <dependencies>
62
    <dependency>
63
      <groupId>eu.dnetlib</groupId>
64
      <artifactId>icm-iis-core</artifactId>
65
      <version>[1.0.0,2.0.0)</version>
66
    </dependency>
67
    <dependency>
68
      <groupId>junit</groupId>
69
      <artifactId>junit</artifactId>
70
      <version>4.8.2</version>
71
      <scope>test</scope>      
72
    </dependency>
73
    <dependency>
74
      <groupId>org.apache.avro</groupId>
75
      <artifactId>avro</artifactId>
76
      <version>${iis.avro.version}</version>
77
    </dependency>
78

  
79
    <dependency>
80
      <groupId>org.apache.avro</groupId>
81
      <artifactId>avro-mapred</artifactId>
82
      <version>${iis.avro.version}</version>
83
      <classifier>hadoop2</classifier>
84
    </dependency>
85

  
86
    <!-- Hadoop Dependencies -->
87
    <dependency>
88
      <groupId>org.apache.hive</groupId>
89
      <artifactId>hive-serde</artifactId>
90
      <version>${iis.hive.version}</version>
91
      <scope>provided</scope>
92
    </dependency>
93
    <dependency>
94
      <groupId>org.apache.hadoop</groupId>
95
      <artifactId>hadoop-client</artifactId>
96
      <version>${iis.hadoop.version}</version>
97
      <scope>provided</scope>
98
    </dependency>
99
  </dependencies>
100

  
101
	<repositories>
102
		<repository>
103
			<id>cloudera</id>
104
			<url>https://repository.cloudera.com/artifactory/cloudera-repos</url>
105
			<releases>
106
				<enabled>true</enabled>
107
			</releases>
108
			<snapshots>
109
				<enabled>false</enabled>
110
			</snapshots>
111
		</repository>
112
  </repositories>
113
</project>
modules/icm-iis-3rdparty-avro-json/trunk/deploy.info.disabled
1
{
2
  "type_source": "SVN", 
3
  "goal": "package -U -T 4C source:jar", 
4
  "url": "http://svn-public.driver.research-infrastructures.eu/driver/dnet40/modules/icm-iis-3rdparty-avro-json/trunk/", 
5
  "deploy_repository": "dnet4-snapshots", 
6
  "version": "4",
7
  "mail": "m.horst@icm.edu.pl,m.kobos@icm.edu.pl",
8
  "deploy_repository_url": "http://maven.research-infrastructures.eu/nexus/content/repositories/dnet4-snapshots", 
9
  "name": "icm-iis-3rdparty-avro-json"
10
}
modules/icm-iis-3rdparty-avro-json/trunk/README.markdown
1
This project is a patched version of 
2

  
3
**Cloudera's Avro-JSON proxy for Hadoop Streaming**
4

  
5
The goal of this patch is to introduce a way of passing fully qualified names of classes that define the input and output schemas in the Oozie description. By default, this library accepts either a full schema given in an XML property or a path to a file stored somewhere in the filesystem.
6

  
7
See the parent directory of this directory for:
8
 
9
 - the original source,
10
 - scripts for downloading the original source, and
11
 - scripts for applying the patch in order to create this project.

Also available in: Unified diff