Project

General

Profile

« Previous | Next » 

Revision 57505

Included the information space mapping towards the new OAF DHP model.

View differences:

ExportInformationSpaceMapper2DHP.java
1 1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2 2

  
3
import com.google.common.base.Joiner;
4 3
import com.google.gson.Gson;
5
import com.googlecode.protobuf.format.JsonFormat;
6
import eu.dnetlib.data.mapreduce.util.OafDecoder;
4
import com.google.protobuf.InvalidProtocolBufferException;
5
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
6
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
7 7
import eu.dnetlib.data.proto.OafProtos;
8
import eu.dnetlib.data.proto.TypeProtos;
8 9
import eu.dnetlib.dhp.schema.oaf.Oaf;
9
import eu.dnetlib.dhp.schema.util.ProtoConverter;
10
import eu.dnetlib.dhp.schema.util.ProtoUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13 10
import org.apache.hadoop.hbase.client.Result;
14 11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15 12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
16 14
import org.apache.hadoop.io.Text;
17 15

  
18 16
import java.io.IOException;
......
28 26
 */
29 27
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
30 28

  
31
	/**
32
	 * logger.
33
	 */
34
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper2DHP.class);
35

  
36

  
37 29
	private Text keyOut;
38 30

  
39 31
	private Text valueOut;
......
52 44
	@Override
53 45
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
54 46
		try {
55
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
47
			final OafRowKeyDecoder rkd = OafRowKeyDecoder.decode(keyIn.get());
56 48

  
49
			final OafProtos.Oaf oaf = mergeUpdates(value, context, rkd.getType());
50

  
51
			if (oaf == null) {
52
				return;
53
			}
54

  
55
			final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
56

  
57 57
			for (byte[] cf : row.keySet()) {
58 58

  
59 59
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
60
					final OafProtos.Oaf oaf = OafDecoder.decode(q.getValue()).getOaf();
60

  
61
					if(Bytes.toString(q.getKey()).startsWith("update")) {
62
						continue;
63
					}
64

  
61 65
					Oaf result = null;
62 66
					try {
63
						result = ProtoConverter.convert(JsonFormat.printToString(oaf));
67
						result = ProtoConverter.convert(oaf);
64 68
					} catch (Throwable e) {
65 69
						context.getCounter("Convert", "error" ).increment(1);
66 70
					}
......
70 74
				}
71 75
			}
72 76
		} catch (final Throwable e) {
73
			log.error("error exporting the following record from HBase: " + value.toString(), e);
74
			context.getCounter("error", e.getClass().getName()).increment(1);
77
			context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
75 78
			throw new RuntimeException(e);
76 79
		}
77 80
	}
......
79 82
		keyOut.set(type);
80 83
		valueOut.set(g.toJson(result));
81 84
		context.write(keyOut, valueOut);
85

  
86
		context.getCounter("export", type).increment(1);
82 87
	}
88

  
89
	/**
	 * Delegates to {@code UpdateMerger.mergeBodyUpdates} for the column family named after the given type.
	 *
	 * The family map is read from {@code value} with {@code value.getFamilyMap(Bytes.toBytes(type.toString()))}
	 * and passed, together with {@code context}, to the merger. The caller in {@code map} returns early
	 * (skipping the row) when this method yields {@code null}.
	 *
	 * NOTE(review): the try/catch below catches {@code InvalidProtocolBufferException} only to rethrow the
	 * same instance unchanged, so it has no effect and could be dropped.
	 *
	 * @param value the HBase {@code Result} handed to {@code map}
	 * @param context the mapper context, forwarded to {@code UpdateMerger.mergeBodyUpdates}
	 * @param type the type decoded from the row key by the caller; its string form selects the column family
	 * @return whatever {@code UpdateMerger.mergeBodyUpdates} returns for that family map
	 * @throws InvalidProtocolBufferException rethrown unchanged from the merge call
	 */
	private OafProtos.Oaf mergeUpdates(final Result value, final Context context, final TypeProtos.Type type)
90
			throws InvalidProtocolBufferException {
91
		try {
92
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
93
		} catch (final InvalidProtocolBufferException e) {
94
			throw e;
95
		}
96
	}
97

  
83 98
}

Also available in: Unified diff