Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.google.common.base.Joiner;
4
import com.google.gson.Gson;
5
import com.googlecode.protobuf.format.JsonFormat;
6
import eu.dnetlib.data.mapreduce.util.OafDecoder;
7
import eu.dnetlib.data.proto.OafProtos;
8
import eu.dnetlib.dhp.schema.oaf.Oaf;
9
import eu.dnetlib.dhp.schema.util.ProtoConverter;
10
import eu.dnetlib.dhp.schema.util.ProtoUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.hbase.client.Result;
14
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15
import org.apache.hadoop.hbase.mapreduce.TableMapper;
16
import org.apache.hadoop.io.Text;
17

    
18
import java.io.IOException;
19
import java.util.Map;
20
import java.util.Map.Entry;
21
import java.util.NavigableMap;
22

    
23
/**
24
 * Exports Oaf objects as their json serialization.
25
 *
26
 * @author claudio
27
 *
28
 */
29
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
30

    
31
	/**
32
	 * logger.
33
	 */
34
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper2DHP.class);
35

    
36

    
37
	private Text keyOut;
38

    
39
	private Text valueOut;
40

    
41
	private Gson g;
42

    
43
	@Override
44
	protected void setup(final Context context) throws IOException, InterruptedException {
45
		super.setup(context);
46

    
47
		keyOut = new Text();
48
		valueOut = new Text();
49
		g = new Gson();
50
	}
51

    
52
	@Override
53
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
54
		try {
55
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
56

    
57
			for (byte[] cf : row.keySet()) {
58

    
59
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
60
					final OafProtos.Oaf oaf = OafDecoder.decode(q.getValue()).getOaf();
61
					Oaf result = null;
62
					try {
63
						result = ProtoConverter.convert(JsonFormat.printToString(oaf));
64
					} catch (Throwable e) {
65
						context.getCounter("Convert", "error" ).increment(1);
66
					}
67
					if (result != null) {
68
						emit(result.getClass().getName(), result, context);
69
					}
70
				}
71
			}
72
		} catch (final Throwable e) {
73
			log.error("error exporting the following record from HBase: " + value.toString(), e);
74
			context.getCounter("error", e.getClass().getName()).increment(1);
75
			throw new RuntimeException(e);
76
		}
77
	}
78
	private void emit(final String type,  final Oaf result, final Context context) throws IOException, InterruptedException {
79
		keyOut.set(type);
80
		valueOut.set(g.toJson(result));
81
		context.write(keyOut, valueOut);
82
	}
83
}
(4-4/9)