Project

General

Profile

« Previous | Next » 

Revision 45878

Added by Eri Katsari about 7 years ago

Added M/R step for verification

View differences:

LinkCustomReducer.java
13 13
public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> {
14 14
    private Logger log = Logger.getLogger(LinkCustomReducer.class);
15 15
    private RedisUtils redisUtils;
16
    private static final String LINE_SEPERATOR = "\t.\t";
17
    private static final String FIELD_DELIM = "\t";
18
    private static final String OPTIMAL_BLOCKS = "optimalBlockSize";
19
    private long optimalBlockSize;
20 16
    private static final String SEPERATOR = ",";
21 17
    private static double RECORD_SIMILARITY_THRESHOLD = 0.8;
22 18

  
......
34 30

  
35 31
        try {
36 32
            redisUtils = new RedisUtils(context);
37
            optimalBlockSize = Integer.valueOf(redisUtils.getValue(OPTIMAL_BLOCKS));
38
            log.info("OPTIMAL BLOCK SIZE " + optimalBlockSize);
39 33
        } catch (Exception e) {
40
            log.error("Error computing stats" + e.toString());
34
            log.error("Error connecting to Redis " + e.toString());
41 35
            throw new RuntimeException(e);
42 36
        }
43 37

  
......
47 41
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
48 42
        //each item in the list is a block with the given key
49 43
        for (Text block : values) {
50
            if (!key.toString().matches("^\\d{4}$")) {
51
                context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).increment(1);
52
                try {
53
                    linkRecords(block.toString(), context);
54
                } catch (Exception e) {
55
                    log.error("Error compators records" + e.toString());
56
                    throw new RuntimeException(e);
57
                }
58

  
44
            context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).increment(1);
45
            try {
46
                linkRecords(block.toString(), context);
47
            } catch (Exception e) {
48
                log.error("Error comparing records" + e.toString());
49
                throw new RuntimeException(e);
59 50
            }
60 51
        }
61 52
    }
......
65 56
        String[] split = block.split(SEPERATOR);
66 57
        Map<String, String> sourceRecords = new HashMap<>();
67 58
        Map<String, String> targetRecords = new HashMap<>();
59

  
68 60
        for (String recordId : split) {
69 61
            if (recordId.contains("source_")) {
70 62
                sourceRecords.put(recordId, "");
71 63
                context.getCounter(LINK_RECUDER_COUNTERS.SOURCE_TRIPLES).increment(1);
72 64
            } else {
73
                targetRecords.put(recordId, "");
65
                targetRecords.put(recordId, redisUtils.getValue(recordId));
74 66
                context.getCounter(LINK_RECUDER_COUNTERS.TARGET_TRIPLES).increment(1);
75 67
            }
76 68
        }
......
82 74
                //
83 75

  
84 76
                //TODO
85
                //String targetRecord = redisUtils.getValue(targetId);
86

  
87
                if (targetRecords.get(targetId).isEmpty()) {
88
                    targetRecords.put(targetId, redisUtils.getValue(targetId));
89
                }
90

  
91

  
77
                //String targetRecord = redisUtils.getValue(targetId);}
92 78
                String targetRecord = targetRecords.get(targetId);
93 79
                double similarity = MyComparator.findMatchingPair(sourceRecord, targetRecord);
94 80
                context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS).increment(1);
95 81
                if (similarity >= RECORD_SIMILARITY_THRESHOLD) {
96 82
                    try {
97
                        context.write(new Text(sourceId), new Text(targetId + "," + similarity));
83
                        context.write(new Text(sourceId), new Text(targetId + SEPERATOR + similarity));
98 84
                        context.getCounter(LINK_RECUDER_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
99 85
                    } catch (Exception ex) {
100 86
                        log.error("Error while writing records to output : " + ex.toString());

Also available in: Unified diff