Revision 58601
Added by Claudio Atzori almost 4 years ago
ExportInformationSpaceMapper2DHP.java | ||
---|---|---|
17 | 17 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
18 | 18 |
import org.apache.hadoop.hbase.mapreduce.TableMapper; |
19 | 19 |
import org.apache.hadoop.hbase.util.Bytes; |
20 |
import org.apache.hadoop.io.NullWritable; |
|
20 | 21 |
import org.apache.hadoop.io.Text; |
22 |
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; |
|
21 | 23 |
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; |
22 | 24 |
|
23 | 25 |
import java.io.IOException; |
... | ... | |
25 | 27 |
import java.util.Map.Entry; |
26 | 28 |
import java.util.NavigableMap; |
27 | 29 |
import java.util.stream.Collectors; |
30 |
import java.util.stream.Stream; |
|
28 | 31 |
|
29 | 32 |
/** |
30 | 33 |
* Exports Oaf objects as their json serialization. |
... | ... | |
46 | 49 |
protected void setup(final Context context) throws IOException, InterruptedException { |
47 | 50 |
super.setup(context); |
48 | 51 |
|
49 |
keyOut = new Text(); |
|
52 |
keyOut = new Text("");
|
|
50 | 53 |
valueOut = new Text(); |
51 | 54 |
multipleOutputs = new MultipleOutputs(context); |
52 | 55 |
objectMapper = new ObjectMapper() |
... | ... | |
71 | 74 |
emit(context, oaf); |
72 | 75 |
} |
73 | 76 |
|
74 |
final Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
|
|
75 |
|
|
76 |
for (byte[] cf : row.keySet()) {
|
|
77 |
|
|
78 |
for (Entry<byte[], byte[]> q : value.getFamilyMap(cf).entrySet().stream().filter(e -> {
|
|
79 |
final String key = new String(e.getKey());
|
|
80 |
boolean skip = key.startsWith("update") || key.equals(DedupUtils.BODY_S);
|
|
81 |
if (skip) {
|
|
82 |
context.getCounter("export", String.format("skipped %s", StringUtils.substring(key, 0, 6))).increment(1);
|
|
83 |
}
|
|
84 |
return !skip;
|
|
85 |
}).collect(Collectors.toList())) {
|
|
86 |
if (new String(q.getValue()).equals("")) {
|
|
87 |
context.getCounter("export", "skipped " + new String(cf)).increment(1);
|
|
88 |
} else {
|
|
89 |
emit(context, OafProtos.Oaf.parseFrom(q.getValue()));
|
|
90 |
}
|
|
91 |
} |
|
92 |
}
|
|
77 |
Stream.of(value.raw())
|
|
78 |
.filter(kv -> { |
|
79 |
final String q = Bytes.toString(kv.getQualifier());
|
|
80 |
boolean skip = q.startsWith("update") || q.equals(DedupUtils.BODY_S); |
|
81 |
if (skip) {
|
|
82 |
context.getCounter("export", String.format("skipped %s", StringUtils.substring(q, 0, 6))).increment(1);
|
|
83 |
}
|
|
84 |
return !skip;
|
|
85 |
})
|
|
86 |
.filter(kv -> !"".equals(Bytes.toString(kv.getValue())))
|
|
87 |
.map(kv -> kv.getValue())
|
|
88 |
.forEach(v -> {
|
|
89 |
try {
|
|
90 |
emit(context, OafProtos.Oaf.parseFrom(v));
|
|
91 |
} catch (IOException | InterruptedException e) {
|
|
92 |
context.getCounter("export", "error: " + e.getClass().getName()).increment(1);
|
|
93 |
throw new RuntimeException(e);
|
|
94 |
}
|
|
95 |
});
|
|
93 | 96 |
} catch (final Throwable e) { |
94 | 97 |
context.getCounter("export", "error: " + e.getClass().getName()).increment(1); |
95 | 98 |
throw new RuntimeException(e); |
... | ... | |
104 | 107 |
context.getCounter("export", "error:" + e.getClass().getName()).increment(1); |
105 | 108 |
} |
106 | 109 |
if (result != null) { |
107 |
keyOut.set(result.getClass().getName()); |
|
108 | 110 |
valueOut.set(objectMapper.writeValueAsString(result)); |
109 | 111 |
|
110 | 112 |
final String type = result.getClass().getSimpleName(); |
Also available in: Unified diff
Less memory pressure on the HBase table export job; context propagation utils. Proto exporter aligned with the most recent dhp.model changes.