1 |
45807
|
eri.katsar
|
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
|
2 |
|
|
|
3 |
45843
|
eri.katsar
|
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
|
4 |
45844
|
eri.katsar
|
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.compators.MyComparator;
|
5 |
45807
|
eri.katsar
|
import org.apache.hadoop.io.Text;
|
6 |
|
|
import org.apache.hadoop.mapreduce.Reducer;
|
7 |
|
|
import org.apache.log4j.Logger;
|
8 |
|
|
|
9 |
|
|
import java.io.IOException;
|
10 |
|
|
import java.util.HashMap;
|
11 |
|
|
import java.util.Map;
|
12 |
|
|
|
13 |
|
|
public class LinkCustomReducer extends Reducer<Text, Text, Text, Text> {
|
14 |
|
|
private Logger log = Logger.getLogger(LinkCustomReducer.class);
|
15 |
|
|
private RedisUtils redisUtils;
|
16 |
|
|
private static final String SEPERATOR = ",";
|
17 |
45821
|
eri.katsar
|
private static double RECORD_SIMILARITY_THRESHOLD = 0.8;
|
18 |
45807
|
eri.katsar
|
|
19 |
|
|
public static enum LINK_RECUDER_COUNTERS {
|
20 |
|
|
TARGET_TRIPLES,
|
21 |
|
|
SOURCE_TRIPLES,
|
22 |
|
|
WRITTEN_OUT_ENTITIES,
|
23 |
|
|
COMPARISONS,
|
24 |
|
|
COMPARISONS_PER_BLOCK,
|
25 |
|
|
BLOCKS
|
26 |
|
|
}
|
27 |
|
|
|
28 |
|
|
@Override
|
29 |
|
|
protected void setup(Context context) throws IOException, InterruptedException {
|
30 |
|
|
|
31 |
|
|
try {
|
32 |
|
|
redisUtils = new RedisUtils(context);
|
33 |
|
|
} catch (Exception e) {
|
34 |
45878
|
eri.katsar
|
log.error("Error connecting to Redis " + e.toString());
|
35 |
45807
|
eri.katsar
|
throw new RuntimeException(e);
|
36 |
|
|
}
|
37 |
|
|
|
38 |
|
|
}
|
39 |
|
|
|
40 |
|
|
@Override
|
41 |
|
|
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
|
42 |
|
|
//each item in the list is a block with the given key
|
43 |
|
|
for (Text block : values) {
|
44 |
45878
|
eri.katsar
|
context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).increment(1);
|
45 |
|
|
try {
|
46 |
|
|
linkRecords(block.toString(), context);
|
47 |
|
|
} catch (Exception e) {
|
48 |
|
|
log.error("Error comparing records" + e.toString());
|
49 |
|
|
throw new RuntimeException(e);
|
50 |
45807
|
eri.katsar
|
}
|
51 |
|
|
}
|
52 |
|
|
}
|
53 |
|
|
|
54 |
|
|
|
55 |
|
|
private void linkRecords(String block, Context context) throws Exception {
|
56 |
|
|
String[] split = block.split(SEPERATOR);
|
57 |
|
|
Map<String, String> sourceRecords = new HashMap<>();
|
58 |
|
|
Map<String, String> targetRecords = new HashMap<>();
|
59 |
45878
|
eri.katsar
|
|
60 |
45807
|
eri.katsar
|
for (String recordId : split) {
|
61 |
45821
|
eri.katsar
|
if (recordId.contains("source_")) {
|
62 |
|
|
sourceRecords.put(recordId, "");
|
63 |
|
|
context.getCounter(LINK_RECUDER_COUNTERS.SOURCE_TRIPLES).increment(1);
|
64 |
|
|
} else {
|
65 |
45878
|
eri.katsar
|
targetRecords.put(recordId, redisUtils.getValue(recordId));
|
66 |
45821
|
eri.katsar
|
context.getCounter(LINK_RECUDER_COUNTERS.TARGET_TRIPLES).increment(1);
|
67 |
|
|
}
|
68 |
|
|
}
|
69 |
|
|
for (String sourceId : sourceRecords.keySet()) {
|
70 |
|
|
String sourceRecord = redisUtils.getValue(sourceId);
|
71 |
|
|
for (String targetId : targetRecords.keySet()) {
|
72 |
45842
|
eri.katsar
|
//store target records to have less I/O ops per record
|
73 |
|
|
//test also version where we get each record again in each loop
|
74 |
|
|
//
|
75 |
|
|
|
76 |
|
|
//TODO
|
77 |
45878
|
eri.katsar
|
//String targetRecord = redisUtils.getValue(targetId);}
|
78 |
45842
|
eri.katsar
|
String targetRecord = targetRecords.get(targetId);
|
79 |
45821
|
eri.katsar
|
double similarity = MyComparator.findMatchingPair(sourceRecord, targetRecord);
|
80 |
|
|
context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS).increment(1);
|
81 |
|
|
if (similarity >= RECORD_SIMILARITY_THRESHOLD) {
|
82 |
|
|
try {
|
83 |
45878
|
eri.katsar
|
context.write(new Text(sourceId), new Text(targetId + SEPERATOR + similarity));
|
84 |
45821
|
eri.katsar
|
context.getCounter(LINK_RECUDER_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
|
85 |
|
|
} catch (Exception ex) {
|
86 |
|
|
log.error("Error while writing records to output : " + ex.toString());
|
87 |
|
|
throw new IOException("Error while writing records to output", ex);
|
88 |
45807
|
eri.katsar
|
}
|
89 |
|
|
}
|
90 |
|
|
}
|
91 |
|
|
}
|
92 |
45821
|
eri.katsar
|
|
93 |
|
|
|
94 |
45807
|
eri.katsar
|
}
|
95 |
|
|
|
96 |
|
|
|
97 |
|
|
@Override
|
98 |
|
|
protected void cleanup(Context context) throws IOException
|
99 |
|
|
, InterruptedException {
|
100 |
|
|
context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS_PER_BLOCK).setValue(context.getCounter(LINK_RECUDER_COUNTERS.COMPARISONS).getValue() /
|
101 |
|
|
context.getCounter(LINK_RECUDER_COUNTERS.BLOCKS).getValue());
|
102 |
|
|
}
|
103 |
|
|
}
|