Project

General

Profile

1
package eu.dnetlib.dhp.common.protobuf;
2

    
3
import com.google.protobuf.Message;
4
import org.apache.avro.generic.IndexedRecord;
5
import org.apache.avro.mapred.AvroKey;
6
import org.apache.hadoop.io.BytesWritable;
7
import org.apache.hadoop.io.NullWritable;
8
import org.apache.hadoop.io.Text;
9
import org.apache.hadoop.mapreduce.Mapper;
10
import org.apache.log4j.Logger;
11

    
12
import java.io.IOException;
13

    
14
/**
15
 * @author Mateusz Fedoryszak (m.fedoryszak@icm.edu.pl)
16
 */
17
public class AvroToProtoBufOneToOneMapper<IN extends IndexedRecord, OUT extends Message>
18
        extends Mapper<AvroKey<IN>, NullWritable, Text, BytesWritable> {
19
    private static final String CONVERTER_CLASS_PROPERTY = "converter_class";
20
    private static final Logger log = Logger.getLogger(AvroToProtoBufOneToOneMapper.class);
21

    
22
    private final Text keyWritable = new Text();
23
    private final BytesWritable valueWritable = new BytesWritable();
24
    private AvroToProtoBufConverter<IN, OUT> converter;
25

    
26
    @SuppressWarnings("unchecked")
27
    @Override
28
    public void setup(Context context) throws IOException, InterruptedException {
29
        Class<?> converterClass = context.getConfiguration().getClass(CONVERTER_CLASS_PROPERTY, null);
30

    
31
        if (converterClass == null) {
32
            throw new IOException("Please specify " + CONVERTER_CLASS_PROPERTY);
33
        }
34

    
35
        try {
36
            converter = (AvroToProtoBufConverter<IN, OUT>) converterClass.newInstance();
37
        } catch (ClassCastException e) {
38
            throw new IOException(
39
                    "Class specified in " + CONVERTER_CLASS_PROPERTY + " doesn't implement AvroToProtoBufConverter", e);
40
        } catch (Exception e) {
41
            throw new IOException(
42
                    "Could not instantiate specified AvroToProtoBufConverter class, " + converterClass, e);
43
        }
44
    }
45

    
46
    @Override
47
    public void map(AvroKey<IN> avro, NullWritable ignore, Context context)
48
            throws IOException, InterruptedException {
49
        String key = null;
50
        try {
51
            key = converter.convertIntoKey(avro.datum());
52
            keyWritable.set(key);
53

    
54
            byte[] value = converter.convertIntoValue(avro.datum()).toByteArray();
55
            valueWritable.set(value, 0, value.length);
56

    
57
            context.write(keyWritable, valueWritable);
58
        } catch (Exception e) {
59
            log.error("Error" + (key != null ? " while processing  " + key : ""), e);
60
        }
61
    }
62
}
(2-2/2)