Project

General

Profile

« Previous | Next » 

Revision 57502

Added new mapper

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dataexport/ExportInformationSpaceMapper2DHP.java
1
package eu.dnetlib.data.mapreduce.hbase.dataexport;
2

  
3
import com.google.common.base.Joiner;
4
import com.google.gson.Gson;
5
import com.googlecode.protobuf.format.JsonFormat;
6
import eu.dnetlib.data.mapreduce.util.OafDecoder;
7
import eu.dnetlib.data.proto.OafProtos;
8
import eu.dnetlib.dhp.schema.oaf.Oaf;
9
import eu.dnetlib.dhp.schema.util.ProtoConverter;
10
import eu.dnetlib.dhp.schema.util.ProtoUtils;
11
import org.apache.commons.logging.Log;
12
import org.apache.commons.logging.LogFactory;
13
import org.apache.hadoop.hbase.client.Result;
14
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
15
import org.apache.hadoop.hbase.mapreduce.TableMapper;
16
import org.apache.hadoop.io.Text;
17

  
18
import java.io.IOException;
19
import java.util.Map;
20
import java.util.Map.Entry;
21
import java.util.NavigableMap;
22

  
23
/**
24
 * Exports Oaf objects as their json serialization.
25
 *
26
 * @author claudio
27
 *
28
 */
29
public class ExportInformationSpaceMapper2DHP extends TableMapper<Text, Text> {
30

  
31
	/**
32
	 * logger.
33
	 */
34
	private static final Log log = LogFactory.getLog(ExportInformationSpaceMapper2DHP.class);
35

  
36

  
37
	private Text keyOut;
38

  
39
	private Text valueOut;
40

  
41
	private Gson g;
42

  
43
	@Override
44
	protected void setup(final Context context) throws IOException, InterruptedException {
45
		super.setup(context);
46

  
47
		keyOut = new Text();
48
		valueOut = new Text();
49
		g = new Gson();
50
	}
51

  
52
	@Override
53
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
54
		try {
55
			Map<byte[], NavigableMap<byte[], byte[]>> row = value.getNoVersionMap();
56

  
57
			for (byte[] cf : row.keySet()) {
58

  
59
				for (Entry<byte[], byte[]> q : row.get(cf).entrySet()) {
60
					final OafProtos.Oaf oaf = OafDecoder.decode(q.getValue()).getOaf();
61
					Oaf result = null;
62
					try {
63
						result = ProtoConverter.convert(JsonFormat.printToString(oaf));
64
					} catch (Throwable e) {
65
						context.getCounter("Convert", "error" ).increment(1);
66
					}
67
					if (result != null) {
68
						emit(result.getClass().getName(), result, context);
69
					}
70
				}
71
			}
72
		} catch (final Throwable e) {
73
			log.error("error exporting the following record from HBase: " + value.toString(), e);
74
			context.getCounter("error", e.getClass().getName()).increment(1);
75
			throw new RuntimeException(e);
76
		}
77
	}
78
	private void emit(final String type,  final Oaf result, final Context context) throws IOException, InterruptedException {
79
		keyOut.set(type);
80
		valueOut.set(g.toJson(result));
81
		context.write(keyOut, valueOut);
82
	}
83
}
modules/dnet-mapreduce-jobs/trunk/pom.xml
234 234
		</dependency>
235 235

  
236 236
		<dependency>
237
			<groupId>eu.dnetlib.dhp</groupId>
238
			<artifactId>dhp-schemas</artifactId>
239
			<version>1.0.2</version>
240
		</dependency>
241

  
242
		<dependency>
237 243
			<groupId>org.elasticsearch</groupId>
238 244
			<artifactId>elasticsearch-hadoop-mr</artifactId>
239 245
			<version>5.2.0</version>

Also available in: Unified diff