Revision 56483
Added by Antonis Lempesis almost 5 years ago
StatsMapper.java | ||
---|---|---|
75 | 75 |
@Override |
76 | 76 |
protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) { |
77 | 77 |
|
78 |
Oaf oaf = null; |
|
79 |
OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes()); |
|
78 |
|
|
80 | 79 |
try { |
81 |
oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(keyDecoder.getType().toString()))); |
|
80 |
OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes()); |
|
81 |
Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(keyDecoder.getType().toString()))); |
|
82 | 82 |
|
83 |
if (isValid(oaf)) |
|
84 |
emitProtos(context, result, oaf); |
|
85 |
|
|
83 | 86 |
} catch (Exception e) { |
84 |
/* |
|
85 |
System.err.println("Unable to parse proto in row: " + oaf.getEntity().getType().toString() + keyDecoder.getKey()); |
|
86 |
log.error("Unable to parse proto in row: " + oaf.getEntity().getType().toString() + keyDecoder.getKey(), e); |
|
87 |
*/ |
|
87 |
System.err.println("Unable to parse proto in row: " + new String(keyIn.copyBytes())); |
|
88 |
log.error("Unable to parse proto in row: " + new String(keyIn.copyBytes()), e); |
|
88 | 89 |
|
89 |
System.err.println("Unable to parse proto in row: " + keyDecoder.getKey()); |
|
90 |
log.error("Unable to parse proto in row: " + keyDecoder.getKey(), e); |
|
91 |
|
|
92 | 90 |
context.getCounter(ERROR_COUNTERS.rottenRecords).increment(1); |
93 | 91 |
} |
94 |
|
|
95 |
if (isValid(oaf)) { |
|
96 |
emitProtos(context, result, oaf); |
|
97 |
|
|
98 |
} |
|
99 |
|
|
100 |
|
|
101 | 92 |
} |
102 | 93 |
|
103 | 94 |
|
104 | 95 |
private boolean isValid(Oaf oaf) { |
105 |
return oaf != null && oaf.isInitialized() && !deletedByInference(oaf) && !invisible(oaf); |
|
106 |
//if (oaf != null && oaf.isInitialized() && !deletedByInference(oaf)) return true; |
|
96 |
return oaf != null && oaf.isInitialized() && !oaf.getDataInfo().getDeletedbyinference() && !oaf.getDataInfo().getInvisible(); |
|
107 | 97 |
} |
108 | 98 |
|
109 | 99 |
|
... | ... | |
270 | 260 |
return columnMap != null && !columnMap.isEmpty(); |
271 | 261 |
} |
272 | 262 |
|
273 |
private boolean deletedByInference(final Oaf oaf) { |
|
274 |
return oaf.getDataInfo().getDeletedbyinference(); |
|
275 |
} |
|
276 |
|
|
277 |
private boolean invisible(final Oaf oaf) { |
|
278 |
return oaf.getDataInfo().getInvisible(); |
|
279 |
} |
|
280 |
|
|
281 | 263 |
@Override |
282 | 264 |
protected void cleanup(Context context) throws IOException, InterruptedException { |
283 | 265 |
|
Also available in: Unified diff
Minor bug fixes