Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.preprocessing;
2

    
3
import org.apache.hadoop.io.Text;
4
import org.apache.hadoop.mapreduce.Reducer;
5
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
6
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
7
import org.apache.log4j.Logger;
8

    
9
import java.io.IOException;
10
import java.nio.charset.Charset;
11
import java.util.Iterator;
12

    
13
public class DatasetReducer extends Reducer<Text, Text, Text, Text> {
14
    public static enum ENTITIES_COUNTER {
15

    
16
        TOTAL_ENTITIES,
17
    }
18

    
19
    private Logger log = Logger.getLogger(DatasetReducer.class);
20
    private MultipleOutputs MultipleOutputWriter;
21

    
22

    
23
    @Override
24
    protected void setup(Context context) throws IOException, InterruptedException {
25
        MultipleOutputWriter = new MultipleOutputs((TaskInputOutputContext) context);
26
    }
27

    
28
    ;
29

    
30
    @Override
31
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
32
        Iterator<Text> it = values.iterator();
33
        StringBuffer value = new StringBuffer();
34

    
35
        String[] tmp = key.toString().split(",");
36
        String dataset = tmp[0];
37
        String entityType = tmp[1];
38
        String id = tmp[2];
39

    
40

    
41
        while (it.hasNext()) {
42
            String field = new String(it.next().copyBytes(), Charset.forName("UTF-8"));
43
            value.append(field);
44
        }
45

    
46
        try {
47
            String output =entityType;
48
            String outputFile = dataset + "/" + output;
49

    
50
            MultipleOutputWriter.write(output, new Text("id\t" + id),
51
                    new Text(value.toString().getBytes(Charset.forName("UTF-8"))), outputFile);
52
            context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
53
        } catch (Exception e) {
54
            log.error(entityType + e.toString());
55
            throw new InterruptedException(entityType + " " + id + " " + e.toString());
56

    
57
        }
58
    }
59

    
60
    @Override
61
    protected void cleanup(Context context) throws IOException, InterruptedException {
62
        log.info("Cleaning up reducer...");
63
        MultipleOutputWriter.close();
64

    
65
    }
66
}
(1-1/3)