Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport;
2

    
3
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
4
import org.apache.hadoop.io.Text;
5
import org.apache.hadoop.mapreduce.Reducer;
6
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
7
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
8
import org.apache.log4j.Logger;
9

    
10
import java.io.IOException;
11
import java.util.Iterator;
12

    
13
public class LodReducer extends Reducer<Text, Text, Text, Text> {
14

    
15
    private Logger log = Logger.getLogger(LodReducer.class);
16
    private MultipleOutputs MultipleOutputWriter;
17

    
18
    public static enum REDUCE_COUNTER {
19
        WRITTEN_RECORDS
20
    }
21

    
22
    /**
23
     * Reducer that splits input according to their Type ( datasource, results
24
     * etc..) and writes each kind to a seperate output
25
     */
26

    
27
    @Override
28
    protected void setup(Context context) throws IOException, InterruptedException {
29
        MultipleOutputWriter = new MultipleOutputs((TaskInputOutputContext) context);
30
    }
31

    
32
    @Override
33
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
34

    
35
        Iterator<Text> it = values.iterator();
36

    
37
        while (it.hasNext()) {
38
            String value = it.next().toString();
39
            String[] split = value.split(context.getConfiguration().get("lod.delim"));
40
            value = value.replaceFirst(split[0] + context.getConfiguration().get("lod.delim"), "");
41
            String basePath = key.toString().equals("relations") ? "relations/" + key.toString() : "entities/" + key.toString();
42
            MultipleOutputWriter.write(key.toString(), new Text(split[0]), new Text(value), basePath);
43
            context.getCounter(REDUCE_COUNTER.WRITTEN_RECORDS).increment(1);
44
        }
45
    }
46

    
47
    @Override
48
    protected void cleanup(Context context) throws IOException, InterruptedException {
49
        log.info("Cleaning up reducer...");
50
        MultipleOutputWriter.close();
51
    }
52
}
(2-2/2)