Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import java.io.IOException;
4
import java.lang.reflect.Type;
5
import java.util.Map;
6

    
7
import com.google.common.collect.Maps;
8
import com.google.gson.Gson;
9
import com.google.gson.reflect.TypeToken;
10
import eu.dnetlib.data.mapreduce.JobParams;
11
import eu.dnetlib.data.transform.Column;
12
import eu.dnetlib.data.transform.Row;
13
import eu.dnetlib.data.transform.XsltRowTransformer;
14
import eu.dnetlib.data.transform.XsltRowTransformerFactory;
15
import org.apache.commons.logging.Log;
16
import org.apache.commons.logging.LogFactory;
17
import org.apache.hadoop.hbase.client.Put;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.util.Bytes;
20
import org.apache.hadoop.io.Text;
21
import org.apache.hadoop.mapreduce.Mapper;
22

    
23
public class ImportRecordsMapper extends Mapper<Text, Text, ImmutableBytesWritable, Put> {
24

    
25
	/**
26
	 * logger.
27
	 */
28
	private static final Log log = LogFactory.getLog(ImportRecordsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
29

    
30
	private XsltRowTransformer transformer;
31

    
32
	private ImmutableBytesWritable ibw;
33

    
34
	@Override
35
	protected void setup(final Context context) throws IOException, InterruptedException {
36
		super.setup(context);
37

    
38
		final String xslt = context.getConfiguration().get(JobParams.HBASE_IMPORT_XSLT).trim();
39

    
40
		if ((xslt == null) || xslt.isEmpty()) throw new IllegalArgumentException("missing xslt");
41
		final Map<String, Object> xslParams = Maps.newHashMap();
42

    
43
		boolean invisible = context.getConfiguration().getBoolean(JobParams.INVISIBLE, false);
44
		log.info("invisible flag: " + invisible);
45
		xslParams.put("invisible", invisible);
46

    
47
		transformer = XsltRowTransformerFactory.newInstance(xslt, xslParams);
48

    
49
		ibw = new ImmutableBytesWritable();
50

    
51
		log.info("got xslt: '" + xslt);
52
		log.info("using trasformer: '" + transformer.getTransformerClassName() + "'");
53
	}
54

    
55
	@Override
56
	protected void map(final Text key, final Text value, final Context context) throws IOException, InterruptedException {
57
		try {
58
			for (final Row row : transformer.apply(value.toString())) {
59

    
60
				final byte[] rowKey = Bytes.toBytes(row.getKey());
61
				final Put put = new Put(rowKey);
62
				put.setWriteToWAL(JobParams.WRITE_TO_WAL);
63

    
64
				for (final Column<String, byte[]> col : row) {
65
					final byte[] family = Bytes.toBytes(row.getColumnFamily());
66
					final byte[] qualifier = Bytes.toBytes(col.getName());
67
					put.add(family, qualifier, col.getValue());
68
				}
69

    
70
				ibw.set(rowKey);
71
				context.write(ibw, put);
72
				context.getCounter("mdstore", row.getColumnFamily()).increment(row.getColumns().size());
73
			}
74
		} catch (final Throwable e) {
75
			log.error("error importing the following record on HBase: " + value.toString(), e);
76
			context.getCounter("error", e.getClass().getName()).increment(1);
77
			throw new RuntimeException(e);
78
		}
79
	}
80

    
81
	private Map<String, String> getDatasourceTypeMap(final Context context) {
82
		final String dsType = context.getConfiguration().get("datasourceTypeMap");
83

    
84
		final Type token = new TypeToken<Map<String, String>>() {}.getType();
85
		final Map<String, String> dsMap = new Gson().fromJson(dsType, token);
86
		return dsMap;
87
	}
88

    
89
}
(3-3/4)