Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Objects;
6

    
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12

    
13
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
14

    
15
public class DliArticleEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
16

    
17
	private static final int LIMIT = 1000;
18
	private Text tKey = new Text("");
19

    
20
	private static final Log log = LogFactory.getLog(DliArticleEnrichmentReducer.class);
21

    
22
	@Override
23
	protected void setup(final Context context) throws IOException, InterruptedException {
24
		super.setup(context);
25

    
26
		System.out.println("LIMIT: " + LIMIT);
27

    
28
	}
29

    
30
	@Override
31
	protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context)
32
			throws IOException, InterruptedException {
33
		// TODO Auto-generated method stub
34
		super.reduce(key, values, context);
35
	}
36

    
37
	private void emit(final List<EventWrapper> eventWrappers, final Context context) {
38
		eventWrappers.stream().filter(Objects::nonNull).forEach(ew -> emit(ew, context));
39
	}
40

    
41
	private void emit(final EventWrapper eventWrapper, final Context context) {
42
		try {
43
			final Text valueout = new Text(eventWrapper.asBrokerEvent().asJson());
44
			context.write(tKey, valueout);
45
			eventWrapper.incrementCounter(context);
46
		} catch (Exception e) {
47
			throw new RuntimeException(e);
48
		}
49
	}
50

    
51
}
(2-2/2)