1
|
package eu.dnetlib.dhp.common.protobuf;
|
2
|
|
3
|
import com.google.protobuf.Message;
|
4
|
import org.apache.avro.generic.IndexedRecord;
|
5
|
import org.apache.avro.mapred.AvroKey;
|
6
|
import org.apache.hadoop.io.BytesWritable;
|
7
|
import org.apache.hadoop.io.NullWritable;
|
8
|
import org.apache.hadoop.io.Text;
|
9
|
import org.apache.hadoop.mapreduce.Mapper;
|
10
|
import org.apache.log4j.Logger;
|
11
|
|
12
|
import java.io.IOException;
|
13
|
|
14
|
/**
|
15
|
* @author Mateusz Fedoryszak (m.fedoryszak@icm.edu.pl)
|
16
|
*/
|
17
|
public class AvroToProtoBufOneToOneMapper<IN extends IndexedRecord, OUT extends Message>
|
18
|
extends Mapper<AvroKey<IN>, NullWritable, Text, BytesWritable> {
|
19
|
private static final String CONVERTER_CLASS_PROPERTY = "converter_class";
|
20
|
private static final Logger log = Logger.getLogger(AvroToProtoBufOneToOneMapper.class);
|
21
|
|
22
|
private final Text keyWritable = new Text();
|
23
|
private final BytesWritable valueWritable = new BytesWritable();
|
24
|
private AvroToProtoBufConverter<IN, OUT> converter;
|
25
|
|
26
|
@SuppressWarnings("unchecked")
|
27
|
@Override
|
28
|
public void setup(Context context) throws IOException, InterruptedException {
|
29
|
Class<?> converterClass = context.getConfiguration().getClass(CONVERTER_CLASS_PROPERTY, null);
|
30
|
|
31
|
if (converterClass == null) {
|
32
|
throw new IOException("Please specify " + CONVERTER_CLASS_PROPERTY);
|
33
|
}
|
34
|
|
35
|
try {
|
36
|
converter = (AvroToProtoBufConverter<IN, OUT>) converterClass.newInstance();
|
37
|
} catch (ClassCastException e) {
|
38
|
throw new IOException(
|
39
|
"Class specified in " + CONVERTER_CLASS_PROPERTY + " doesn't implement AvroToProtoBufConverter", e);
|
40
|
} catch (Exception e) {
|
41
|
throw new IOException(
|
42
|
"Could not instantiate specified AvroToProtoBufConverter class, " + converterClass, e);
|
43
|
}
|
44
|
}
|
45
|
|
46
|
@Override
|
47
|
public void map(AvroKey<IN> avro, NullWritable ignore, Context context)
|
48
|
throws IOException, InterruptedException {
|
49
|
String key = null;
|
50
|
try {
|
51
|
key = converter.convertIntoKey(avro.datum());
|
52
|
keyWritable.set(key);
|
53
|
|
54
|
byte[] value = converter.convertIntoValue(avro.datum()).toByteArray();
|
55
|
valueWritable.set(value, 0, value.length);
|
56
|
|
57
|
context.write(keyWritable, valueWritable);
|
58
|
} catch (Exception e) {
|
59
|
log.error("Error" + (key != null ? " while processing " + key : ""), e);
|
60
|
}
|
61
|
}
|
62
|
}
|