Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

    
3
import com.google.common.base.Joiner;
4
import com.google.protobuf.InvalidProtocolBufferException;
5
import com.googlecode.protobuf.format.JsonFormat;
6
import eu.dnetlib.data.mapreduce.util.OafDecoder;
7
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
8
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
9
import eu.dnetlib.data.proto.OafProtos;
10
import eu.dnetlib.data.proto.TypeProtos;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.hbase.client.Result;
14
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15
import org.apache.hadoop.hbase.mapreduce.TableMapper;
16
import org.apache.hadoop.hbase.util.Bytes;
17
import org.apache.hadoop.io.Text;
18

    
19
import java.io.IOException;
20
import java.util.Map;
21
import java.util.Map.Entry;
22
import java.util.NavigableMap;
23

    
24
/**
25
 * Exports Oaf objects as their json serialization.
26
 *
27
 * @author claudio
28
 *
29
 */
30
public class ExportInformationSpaceMergedUpdatesMapper extends TableMapper<Text, Text> {
31

    
32
	/**
33
	 * logger.
34
	 */
35
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
36

    
37
	private static final String SEPARATOR = "@";
38

    
39
	private Text keyOut;
40

    
41
	private Text valueOut;
42

    
43
	@Override
44
	protected void setup(final Context context) throws IOException, InterruptedException {
45
		super.setup(context);
46

    
47
		keyOut = new Text();
48
		valueOut = new Text();
49
	}
50

    
51
	@Override
52
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
53
		try {
54
			byte[] rowKey = keyIn.copyBytes();
55

    
56
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
57

    
58
			final OafProtos.Oaf oaf = mergeUpdates(value, context, rkd.getType());
59

    
60
			if (oaf == null) {
61
				return;
62
			}
63

    
64
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
65

    
66
			for (byte[] cf : row.keySet()) {
67

    
68
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
69

    
70
					if(Bytes.toString(q.getKey()).startsWith("update")) {
71
						continue;
72
					}
73

    
74
					if (Bytes.toString(q.getKey()).equals("body")) {
75
						emit(rowKey, cf, q.getKey(), oaf.toByteArray(), context);
76
					} else {
77
						emit(rowKey, cf, q.getKey(), q.getValue(), context);
78
					}
79
				}
80
			}
81
		} catch (final Throwable e) {
82
			log.error("error exporting the following record from HBase: " + value.toString(), e);
83
			context.getCounter("error", e.getClass().getName()).increment(1);
84
			throw new RuntimeException(e);
85
		}
86
	}
87

    
88
	private void emit(final byte[] rowKey, final byte[] cf, final byte[] q, final byte[] value, final Context context) throws IOException, InterruptedException {
89

    
90
		keyOut.set(Joiner.on(SEPARATOR).join(new String(rowKey), new String(cf), new String(q)));
91

    
92
		if ((value == null) || (value.length == 0)) {
93
			valueOut.set("");
94
		} else {
95
			valueOut.set(new JsonFormat().printToString(OafDecoder.decode(value).getOaf()));
96
		}
97
		context.write(keyOut, valueOut);
98
	}
99

    
100
	private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type)
101
			throws InvalidProtocolBufferException {
102
		try {
103
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
104
		} catch (final InvalidProtocolBufferException e) {
105
			throw e;
106
		}
107
	}
108

    
109

    
110
}
(5-5/9)