Revision 45878
Added by Eri Katsari about 7 years ago
LinkCustomReducer.java | ||
---|---|---|
13 | 13 |
public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> { |
14 | 14 |
private Logger log = Logger.getLogger(LinkCustomReducer.class); |
15 | 15 |
private RedisUtils redisUtils; |
16 |
private static final String LINE_SEPERATOR = "\t.\t"; |
|
17 |
private static final String FIELD_DELIM = "\t"; |
|
18 |
private static final String OPTIMAL_BLOCKS = "optimalBlockSize"; |
|
19 |
private long optimalBlockSize; |
|
20 | 16 |
private static final String SEPERATOR = ","; |
21 | 17 |
private static double RECORD_SIMILARITY_THRESHOLD = 0.8; |
22 | 18 |
|
... | ... | |
34 | 30 |
|
35 | 31 |
try { |
36 | 32 |
redisUtils = new RedisUtils(context); |
37 |
optimalBlockSize = Integer.valueOf(redisUtils.getValue(OPTIMAL_BLOCKS)); |
|
38 |
log.info("OPTIMAL BLOCK SIZE " + optimalBlockSize); |
|
39 | 33 |
} catch (Exception e) { |
40 |
log.error("Error computing stats" + e.toString());
|
|
34 |
log.error("Error connecting to Redis " + e.toString());
|
|
41 | 35 |
throw new RuntimeException(e); |
42 | 36 |
} |
43 | 37 |
|
... | ... | |
47 | 41 |
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException { |
48 | 42 |
//each item in the list is a block with the given key |
49 | 43 |
for (Text block : values) { |
50 |
if (!key.toString().matches("^\\d{4}$")) { |
|
51 |
context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).increment(1); |
|
52 |
try { |
|
53 |
linkRecords(block.toString(), context); |
|
54 |
} catch (Exception e) { |
|
55 |
log.error("Error compators records" + e.toString()); |
|
56 |
throw new RuntimeException(e); |
|
57 |
} |
|
58 |
|
|
44 |
context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).increment(1); |
|
45 |
try { |
|
46 |
linkRecords(block.toString(), context); |
|
47 |
} catch (Exception e) { |
|
48 |
log.error("Error comparing records" + e.toString()); |
|
49 |
throw new RuntimeException(e); |
|
59 | 50 |
} |
60 | 51 |
} |
61 | 52 |
} |
... | ... | |
65 | 56 |
String[] split = block.split(SEPERATOR); |
66 | 57 |
Map<String, String> sourceRecords = new HashMap<>(); |
67 | 58 |
Map<String, String> targetRecords = new HashMap<>(); |
59 |
|
|
68 | 60 |
for (String recordId : split) { |
69 | 61 |
if (recordId.contains("source_")) { |
70 | 62 |
sourceRecords.put(recordId, ""); |
71 | 63 |
context.getCounter(LINK_RECUDER_COUNTERS.SOURCE_TRIPLES).increment(1); |
72 | 64 |
} else { |
73 |
targetRecords.put(recordId, "");
|
|
65 |
targetRecords.put(recordId, redisUtils.getValue(recordId));
|
|
74 | 66 |
context.getCounter(LINK_RECUDER_COUNTERS.TARGET_TRIPLES).increment(1); |
75 | 67 |
} |
76 | 68 |
} |
... | ... | |
82 | 74 |
// |
83 | 75 |
|
84 | 76 |
//TODO |
85 |
//String targetRecord = redisUtils.getValue(targetId); |
|
86 |
|
|
87 |
if (targetRecords.get(targetId).isEmpty()) { |
|
88 |
targetRecords.put(targetId, redisUtils.getValue(targetId)); |
|
89 |
} |
|
90 |
|
|
91 |
|
|
77 |
//String targetRecord = redisUtils.getValue(targetId);} |
|
92 | 78 |
String targetRecord = targetRecords.get(targetId); |
93 | 79 |
double similarity = MyComparator.findMatchingPair(sourceRecord, targetRecord); |
94 | 80 |
context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS).increment(1); |
95 | 81 |
if (similarity >= RECORD_SIMILARITY_THRESHOLD) { |
96 | 82 |
try { |
97 |
context.write(new Text(sourceId), new Text(targetId + "," + similarity));
|
|
83 |
context.write(new Text(sourceId), new Text(targetId + SEPERATOR + similarity));
|
|
98 | 84 |
context.getCounter(LINK_RECUDER_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1); |
99 | 85 |
} catch (Exception ex) { |
100 | 86 |
log.error("Error while writing records to output : " + ex.toString()); |
Also available in: Unified diff
Added M/R step for verification