Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
2

    
3
import com.lambdaworks.redis.RedisClient;
4
import com.lambdaworks.redis.RedisConnection;
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Mapper;
7
import org.apache.log4j.Logger;
8

    
9
import java.io.IOException;
10
import java.io.InterruptedIOException;
11

    
12
public class LinkMapper extends Mapper<Text, Text, Text, Text> {
13
    private Logger log = Logger.getLogger(this.getClass());
14
    private String redisHost;
15
    private Integer redisPort;
16
    private RedisConnection connection;
17
    private RedisClient client;
18
    private int optimalBlockSize;
19
    private static final char SEPERATOR = ',';
20
    private static final String OPTIMAL_BLOCKS = "optimalBlockSize";
21

    
22
    public static enum TEST_COUNTERS {
23
        OPTIMAL_BLOCK_SIZE,
24
        ERROR_COUNTERS,
25
        DISCARDED_BLOCKS,
26
        PASSED_BLOCKS,
27
        TOTAL_COMPARISONS,
28
        DISCARDED_COMPARISONS
29
    }
30

    
31
    @Override
32
    protected void setup(Context context) throws IOException, InterruptedException {
33
        try {
34
            initRedis(context);
35
            optimalBlockSize = Integer.valueOf((String) connection.get(OPTIMAL_BLOCKS));
36
            //optimalBlockSize=90000;
37
            context.getCounter(TEST_COUNTERS.OPTIMAL_BLOCK_SIZE).setValue(Long.parseLong((String) connection.get("optimalBlockSize")));
38
        } catch (Exception e) {
39
            log.error("ERROR CONNECTING TO REDIS ", e);
40
            throw new InterruptedIOException("ERROR CONNECTING TO REDIS " + e.toString());
41
        }
42
    }
43

    
44
    private void initRedis(Context context) {
45
        redisHost = context.getConfiguration().get("lod.redisHost");
46
        redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
47
        client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
48
        connection = client.connect();
49
    }
50

    
51
    @Override
52
    protected void map(final Text keyIn, final Text result, final Context context) throws IOException {
53
        try {
54

    
55
            //purge blocks with number of  records > optimal
56
            int recordsNumber = countRecords(result, SEPERATOR);
57
            //purge blocks that contain only source or target entities
58
            //          boolean hasBothSouceAndTarget = resultString.contains("source_") && result.toString().contains("target_");
59
            //how many comparisons we have purged
60
            context.getCounter(TEST_COUNTERS.TOTAL_COMPARISONS).increment(recordsNumber * recordsNumber);
61

    
62
            //       if (recordsNumber == 1 || recordsNumber >= optimalBlockSize || !hasBothSouceAndTarget) {
63
            if (recordsNumber >= optimalBlockSize) {
64
                context.getCounter(TEST_COUNTERS.DISCARDED_BLOCKS).increment(1);
65
                context.getCounter(TEST_COUNTERS.DISCARDED_COMPARISONS).increment(recordsNumber * recordsNumber);
66
            } else {
67
                context.write(keyIn, result);
68
                context.getCounter(TEST_COUNTERS.PASSED_BLOCKS).increment(1);
69
            }
70
        } catch (Exception e) {
71

    
72
            log.error("key causing error is " + keyIn);
73
            log.error("ERROR ", e);
74
            context.getCounter(TEST_COUNTERS.ERROR_COUNTERS).increment(1);
75
            throw new InterruptedIOException(e.toString());
76
        }
77
    }
78

    
79
    private static int countRecords(Text block, char seperator) {
80
        int count = 0;
81
        for (int i = 0; i < block.getLength(); i++) {
82
            if (block.charAt(i) == seperator) {
83
                count++;
84
            }
85
        }
86
        return count;
87
    }
88

    
89
    @Override
90
    protected void cleanup(Context context) throws IOException, InterruptedException {
91
        if (connection != null) {
92
            connection.close();
93
        }
94
        super.cleanup(context);
95
    }
96
}
(4-4/5)