Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
4
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.compators.MyComparator;
5
import org.apache.hadoop.io.Text;
6
import org.apache.hadoop.mapreduce.Reducer;
7
import org.apache.log4j.Logger;
8

    
9
import java.io.IOException;
10
import java.util.HashMap;
11
import java.util.Map;
12

    
13
public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> {
14
    private Logger log = Logger.getLogger(LinkCustomReducer.class);
15
    private RedisUtils redisUtils;
16
    private static final String SEPERATOR = ",";
17
    private static double RECORD_SIMILARITY_THRESHOLD = 0.8;
18

    
19
    public static enum LINK_RECUDER_COUNTERS {
20
        TARGET_TRIPLES,
21
        SOURCE_TRIPLES,
22
        WRITTEN_OUT_ENTITIES,
23
        COMPARISONS,
24
        COMPARISONS_PER_BLOCK,
25
        BLOCKS
26
    }
27

    
28
    @Override
29
    protected void setup(Context context) throws IOException, InterruptedException {
30

    
31
        try {
32
            redisUtils = new RedisUtils(context);
33
        } catch (Exception e) {
34
            log.error("Error connecting to Redis " + e.toString());
35
            throw new RuntimeException(e);
36
        }
37

    
38
    }
39

    
40
    @Override
41
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
42
        //each item in the list is a block with the given key
43
        for (Text block : values) {
44
            context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).increment(1);
45
            try {
46
                linkRecords(block.toString(), context);
47
            } catch (Exception e) {
48
                log.error("Error comparing records" + e.toString());
49
                throw new RuntimeException(e);
50
            }
51
        }
52
    }
53

    
54

    
55
    private void linkRecords(String block, Context context) throws Exception {
56
        String[] split = block.split(SEPERATOR);
57
        Map<String, String> sourceRecords = new HashMap<>();
58
        Map<String, String> targetRecords = new HashMap<>();
59

    
60
        for (String recordId : split) {
61
            if (recordId.contains("source_")) {
62
                sourceRecords.put(recordId, "");
63
                context.getCounter(LINK_RECUDER_COUNTERS.SOURCE_TRIPLES).increment(1);
64
            } else {
65
                targetRecords.put(recordId, redisUtils.getValue(recordId));
66
                context.getCounter(LINK_RECUDER_COUNTERS.TARGET_TRIPLES).increment(1);
67
            }
68
        }
69
        for (String sourceId : sourceRecords.keySet()) {
70
            String sourceRecord = redisUtils.getValue(sourceId);
71
            for (String targetId : targetRecords.keySet()) {
72
                //store target records to have less I/O ops per record
73
                //test also version where we get each record again in each loop
74
                //
75

    
76
                //TODO
77
                //String targetRecord = redisUtils.getValue(targetId);}
78
                String targetRecord = targetRecords.get(targetId);
79
                double similarity = MyComparator.findMatchingPair(sourceRecord, targetRecord);
80
                context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS).increment(1);
81
                if (similarity >= RECORD_SIMILARITY_THRESHOLD) {
82
                    try {
83
                        context.write(new Text(sourceId), new Text(targetId + SEPERATOR + similarity));
84
                        context.getCounter(LINK_RECUDER_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
85
                    } catch (Exception ex) {
86
                        log.error("Error while writing records to output : " + ex.toString());
87
                        throw new IOException("Error while writing records to output", ex);
88
                    }
89
                }
90
            }
91
        }
92

    
93

    
94
    }
95

    
96

    
97
    @Override
98
    protected void cleanup(Context context) throws IOException
99
            , InterruptedException {
100
        context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS_PER_BLOCK).setValue(context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS).getValue() /
101
                context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).getValue());
102
    }
103
}
(4-4/5)