Project

General

Profile

« Previous | Next » 

Revision 58601

Less memory pressure on the HBase table export job; context propagation utils. Proto exporter aligned with the most recent dhp.model changes.

View differences:

ExportInformationSpaceMapper.java
4 4
import java.util.Map;
5 5
import java.util.Map.Entry;
6 6
import java.util.NavigableMap;
7
import java.util.stream.Stream;
7 8

  
9
import eu.dnetlib.data.mapreduce.util.DedupUtils;
10
import eu.dnetlib.data.proto.OafProtos;
11
import org.apache.commons.lang.StringUtils;
8 12
import org.apache.commons.logging.Log;
9 13
import org.apache.commons.logging.LogFactory;
10 14
import org.apache.hadoop.hbase.client.Result;
11 15
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12 16
import org.apache.hadoop.hbase.mapreduce.TableMapper;
17
import org.apache.hadoop.hbase.util.Bytes;
13 18
import org.apache.hadoop.io.Text;
14 19

  
15 20
import com.google.common.base.Joiner;
......
48 53
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
49 54
		try {
50 55
			byte[] rowKey = keyIn.copyBytes();
51
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
56
			Stream.of(value.raw())
57
					.filter(kv -> !"".equals(Bytes.toString(kv.getValue())))
58
					.forEach(kv -> {
59
						try {
60
							emit(rowKey, kv.getFamily(), kv.getQualifier(), kv.getValue(), context);
61
						} catch (IOException | InterruptedException e) {
62
							throw new RuntimeException(e);
63
						}
64
					});
52 65

  
53
			for (byte[] cf : row.keySet()) {
54

  
55
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
56

  
57
					emit(rowKey, cf, q.getKey(), q.getValue(), context);
58
				}
59
			}
60 66
		} catch (final Throwable e) {
61
			log.error("error exporting the following record from HBase: " + value.toString(), e);
67
			//log.error("error exporting the following record from HBase: " + value.toString(), e);
62 68
			context.getCounter("error", e.getClass().getName()).increment(1);
63 69
			throw new RuntimeException(e);
64 70
		}

Also available in: Unified diff