Revision 58601
Added by Claudio Atzori almost 4 years ago
ExportInformationSpaceMapper.java | ||
---|---|---|
4 | 4 |
import java.util.Map; |
5 | 5 |
import java.util.Map.Entry; |
6 | 6 |
import java.util.NavigableMap; |
7 |
import java.util.stream.Stream; |
|
7 | 8 |
|
9 |
import eu.dnetlib.data.mapreduce.util.DedupUtils; |
|
10 |
import eu.dnetlib.data.proto.OafProtos; |
|
11 |
import org.apache.commons.lang.StringUtils; |
|
8 | 12 |
import org.apache.commons.logging.Log; |
9 | 13 |
import org.apache.commons.logging.LogFactory; |
10 | 14 |
import org.apache.hadoop.hbase.client.Result; |
11 | 15 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
12 | 16 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
17 |
import org.apache.hadoop.hbase.util.Bytes; |
|
13 | 18 |
import org.apache.hadoop.io.Text; |
14 | 19 |
|
15 | 20 |
import com.google.common.base.Joiner; |
... | ... | |
48 | 53 |
protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException { |
49 | 54 |
try { |
50 | 55 |
byte[] rowKey = keyIn.copyBytes(); |
51 |
Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap(); |
|
56 |
Stream.of(value.raw()) |
|
57 |
.filter(kv -> !"".equals(Bytes.toString(kv.getValue()))) |
|
58 |
.forEach(kv -> { |
|
59 |
try { |
|
60 |
emit(rowKey, kv.getFamily(), kv.getQualifier(), kv.getValue(), context); |
|
61 |
} catch (IOException | InterruptedException e) { |
|
62 |
throw new RuntimeException(e); |
|
63 |
} |
|
64 |
}); |
|
52 | 65 |
|
53 |
for (byte[] cf : row.keySet()) { |
|
54 |
|
|
55 |
for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) { |
|
56 |
|
|
57 |
emit(rowKey, cf, q.getKey(), q.getValue(), context); |
|
58 |
} |
|
59 |
} |
|
60 | 66 |
} catch (final Throwable e) { |
61 |
log.error("error exporting the following record from HBase: " + value.toString(), e); |
|
67 |
//log.error("error exporting the following record from HBase: " + value.toString(), e);
|
|
62 | 68 |
context.getCounter("error", e.getClass().getName()).increment(1); |
63 | 69 |
throw new RuntimeException(e); |
64 | 70 |
} |
Also available in: Unified diff
less memory pressure on the hbase table export job, context propagation utils. Proto exporter aligned with most recent dhp.model changes