Project

General

Profile

1 45807 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
2
3 45843 eri.katsar
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
4 48805 eri.katsar
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.compators.DistanceAlgorithms;
5 45844 eri.katsar
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.compators.MyComparator;
6 48805 eri.katsar
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.configuration.Properties;
7 45807 eri.katsar
import org.apache.hadoop.io.Text;
8
import org.apache.hadoop.mapreduce.Reducer;
9
import org.apache.log4j.Logger;
10
11
import java.io.IOException;
12
import java.util.HashMap;
13
import java.util.Map;
14
15
public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> {
16
    private Logger log = Logger.getLogger(LinkCustomReducer.class);
17
    private RedisUtils redisUtils;
18
    private static final String SEPERATOR = ",";
19 48805 eri.katsar
    private double similarityThreshold;
20
    private String distanceAlgorithm;
21 45807 eri.katsar
22
    public static enum LINK_RECUDER_COUNTERS {
23 47680 eri.katsar
        TARGET_RECORDS_EXAMINED,
24
        SOURCE_RECORDS_EXAMINED,
25 48225 eri.katsar
        MATCHING_PAIRS,
26 47680 eri.katsar
        RECORD_COMPARISONS,
27 45807 eri.katsar
        COMPARISONS_PER_BLOCK,
28 48225 eri.katsar
        TOTAL_BLOCKS,
29
        NON_MATCHING_PAIRS
30 45807 eri.katsar
    }
31
32
    @Override
33
    protected void setup(Context context) throws IOException, InterruptedException {
34
        try {
35 48805 eri.katsar
            similarityThreshold = context.getConfiguration().getDouble(Properties.LOD_SIMILARITY_THRESHOLD, 0.8);
36
            log.info("SIMILARITY THRESHOLD " + similarityThreshold);
37
            distanceAlgorithm = context.getConfiguration().get(Properties.LOD_DISTANCE_ALGORITHM);
38 47680 eri.katsar
            redisUtils = new RedisUtils(context.getConfiguration());
39 45807 eri.katsar
        } catch (Exception e) {
40 45878 eri.katsar
            log.error("Error connecting to Redis " + e.toString());
41 45807 eri.katsar
            throw new RuntimeException(e);
42
        }
43
    }
44
45
    @Override
46
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
47
        //each item in the list is a block with the given key
48
        for (Text block : values) {
49 48225 eri.katsar
            context.getCounter(LINK_RECUDER_COUNTERS.TOTAL_BLOCKS).increment(1);
50 45878 eri.katsar
            try {
51
                linkRecords(block.toString(), context);
52
            } catch (Exception e) {
53
                log.error("Error comparing records" + e.toString());
54
                throw new RuntimeException(e);
55 45807 eri.katsar
            }
56
        }
57
    }
58
59
60
    private void linkRecords(String block, Context context) throws Exception {
61
        String[] split = block.split(SEPERATOR);
62
        Map<String, String> sourceRecords = new HashMap<>();
63
        Map<String, String> targetRecords = new HashMap<>();
64 45878 eri.katsar
65 45807 eri.katsar
        for (String recordId : split) {
66 45821 eri.katsar
            if (recordId.contains("source_")) {
67
                sourceRecords.put(recordId, "");
68 47680 eri.katsar
                context.getCounter(LINK_RECUDER_COUNTERS.SOURCE_RECORDS_EXAMINED).increment(1);
69 45821 eri.katsar
            } else {
70 45878 eri.katsar
                targetRecords.put(recordId, redisUtils.getValue(recordId));
71 47680 eri.katsar
                context.getCounter(LINK_RECUDER_COUNTERS.TARGET_RECORDS_EXAMINED).increment(1);
72 45821 eri.katsar
            }
73
        }
74 48796 eri.katsar
75 45821 eri.katsar
        for (String sourceId : sourceRecords.keySet()) {
76
            String sourceRecord = redisUtils.getValue(sourceId);
77
            for (String targetId : targetRecords.keySet()) {
78 45842 eri.katsar
                //store target records to have less I/O ops per record
79
                //test also version where we get each record again in each loop
80
81
                //TODO
82 45878 eri.katsar
                //String targetRecord = redisUtils.getValue(targetId);}
83 45842 eri.katsar
                String targetRecord = targetRecords.get(targetId);
84 48805 eri.katsar
                double similarity = MyComparator.computeSimilarity(DistanceAlgorithms.valueOf(distanceAlgorithm), sourceRecord, targetRecord);
85 47680 eri.katsar
                context.getCounter(LINK_RECUDER_COUNTERS.RECORD_COMPARISONS).increment(1);
86 48796 eri.katsar
87 48805 eri.katsar
                if (similarity >= similarityThreshold) {
88 45821 eri.katsar
                    try {
89 48805 eri.katsar
                        context.write(new Text(sourceId), new Text(targetId + SEPERATOR + similarity + SEPERATOR + sourceRecord.replaceAll(sourceId, "")
90 48796 eri.katsar
                                        + SEPERATOR + targetRecord.replaceAll(targetId, "")));
91 48225 eri.katsar
                        context.getCounter(LINK_RECUDER_COUNTERS.MATCHING_PAIRS).increment(1);
92 45821 eri.katsar
                    } catch (Exception ex) {
93
                        log.error("Error while writing records to output : " + ex.toString());
94
                        throw new IOException("Error while writing records to output", ex);
95 45807 eri.katsar
                    }
96 47680 eri.katsar
                } else {
97 48225 eri.katsar
                    context.getCounter(LINK_RECUDER_COUNTERS.NON_MATCHING_PAIRS).increment(1);
98 47680 eri.katsar
99 45807 eri.katsar
                }
100
            }
101
        }
102 45821 eri.katsar
103
104 45807 eri.katsar
    }
105
106
107
    @Override
108 48796 eri.katsar
    protected void cleanup(Context context) throws IOException, InterruptedException {
109 47680 eri.katsar
        context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS_PER_BLOCK).setValue(context.getCounter(LINK_RECUDER_COUNTERS.RECORD_COMPARISONS).getValue() /
110 48225 eri.katsar
                context.getCounter(LINK_RECUDER_COUNTERS.TOTAL_BLOCKS).getValue());
111 46462 eri.katsar
        redisUtils.close();
112 45807 eri.katsar
    }
113
}