package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;

import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.compators.DistanceAlgorithms;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.compators.MyComparator;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

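/**
 * Reducer of the LOD export linkage job. Each input value is a block: a
 * comma-separated list of record IDs that share the same blocking key. Records
 * whose IDs start with "source_" are compared against every other (target)
 * record in the block; record payloads are fetched from Redis via RedisUtils,
 * similarity is computed with the configured distance algorithm, and pairs
 * that reach the similarity threshold are written to the output.
 */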
public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> {
    private Logger log = Logger.getLogger(LinkCustomReducer.class);
    private RedisUtils redisUtils;
    private static final String SEPERATOR = ",";
    private double similarityThreshold;
    private String distanceAlgorithm;

    public static enum LINK_RECUDER_COUNTERS {
        TARGET_RECORDS_EXAMINED,
        SOURCE_RECORDS_EXAMINED,
        MATCHING_PAIRS,
        RECORD_COMPARISONS,
        COMPARISONS_PER_BLOCK,
        TOTAL_BLOCKS,
        NON_MATCHING_PAIRS
    }

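    /**
     * Reads the similarity threshold (default 0.8) and the distance algorithm
     * from the job configuration and opens the Redis connection.
     */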
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            similarityThreshold = context.getConfiguration().getDouble(Properties.LOD_SIMILARITY_THRESHOLD, 0.8);
            log.info("SIMILARITY THRESHOLD " + similarityThreshold);
            distanceAlgorithm = context.getConfiguration().get(Properties.LOD_DISTANCE_ALGORITHM);
            redisUtils = new RedisUtils(context.getConfiguration());
        } catch (Exception e) {
            log.error("Error connecting to Redis " + e.toString());
            throw new RuntimeException(e);
        }
    }

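    /**
     * Processes every block assigned to this key and links the records it contains.
     */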
    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
        // each item in the list is a block with the given key
        for (Text block : values) {
            context.getCounter(LINK_RECUDER_COUNTERS.TOTAL_BLOCKS).increment(1);
            try {
                linkRecords(block.toString(), context);
            } catch (Exception e) {
                log.error("Error comparing records: " + e.toString());
                throw new RuntimeException(e);
            }
        }
    }

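    /**
     * Splits a block into source and target record IDs, fetches the record
     * payloads from Redis (target records are cached in a map to limit Redis
     * round-trips) and compares every source record with every target record,
     * emitting the pairs whose similarity is at or above the threshold.
     */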
    private void linkRecords(String block, Context context) throws Exception {
        String[] split = block.split(SEPERATOR);
        Map<String, String> sourceRecords = new HashMap<>();
        Map<String, String> targetRecords = new HashMap<>();

        for (String recordId : split) {
            if (recordId.contains("source_")) {
                sourceRecords.put(recordId, "");
                context.getCounter(LINK_RECUDER_COUNTERS.SOURCE_RECORDS_EXAMINED).increment(1);
            } else {
                targetRecords.put(recordId, redisUtils.getValue(recordId));
                context.getCounter(LINK_RECUDER_COUNTERS.TARGET_RECORDS_EXAMINED).increment(1);
            }
        }

        for (String sourceId : sourceRecords.keySet()) {
            String sourceRecord = redisUtils.getValue(sourceId);
            for (String targetId : targetRecords.keySet()) {
                // target records are kept in the map above to reduce Redis I/O per record
                // TODO: also test the variant that fetches each record again in every loop iteration:
                // String targetRecord = redisUtils.getValue(targetId);
                String targetRecord = targetRecords.get(targetId);
                double similarity = MyComparator.computeSimilarity(DistanceAlgorithms.valueOf(distanceAlgorithm), sourceRecord, targetRecord);
                context.getCounter(LINK_RECUDER_COUNTERS.RECORD_COMPARISONS).increment(1);

                if (similarity >= similarityThreshold) {
                    try {
                        // use literal replace (not regex-based replaceAll), since record IDs may contain regex metacharacters
                        context.write(new Text(sourceId), new Text(targetId + SEPERATOR + similarity + SEPERATOR + sourceRecord.replace(sourceId, "")
                                + SEPERATOR + targetRecord.replace(targetId, "")));
                        context.getCounter(LINK_RECUDER_COUNTERS.MATCHING_PAIRS).increment(1);
                    } catch (Exception ex) {
                        log.error("Error while writing records to output: " + ex.toString());
                        throw new IOException("Error while writing records to output", ex);
                    }
                } else {
                    context.getCounter(LINK_RECUDER_COUNTERS.NON_MATCHING_PAIRS).increment(1);
                }
            }
        }
    }

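    /**
     * Records the average number of comparisons per block and closes the Redis connection.
     */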
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        long totalBlocks = context.getCounter(LINK_RECUDER_COUNTERS.TOTAL_BLOCKS).getValue();
        // guard against division by zero when this reducer received no blocks
        if (totalBlocks > 0) {
            context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS_PER_BLOCK)
                    .setValue(context.getCounter(LINK_RECUDER_COUNTERS.RECORD_COMPARISONS).getValue() / totalBlocks);
        }
        redisUtils.close();
    }
}