Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5

    
6
import org.apache.commons.lang.StringUtils;
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
13

    
14
import com.google.common.base.Function;
15
import com.google.common.base.Joiner;
16
import com.google.common.collect.Iterables;
17

    
18
import eu.dnetlib.data.mapreduce.JobParams;
19
import eu.dnetlib.pace.config.DedupConfig;
20

    
21
public class DedupRootsToCsvReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
22

    
23
	/**
24
	 * logger.
25
	 */
26
	private static final Log log = LogFactory.getLog(DedupRootsToCsvReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
27

    
28
	private static final String COUNTER_GROUP = "csv";
29

    
30
	enum Tables {
31
		Groups, NativeGroups, NativeEntities
32
	}
33

    
34
	private DedupConfig dedupConf;
35

    
36
	private String DELIM;
37

    
38
	private String WRAP;
39

    
40
	private Text tKey;
41
	private Text tValue;
42
	private MultipleOutputs<Text, Text> mos;
43

    
44
	@Override
45
	protected void setup(final Context context) throws IOException, InterruptedException {
46
		super.setup(context);
47
		tKey = new Text();
48
		tValue = new Text();
49

    
50
		mos = new MultipleOutputs<Text, Text>(context);
51
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
52

    
53
		log.info("wf conf: " + dedupConf.toString());
54

    
55
		DELIM = context.getConfiguration().get("mapred.textoutputformat.separator", "!");
56
		WRAP = context.getConfiguration().get("mapred.textoutputformat.wrapper", "#");
57

    
58
		log.info("using field DELIMITER: '" + DELIM + "'");
59
	}
60

    
61
	@Override
62
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
63
	InterruptedException {
64

    
65
		final String csv = Joiner.on(DELIM).join(Iterables.transform(values, new Function<ImmutableBytesWritable, String>() {
66

    
67
			@Override
68
			public String apply(final ImmutableBytesWritable ibw) {
69

    
70
				return new String(ibw.copyBytes(), Charset.forName("UTF-8"));
71
			}
72
		}));
73

    
74
		tKey.set(StringUtils.substringBefore(csv, DELIM));
75
		tValue.set(StringUtils.substringAfter(csv, DELIM));
76

    
77
		mos.write(Tables.Groups.toString(), tKey, tValue, Tables.Groups.toString());
78
		context.getCounter(COUNTER_GROUP, "groups").increment(StringUtils.countMatches(csv, DELIM));
79
	}
80

    
81
	@Override
82
	public void cleanup(final Context context) throws IOException, InterruptedException {
83
		mos.close();
84
	}
85

    
86
}
(12-12/16)