Project

General

Profile

« Previous | Next » 

Revision 35899

including changes to catch and fail for any exception of r35769 of trunk

View differences:

modules/dnet-mapreduce-jobs/branches/0.0.6.3.x/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataimport/ImportRecordsMapper.java
2 2

  
3 3
import java.io.IOException;
4 4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
5 7
import org.apache.hadoop.hbase.client.Put;
6 8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
7 9
import org.apache.hadoop.hbase.util.Bytes;
......
16 18

  
17 19
public class ImportRecordsMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
18 20

  
21
	/**
22
	 * logger.
23
	 */
24
	private static final Log log = LogFactory.getLog(ImportRecordsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
19 25
	private static final boolean WRITE_TO_WAL = false;
20 26

  
21 27
	private XsltRowTransformer transformer;
......
25 31
	private ImmutableBytesWritable ibw;
26 32

  
27 33
	@Override
28
	protected void setup(Context context) throws IOException, InterruptedException {
34
	protected void setup(final Context context) throws IOException, InterruptedException {
29 35
		super.setup(context);
30 36

  
31 37
		this.xslt = context.getConfiguration().get(JobParams.HBASE_IMPORT_XSLT).trim();
32 38

  
33
		if (xslt == null || xslt.isEmpty()) { throw new IllegalArgumentException("missing xslt"); }
39
		if ((xslt == null) || xslt.isEmpty()) throw new IllegalArgumentException("missing xslt");
34 40

  
35 41
		transformer = new XsltRowTransformerFactory().getTransformer(xslt);
36 42
		ibw = new ImmutableBytesWritable();
......
39 45
	}
40 46

  
41 47
	@Override
42
	protected void map(Text key, Text value, Context context) throws IOException, InterruptedException {
48
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
43 49

  
44
		for (Row row : transformer.apply(value.toString())) {
50
		try {
51
			for (final Row row : transformer.apply(value.toString())) {
45 52

  
46
			final byte[] rowKey = Bytes.toBytes(row.getKey());
47
			final Put put = new Put(rowKey);
48
			put.setWriteToWAL(WRITE_TO_WAL);
53
				final byte[] rowKey = Bytes.toBytes(row.getKey());
54
				final Put put = new Put(rowKey);
55
				put.setWriteToWAL(WRITE_TO_WAL);
49 56

  
50
			for (Column<String, byte[]> col : row) {
51
				byte[] family = Bytes.toBytes(row.getColumnFamily());
52
				byte[] qualifier = Bytes.toBytes(col.getName());
53
				put.add(family, qualifier, col.getValue());
57
				for (final Column<String, byte[]> col : row) {
58
					final byte[] family = Bytes.toBytes(row.getColumnFamily());
59
					final byte[] qualifier = Bytes.toBytes(col.getName());
60
					put.add(family, qualifier, col.getValue());
61
				}
62
				ibw.set(rowKey);
63
				context.write(ibw, put);
64
				context.getCounter("mdstore", row.getColumnFamily()).increment(row.getColumns().size());
54 65
			}
55
			ibw.set(rowKey);
56
			context.write(ibw, put);
57
			context.getCounter("mdstore", row.getColumnFamily()).increment(row.getColumns().size());
66
		} catch (final Throwable e) {
67
			log.error("error importing the following record on HBase: " + value.toString(), e);
68
			context.getCounter("error", e.getClass().getName()).increment(1);
69
			throw new RuntimeException(e);
58 70
		}
59 71
	}
60 72

  

Also available in: Unified diff