Project

General

Profile

1 36164 claudio.at
package eu.dnetlib.data.mapreduce.hbase.dedup;
2
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
6 36796 claudio.at
import org.apache.commons.lang.StringUtils;
7 36164 claudio.at
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
13
14 36796 claudio.at
import com.google.common.base.Function;
15
import com.google.common.base.Joiner;
16
import com.google.common.collect.Iterables;
17 36164 claudio.at
18
import eu.dnetlib.data.mapreduce.JobParams;
19 36670 claudio.at
import eu.dnetlib.pace.config.DedupConfig;
20 36164 claudio.at
21
public class DedupRootsToCsvReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
22
23
	/**
24
	 * logger.
25
	 */
26
	private static final Log log = LogFactory.getLog(DedupRootsToCsvReducer.class); // NOPMD by marko on 11/24/08 5:02 PM
27
28
	private static final String COUNTER_GROUP = "csv";
29
30
	enum Tables {
31
		Groups, NativeGroups, NativeEntities
32
	}
33
34
	private DedupConfig dedupConf;
35
36
	private String DELIM;
37
38
	private String WRAP;
39
40
	private Text tKey;
41
	private Text tValue;
42
	private MultipleOutputs<Text, Text> mos;
43
44
	@Override
45
	protected void setup(final Context context) throws IOException, InterruptedException {
46
		super.setup(context);
47
		tKey = new Text();
48
		tValue = new Text();
49
50
		mos = new MultipleOutputs<Text, Text>(context);
51 36670 claudio.at
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
52 36164 claudio.at
53
		log.info("wf conf: " + dedupConf.toString());
54
55
		DELIM = context.getConfiguration().get("mapred.textoutputformat.separator", "!");
56
		WRAP = context.getConfiguration().get("mapred.textoutputformat.wrapper", "#");
57
58 36796 claudio.at
		log.info("using field DELIMITER: '" + DELIM + "'");
59 36164 claudio.at
	}
60
61
	@Override
62
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
63 36796 claudio.at
	InterruptedException {
64 36164 claudio.at
65 36796 claudio.at
		final String csv = Joiner.on(DELIM).join(Iterables.transform(values, new Function<ImmutableBytesWritable, String>() {
66 36164 claudio.at
67 36796 claudio.at
			@Override
68
			public String apply(final ImmutableBytesWritable ibw) {
69 36164 claudio.at
70 36796 claudio.at
				return new String(ibw.copyBytes(), Charset.forName("UTF-8"));
71
			}
72
		}));
73 36164 claudio.at
74 36796 claudio.at
		tKey.set(StringUtils.substringBefore(csv, DELIM));
75
		tValue.set(StringUtils.substringAfter(csv, DELIM));
76 36164 claudio.at
77
		mos.write(Tables.Groups.toString(), tKey, tValue, Tables.Groups.toString());
78 36796 claudio.at
		context.getCounter(COUNTER_GROUP, "groups").increment(StringUtils.countMatches(csv, DELIM));
79 36164 claudio.at
	}
80
81
	@Override
82
	public void cleanup(final Context context) throws IOException, InterruptedException {
83
		mos.close();
84
	}
85
86
}