Project

General

Profile

« Previous | Next » 

Revision 37563

playing with elasticsearch

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/es/ElasticsearchFeedMapper.java
1
package eu.dnetlib.data.mapreduce.es;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.hadoop.hbase.client.Result;
6
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
7
import org.apache.hadoop.hbase.mapreduce.TableMapper;
8
import org.apache.hadoop.hbase.util.Bytes;
9
import org.apache.hadoop.io.BytesWritable;
10
import org.apache.hadoop.io.NullWritable;
11

  
12
import com.google.protobuf.InvalidProtocolBufferException;
13
import com.googlecode.protobuf.format.JsonFormat;
14

  
15
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
16
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
17
import eu.dnetlib.data.proto.OafProtos.Oaf;
18
import eu.dnetlib.data.proto.TypeProtos.Type;
19

  
20
/**
 * HBase table mapper that, for each input row, merges the row's body updates into a single
 * {@link Oaf} message and, when that message is valid, emits it as a {@link BytesWritable}
 * value under a {@link NullWritable} key.
 *
 * NOTE(review): judging by the class name and the use of JsonFormat, the emitted bytes are
 * presumably a JSON document meant for an Elasticsearch feed -- confirm against the job setup.
 */
public class ElasticsearchFeedMapper extends TableMapper<NullWritable, BytesWritable> {
21

  
22
	/**
	 * Intentionally empty: this mapper performs no per-task initialisation.
	 */
	@Override
23
	protected void setup(final Context context) throws IOException, InterruptedException {
24

  
25
	}
26

  
27
	/**
	 * Processes one table row.
	 *
	 * @param keyIn
	 *            the row key; its bytes are copied and handed to {@link OafRowKeyDecoder#decode}
	 * @param value
	 *            the row content from which the updates are merged
	 * @param context
	 *            the mapper context, used both by the merge step and to write the output record
	 * @throws IOException
	 *             declared by the overridden method; also covers a rethrown
	 *             InvalidProtocolBufferException from the merge step, which could not
	 *             otherwise propagate from this method
	 * @throws InterruptedException
	 *             declared by the overridden method
	 */
	@Override
28
	protected void map(final ImmutableBytesWritable keyIn, final Result value, final Context context) throws IOException, InterruptedException {
29

  
30
		// Decode the row key; the decoded type selects which column family is read below.
		final OafRowKeyDecoder keyDecoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
31

  
32
		final Oaf oaf = mergeUpdates(value, context, keyDecoder.getType(), keyDecoder);
33

  
34
		// Rows whose merged message is null or not fully initialised are silently skipped.
		if (isValid(oaf)) {
35

  
36
			// The key is always NullWritable; the value is the byte form of the string produced
			// by JsonFormat.printToString (presumably a JSON rendering of the protobuf -- confirm).
			context.write(NullWritable.get(), new BytesWritable(Bytes.toBytes(JsonFormat.printToString(oaf))));
37
		}
38
	}
39

  
40
	/**
	 * Merges the updates stored in the column family named after {@code type} by delegating
	 * to {@link UpdateMerger#mergeBodyUpdates}.
	 *
	 * @param value
	 *            the row whose family map is read
	 * @param context
	 *            the mapper context, passed through to the merger
	 * @param type
	 *            the entity type; its {@code toString()} is used as the column family name
	 * @param keyDecoder
	 *            the decoded row key, used only to report the key when parsing fails
	 * @return whatever the merger returns (the caller checks it for null)
	 * @throws InvalidProtocolBufferException
	 *             rethrown unchanged after the failure has been reported on standard error
	 */
	private Oaf mergeUpdates(final Result value, final Context context, final Type type, final OafRowKeyDecoder keyDecoder)
41
			throws InvalidProtocolBufferException {
42
		try {
43
			return UpdateMerger.mergeBodyUpdates(context, value.getFamilyMap(Bytes.toBytes(type.toString())));
44
		} catch (final InvalidProtocolBufferException e) {
45
			// Report which type/row failed before propagating; the exception itself is not altered.
			System.err.println(String.format("Unable to parse proto (Type: %s) in row: %s", type.toString(), keyDecoder.getKey()));
46
			throw e;
47
		}
48
	}
49

  
50
	/**
	 * Tells whether a merged message may be emitted.
	 *
	 * @param oaf
	 *            the merged message, possibly {@code null}
	 * @return {@code true} only when {@code oaf} is non-null and reports itself as initialised
	 */
	private boolean isValid(final Oaf oaf) {
51
		return (oaf != null) && oaf.isInitialized();
52
	}
53

  
54
}
modules/dnet-mapreduce-jobs/trunk/pom.xml
209 209
			<artifactId>dnet-actionmanager-common</artifactId>
210 210
			<version>[2.0.0,3.0.0)</version>
211 211
		</dependency>
212
		
213
<!-- 		<dependency> -->
214
<!-- 		    <groupId>org.elasticsearch</groupId> -->
215
<!-- 		    <artifactId>elasticsearch-hadoop</artifactId> -->
216
<!-- 		    <version>2.0.2</version> -->
217
<!-- 		</dependency>		 -->
218
		<dependency>
219
		    <groupId>org.elasticsearch</groupId>
220
		    <artifactId>elasticsearch-hadoop-mr</artifactId>
221
		    <version>2.0.2</version>
222
		</dependency>
223
		
212 224
	</dependencies>
213 225
</project>

Also available in: Unified diff