package eu.dnetlib.data.mapreduce.hbase.lodExport.build;

import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.TaskInputOutputContext;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

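/**
 * Reducer for the LOD export blocking phase: each key is a blocking key and the values are
 * the records grouped under it. Blocks that contain both "source_" and "target_" records
 * are stored in Redis; those that also stay below RECORD_LIMIT are written out via
 * MultipleOutputs, and everything else is counted as discarded.
 */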
public class BlockNoCacheReducer extends Reducer<Text, Text, Text, Text> {
    private static final String SEPERATOR = ",";    // separates record ids in the emitted block value
    private static final String FIELD_DELIM = "\t"; // delimiter between a record's id and the rest of its payload
    private static String ERROR_KEY = "";           // last key that triggered a failure, logged again during cleanup
    private static final int RECORD_LIMIT = 10000;  // blocks reaching this many records are discarded

    // Hadoop counters exposed by this reducer
    public enum BLOCKS_COUNTER {
        BLOCKS,
        DISCARDED_BLOCKS,
        REDIS_RECORDS
    }

    private Logger log = Logger.getLogger(BlockNoCacheReducer.class);

    private MultipleOutputs MultipleOutputWriter;
    private RedisUtils redisUtils;

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        // one MultipleOutputs writer and one Redis helper per reducer task
        MultipleOutputWriter = new MultipleOutputs((TaskInputOutputContext) context);
        redisUtils = new RedisUtils(context.getConfiguration());
    }

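    /**
     * Builds one block per key: collects the record ids (stopping at RECORD_LIMIT), flags
     * whether the block mixes source and target entities, and then caches, emits, or
     * discards it accordingly.
     */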
    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
        Iterator<Text> it = values.iterator();
        StringBuilder field = new StringBuilder();
        Map<String, String> records = new HashMap<>();

        try {
            boolean hasSource = false;
            boolean hasTarget = false;
            int entitiesNumber = 0;

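            // collect record ids for this key, noting whether both source and target entities are present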
            while (it.hasNext()) {
                entitiesNumber++;
                if (entitiesNumber >= RECORD_LIMIT) {
                    break;
                }

                String record = it.next().toString();
                String id = record.substring(0, record.indexOf(FIELD_DELIM));
                field.append(id).append(SEPERATOR);
                if (id.contains("source_")) {
                    hasSource = true;
                }
                if (id.contains("target_")) {
                    hasTarget = true;
                }

                records.put(id, record);
            }

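            // only blocks that link source and target datasets are useful for matching; cache their records in Redis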
            if (hasSource && hasTarget) {
                redisUtils.writeMap(records);
                context.getCounter(BLOCKS_COUNTER.REDIS_RECORDS).increment(1);
            }

            // emit the block only if it also stayed below the record limit; oversized or one-sided blocks are discarded
            if (hasSource && hasTarget && entitiesNumber < RECORD_LIMIT) {
                MultipleOutputWriter.write("blocks", key, field.toString(), "blocks/b");
                MultipleOutputWriter.write("entitiesNumber", key, new Text(String.valueOf(entitiesNumber)), "stats/entitiesNumber");
                context.getCounter(BLOCKS_COUNTER.BLOCKS).increment(1);
            } else {
                context.getCounter(BLOCKS_COUNTER.DISCARDED_BLOCKS).increment(1);
            }

        } catch (Exception e) {
            ERROR_KEY = key.toString();
            log.error("key that caused the issue: " + ERROR_KEY);
            throw new IOException("Error writing out entities. key that caused the issue: " + ERROR_KEY, e);
        }
    }

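    /** Logs any key that caused a failure and closes the MultipleOutputs writer and the Redis connection. */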
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        log.info("Cleaning up reducer...");
        if (!ERROR_KEY.isEmpty()) {
            log.error("key that caused the issue: " + ERROR_KEY);
        }
        // also close MultipleOutputs so its record writers are flushed before the task ends
        MultipleOutputWriter.close();
        redisUtils.close();
    }
}