Revision 35899
Added by Alessia Bardi about 9 years ago
modules/dnet-mapreduce-jobs/branches/0.0.6.3.x/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/ImportRecordsMapper.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.io.IOException; |
4 | 4 |
|
5 |
import org.apache.commons.logging.Log; |
|
6 |
import org.apache.commons.logging.LogFactory; |
|
5 | 7 |
import org.apache.hadoop.hbase.client.Put; |
6 | 8 |
import org.apache.hadoop.hbase.io.ImmutableBytesWritable; |
7 | 9 |
import org.apache.hadoop.hbase.util.Bytes; |
... | ... | |
16 | 18 |
|
17 | 19 |
public class ImportRecordsMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> { |
18 | 20 |
|
21 |
/** |
|
22 |
* logger. |
|
23 |
*/ |
|
24 |
private static final Log log = LogFactory.getLog(ImportRecordsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM |
|
19 | 25 |
private static final boolean WRITE_TO_WAL = false; |
20 | 26 |
|
21 | 27 |
private XsltRowTransformer transformer; |
... | ... | |
25 | 31 |
private ImmutableBytesWritable ibw; |
26 | 32 |
|
27 | 33 |
@Override |
28 |
protected void setup(Context context) throws IOException, InterruptedException { |
|
34 |
protected void setup(final Context context) throws IOException, InterruptedException {
|
|
29 | 35 |
super.setup(context); |
30 | 36 |
|
31 | 37 |
this.xslt = context.getConfiguration().get(JobParams.HBASE_IMPORT_XSLT).trim(); |
32 | 38 |
|
33 |
if (xslt == null || xslt.isEmpty()) { throw new IllegalArgumentException("missing xslt"); }
|
|
39 |
if ((xslt == null) || xslt.isEmpty()) throw new IllegalArgumentException("missing xslt");
|
|
34 | 40 |
|
35 | 41 |
transformer = new XsltRowTransformerFactory().getTransformer(xslt); |
36 | 42 |
ibw = new ImmutableBytesWritable(); |
... | ... | |
39 | 45 |
} |
40 | 46 |
|
41 | 47 |
@Override |
42 |
protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
|
|
48 |
protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
|
|
43 | 49 |
|
44 |
for (Row row : transformer.apply(value.toString())) { |
|
50 |
try { |
|
51 |
for (final Row row : transformer.apply(value.toString())) { |
|
45 | 52 |
|
46 |
final byte[] rowKey = Bytes.toBytes(row.getKey()); |
|
47 |
final Put put = new Put(rowKey); |
|
48 |
put.setWriteToWAL(WRITE_TO_WAL); |
|
53 |
final byte[] rowKey = Bytes.toBytes(row.getKey());
|
|
54 |
final Put put = new Put(rowKey);
|
|
55 |
put.setWriteToWAL(WRITE_TO_WAL);
|
|
49 | 56 |
|
50 |
for (Column<String, byte[]> col : row) { |
|
51 |
byte[] family = Bytes.toBytes(row.getColumnFamily()); |
|
52 |
byte[] qualifier = Bytes.toBytes(col.getName()); |
|
53 |
put.add(family, qualifier, col.getValue()); |
|
57 |
for (final Column<String, byte[]> col : row) { |
|
58 |
final byte[] family = Bytes.toBytes(row.getColumnFamily()); |
|
59 |
final byte[] qualifier = Bytes.toBytes(col.getName()); |
|
60 |
put.add(family, qualifier, col.getValue()); |
|
61 |
} |
|
62 |
ibw.set(rowKey); |
|
63 |
context.write(ibw, put); |
|
64 |
context.getCounter("mdstore", row.getColumnFamily()).increment(row.getColumns().size()); |
|
54 | 65 |
} |
55 |
ibw.set(rowKey); |
|
56 |
context.write(ibw, put); |
|
57 |
context.getCounter("mdstore", row.getColumnFamily()).increment(row.getColumns().size()); |
|
66 |
} catch (final Throwable e) { |
|
67 |
log.error("error importing the following record on HBase: " + value.toString(), e); |
|
68 |
context.getCounter("error", e.getClass().getName()).increment(1); |
|
69 |
throw new RuntimeException(e); |
|
58 | 70 |
} |
59 | 71 |
} |
60 | 72 |
|
Also available in: Unified diff
including changes to catch and fail for any exception of r35769 of trunk