Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import com.google.common.base.Function;
4
import com.google.common.collect.Iterables;
5
import eu.dnetlib.data.mapreduce.JobParams;
6
import eu.dnetlib.data.mapreduce.util.DedupUtils;
7
import eu.dnetlib.pace.config.DedupConfig;
8
import eu.dnetlib.pace.model.MapDocument;
9
import eu.dnetlib.pace.model.MapDocumentSerializer;
10
import eu.dnetlib.pace.util.BlockProcessor;
11
import eu.dnetlib.pace.util.Reporter;
12
import org.apache.commons.logging.Log;
13
import org.apache.commons.logging.LogFactory;
14
import org.apache.hadoop.hbase.client.Durability;
15
import org.apache.hadoop.hbase.client.Put;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableReducer;
18
import org.apache.hadoop.hbase.util.Bytes;
19
import org.apache.hadoop.io.Text;
20

    
21
import javax.annotation.Nullable;
22
import java.io.IOException;
23

    
24
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
25

    
26
	private static final Log log = LogFactory.getLog(DedupReducer.class);
27

    
28
	private DedupConfig dedupConf;
29

    
30
	private ImmutableBytesWritable ibw;
31

    
32
	@Override
33
	protected void setup(final Context context) throws IOException, InterruptedException {
34

    
35
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
36
		ibw = new ImmutableBytesWritable();
37

    
38
		log.info("dedup reduce phase \npace conf: " + dedupConf.toString());
39
	}
40

    
41
	@Override
42
	protected void reduce(final Text key, final Iterable<ImmutableBytesWritable> values, final Context context) {
43

    
44
		final Iterable<MapDocument> docs = Iterables.transform(values, new Function<ImmutableBytesWritable, MapDocument>() {
45
			@Nullable
46
			@Override
47
			public MapDocument apply(@Nullable ImmutableBytesWritable b) {
48
				return MapDocumentSerializer.decode(b.copyBytes());
49
			}
50
		});
51

    
52
		new BlockProcessor(dedupConf).process(key.toString(), docs, new Reporter() {
53

    
54
			@Override
55
			public void incrementCounter(String counterGroup, String counterName, long delta) {
56
				context.getCounter(counterGroup, counterName).increment(delta);
57
			}
58

    
59
			@Override
60
			public void emit(String type, String from, String to) {
61

    
62
				emitRel(context, type, from, to);
63
				emitRel(context, type, to, from);
64
			}
65

    
66
			private void emitRel(final Context context, final String type, final String from, final String to) {
67
				final Put put = new Put(Bytes.toBytes(from)).add(DedupUtils.getSimilarityCFBytes(type), Bytes.toBytes(to), Bytes.toBytes(""));
68
				put.setDurability(Durability.SKIP_WAL);
69
				ibw.set(Bytes.toBytes(from));
70
				try {
71
					context.write(ibw, put);
72
				} catch (IOException | InterruptedException e) {
73
					e.printStackTrace();
74
				}
75
			}
76
		});
77
	}
78

    
79
}
(10-10/16)