Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.Iterator;
6

    
7
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Reducer;
9
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
10
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
11
import org.apache.log4j.Logger;
12

    
13
public class BlockReducer extends Reducer<Text, Text, Text, Text> {
14
    private static final String SEPERATOR = ",";
15

    
16
    public static enum BLOCKS_COUNTER {
17
        TOTAL_ENTITIES
18
    }
19

    
20
    private Logger log = Logger.getLogger(BlockReducer.class);
21

    
22
    private MultipleOutputs MultipleOutputWriter;
23

    
24

    
25
    @Override
26
    protected void setup(Context context) throws IOException, InterruptedException {
27
        MultipleOutputWriter = new MultipleOutputs((TaskInputOutputContext) context);
28
    }
29

    
30

    
31
    @Override
32
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
33
        Iterator<Text> it = values.iterator();
34

    
35
        try {
36
            //each list is a block
37
            StringBuilder field = new StringBuilder();
38
            int nunberOfEntities = 0;
39
            while (it.hasNext()) {
40
                field.append(it.next()).append(SEPERATOR);
41
                nunberOfEntities++;
42
            }
43

    
44
            MultipleOutputWriter.write("b", key, new Text(field.toString()), "blocks/b");
45
            MultipleOutputWriter.write("entitiesNumber", key, new Text(String.valueOf(nunberOfEntities).getBytes(Charset.forName("UTF-8"))), "stats/entitiesNumber");
46
            context.getCounter(BLOCKS_COUNTER.TOTAL_ENTITIES).increment(1);
47
        } catch (Exception e) {
48
            throw new InterruptedException(e.getMessage());
49
        }
50

    
51
    }
52

    
53

    
54
    @Override
55
    protected void cleanup(Context context) throws IOException, InterruptedException {
56
        log.info("Cleaning up reducer...");
57
        MultipleOutputWriter.close();
58

    
59
    }
60
}
(1-1/3)