package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;

import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.io.InterruptedIOException;

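/**
 * Mapper for the linkage phase of the LOD export: reads blocks of records
 * grouped under a blocking key, counts the records in each block, and
 * forwards only blocks smaller than the optimal block size fetched from
 * Redis; oversized blocks are discarded, with counters tracking how many
 * blocks and pairwise comparisons were kept or purged.
 */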
public class LinkMapper extends Mapper<Text, Text, Text, Text> {
    private Logger log = Logger.getLogger(this.getClass());
    private RedisUtils redisUtils;
    private int optimalBlockSize;
    private static final char SEPARATOR = ',';

    public enum LINK_MAPPER_COUNTERS {
        OPTIMAL_BLOCK_SIZE,
        ERROR_COUNTERS,
        DISCARDED_BLOCKS,
        PASSED_BLOCKS,
        TOTAL_COMPARISONS,
        DISCARDED_COMPARISONS,
        TOTAL_BLOCKS
    }

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            redisUtils = new RedisUtils(context.getConfiguration());
            optimalBlockSize = Integer.parseInt(redisUtils.getValue(Properties.OPTIMAL_BLOCKS));
            context.getCounter(LINK_MAPPER_COUNTERS.OPTIMAL_BLOCK_SIZE).setValue(optimalBlockSize);
            log.info("optimalBlockSize " + optimalBlockSize);
        } catch (Exception e) {
            log.error("ERROR CONNECTING TO REDIS ", e);
            throw new InterruptedIOException("ERROR CONNECTING TO REDIS " + e.toString());
        }
    }

    @Override
    protected void map(final Text keyIn, final Text result, final Context context) throws IOException {
        try {
            // discard blocks whose record count reaches the optimal block size
            int recordsNumber = countRecords(result);
            // a block of n records yields n * (n - 1) pairwise comparisons;
            // the cast to long keeps large blocks from overflowing the int multiplication
            context.getCounter(LINK_MAPPER_COUNTERS.TOTAL_COMPARISONS).increment((long) recordsNumber * (recordsNumber - 1));
            if (recordsNumber >= optimalBlockSize) {
                context.getCounter(LINK_MAPPER_COUNTERS.DISCARDED_BLOCKS).increment(1);
                // record how many comparisons the purge saved
                //TODO count source / target records and get the actual comparison number;
                //     also purge blocks that contain only source or only target entities
                context.getCounter(LINK_MAPPER_COUNTERS.DISCARDED_COMPARISONS).increment((long) recordsNumber * (recordsNumber - 1));
            } else {
                context.write(keyIn, result);
                context.getCounter(LINK_MAPPER_COUNTERS.PASSED_BLOCKS).increment(1);
            }
            context.getCounter(LINK_MAPPER_COUNTERS.TOTAL_BLOCKS).increment(1);
        } catch (Exception e) {
            log.error("key causing error is " + keyIn);
            log.error("ERROR ", e);
            context.getCounter(LINK_MAPPER_COUNTERS.ERROR_COUNTERS).increment(1);
            throw new IOException(e.toString(), e);
        }
    }

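    /**
     * Counts the records in a serialized block by counting separator
     * characters. This assumes every record in the block is terminated by a
     * comma; if the comma is only a delimiter between records, the count is
     * one short.
     */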
    private static int countRecords(Text block) {
        int count = 0;
        for (int i = 0; i < block.getLength(); i++) {
            if (block.charAt(i) == SEPARATOR) {
                count++;
            }
        }
        return count;
    }

}
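
For context, here is a minimal driver sketch showing how a mapper with this Text-to-Text signature is typically submitted as a Hadoop job. The driver class name, the SequenceFile input/output formats, and the path arguments are illustrative assumptions; they are not part of this repository, and the actual LOD export workflow may wire the job differently.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

// Hypothetical driver, not part of the original source file.
public class LinkJobDriver {
    public static void main(String[] args) throws Exception {
        // RedisUtils is constructed from this Configuration in setup(), so the
        // Redis connection settings presumably travel through it; the property
        // names are defined in the project's Properties class.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "lodExport-linkage");
        job.setJarByClass(LinkMapper.class);
        job.setMapperClass(LinkMapper.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}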