Project

General

Profile

1 43025 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2 42692 eri.katsar
3 43143 giorgos.al
import java.io.IOException;
4 43395 eri.katsar
import java.nio.charset.Charset;
5 43143 giorgos.al
import java.util.Iterator;
6
7 42692 eri.katsar
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Reducer;
9 43395 eri.katsar
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
10
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
11 42692 eri.katsar
import org.apache.log4j.Logger;
12
13 43025 eri.katsar
public class BlockReducer extends Reducer<Text, Text, Text, Text> {
14 43950 eri.katsar
    private static final String SEPERATOR = ",";
15 43897 eri.katsar
16 43485 eri.katsar
    public static enum BLOCKS_COUNTER {
17
        TOTAL_ENTITIES
18 43093 eri.katsar
    }
19 42692 eri.katsar
20 43093 eri.katsar
    private Logger log = Logger.getLogger(BlockReducer.class);
21 42692 eri.katsar
22 43395 eri.katsar
    private MultipleOutputs MultipleOutputWriter;
23 43025 eri.katsar
24 43395 eri.katsar
25 43093 eri.katsar
    @Override
26
    protected void setup(Context context) throws IOException, InterruptedException {
27 43395 eri.katsar
        MultipleOutputWriter = new MultipleOutputs((TaskInputOutputContext) context);
28 43093 eri.katsar
    }
29 42692 eri.katsar
30
31 43093 eri.katsar
    @Override
32
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
33
        Iterator<Text> it = values.iterator();
34 43054 eri.katsar
35 43093 eri.katsar
        try {
36
            //each list is a block
37
            StringBuilder field = new StringBuilder();
38 43395 eri.katsar
            int nunberOfEntities = 0;
39 43093 eri.katsar
            while (it.hasNext()) {
40 43897 eri.katsar
                field.append(it.next()).append(SEPERATOR);
41 43395 eri.katsar
                nunberOfEntities++;
42
            }
43 43100 eri.katsar
44 43897 eri.katsar
            MultipleOutputWriter.write("b", key, new Text(field.toString()), "blocks/b");
45 43485 eri.katsar
            MultipleOutputWriter.write("entitiesNumber", key, new Text(String.valueOf(nunberOfEntities).getBytes(Charset.forName("UTF-8"))), "stats/entitiesNumber");
46
            context.getCounter(BLOCKS_COUNTER.TOTAL_ENTITIES).increment(1);
47 43093 eri.katsar
        } catch (Exception e) {
48
            throw new InterruptedException(e.getMessage());
49
        }
50 42692 eri.katsar
51 43093 eri.katsar
    }
52 42802 eri.katsar
53 43093 eri.katsar
54
    @Override
55
    protected void cleanup(Context context) throws IOException, InterruptedException {
56
        log.info("Cleaning up reducer...");
57 43395 eri.katsar
        MultipleOutputWriter.close();
58 43093 eri.katsar
59
    }
60 42692 eri.katsar
}