package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;

import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;

import org.aksw.limes.core.execution.engine.ExecutionEngine;
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
import org.aksw.limes.core.execution.planning.planner.IPlanner;
import org.aksw.limes.core.execution.rewriter.Rewriter;
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
import org.aksw.limes.core.io.cache.MemoryCache;
import org.aksw.limes.core.io.config.KBInfo;
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
import org.aksw.limes.core.io.ls.LinkSpecification;
import org.aksw.limes.core.io.mapping.Mapping;
import org.aksw.limes.core.io.preprocessing.Preprocessor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.Map;

public class LimesReducer extends Reducer<Text, Text, Text, Text> {
|
24
|
|
25
|
private Logger log = Logger.getLogger(LimesReducer.class);
|
26
|
private MemoryCache sourceCache, targetCache;
|
27
|
private KBInfo sourceKb, targetKb;
|
28
|
private org.aksw.limes.core.io.config.Configuration config;
|
29
|
private RedisUtils redisUtils;
|
30
|
private int optimalBlockSize;
|
31
|
private static final String OPTIMAL_BLOCKS = "optimalBlockSize";
|
32
|
private static final String SEPERATOR = ",";
|
33
|
private static final String LINE_SEPERATOR = "\t.\t";
|
34
|
private static final String FIELD_DELIM = "\t";
|
35
|
|
36
|
public static enum LIMES_COUNTERS {
|
37
|
TARGET_TRIPLES,
|
38
|
SOURCE_TRIPLES,
|
39
|
WRITTEN_OUT_ENTITIES,
|
40
|
LINKED_BLOCKS,
|
41
|
DISCARDED_BLOCKS
|
42
|
}
|
43
|
|
44
|
|
45
|
@Override
|
46
|
protected void setup(Context context) throws IOException, InterruptedException {
|
47
|
|
48
|
try {
|
49
|
redisUtils = new RedisUtils(context);
|
50
|
String configXML = context.getConfiguration().get("lod.configXML");
|
51
|
String limesDTD = context.getConfiguration().get("lod.limesDTD");
|
52
|
XMLConfigurationReader reader = new XMLConfigurationReader();
|
53
|
config = reader.read(configXML, limesDTD);
|
54
|
sourceKb = config.getSourceInfo();
|
55
|
targetKb = config.getTargetInfo();
|
56
|
optimalBlockSize = Integer.valueOf(redisUtils.getValue(OPTIMAL_BLOCKS));
|
57
|
log.info("OPTIMAL BLOCK SIZE " + optimalBlockSize);
|
58
|
// optimalBlockSize = 1700;
|
59
|
log.info("sourceKB" + sourceKb.getProperties());
|
60
|
log.info("targetKB" + targetKb.getProperties());
|
61
|
|
62
|
} catch (Exception e) {
|
63
|
log.error("Error computing stats" + e.toString());
|
64
|
throw new RuntimeException(e);
|
65
|
}
|
66
|
|
67
|
}
|
68
|
|
69
|
@Override
|
70
|
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
|
71
|
//each item in the list is a block with the given key
|
72
|
|
73
|
for (Text block : values) {
|
74
|
try {
|
75
|
context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
|
76
|
sourceCache = new MemoryCache();
|
77
|
targetCache = new MemoryCache();
|
78
|
fillLimesCache(block, context);
|
79
|
linkRecords(context);
|
80
|
} catch (Exception e) {
|
81
|
log.error(e.toString(), e);
|
82
|
throw new IOException(e);
|
83
|
}
|
84
|
|
85
|
}
|
86
|
}
|
87
|
|
88
|
|
89
|
private void fillLimesCache(Text block, Context context) throws Exception {
|
90
|
|
91
|
String[] split = block.toString().split(SEPERATOR);
|
92
|
for (String recordId : split) {
|
93
|
String record = redisUtils.getValue(recordId);
|
94
|
if (record == null) {
|
95
|
log.error("Record " + recordId + " not found! ");
|
96
|
throw new Exception("Record " + recordId + " not found! ");
|
97
|
|
98
|
}
|
99
|
record = record.substring(record.indexOf(FIELD_DELIM) + 1).trim();
|
100
|
String[] Triples = record.split(LINE_SEPERATOR);
|
101
|
for (String triple : Triples) {
|
102
|
String[] Fields = triple.split(FIELD_DELIM);
|
103
|
String subject = Fields[0];
|
104
|
//props in sourceKB do not contain <> ,so we need to replace them.
|
105
|
String property = Fields[1].replaceAll("[<>]", "");
|
106
|
String value = Fields[2];
|
107
|
if (sourceKb.getProperties().contains(property) || targetKb.getProperties().contains(property)) {
|
108
|
if (recordId.contains("source_")) {
|
109
|
for (String propertyDub : sourceKb.getFunctions().get(property).keySet()) {
|
110
|
String processedValue = Preprocessor.process(value, sourceKb.getFunctions().get(property).get(propertyDub));
|
111
|
sourceCache.addTriple(subject, propertyDub, processedValue);
|
112
|
}
|
113
|
context.getCounter(LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
|
114
|
} else {
|
115
|
for (String propertyDub : targetKb.getFunctions().get(property).keySet()) {
|
116
|
String processedValue = Preprocessor.process(value, targetKb.getFunctions().get(property).get(propertyDub));
|
117
|
targetCache.addTriple(subject, propertyDub, processedValue);
|
118
|
}
|
119
|
context.getCounter(LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
|
120
|
}
|
121
|
|
122
|
}
|
123
|
}
|
124
|
|
125
|
}
|
126
|
}
|
127
|
|
128
|
private void linkRecords(Context context) throws IOException, InterruptedException {
|
129
|
//link specification
|
130
|
Rewriter rw = RewriterFactory.getRewriter("Default");
|
131
|
LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
|
132
|
LinkSpecification rwLs = rw.rewrite(ls);
|
133
|
//planning
|
134
|
IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
|
135
|
assert planner != null;
|
136
|
NestedPlan plan = planner.plan(rwLs);
|
137
|
ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache,
|
138
|
config.getSourceInfo().getVar(), config.getTargetInfo().getVar());
|
139
|
|
140
|
Mapping verificationMapping = engine.execute(plan); //mappings for verification
|
141
|
Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); //mappings for acceptance (auta theloume)
|
142
|
//output
|
143
|
for (String source : acceptanceMapping.getMap().keySet()) {//gia kathe source blepoume ta targets
|
144
|
for (String target : acceptanceMapping.getMap().get(source).keySet()) {//gia kathe target blepoume to confidence
|
145
|
context.write(new Text(source), new Text(target + "," + acceptanceMapping.getConfidence(source, target)));
|
146
|
context.getCounter(LIMES_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
|
147
|
}
|
148
|
}
|
149
|
}
|
150
|
|
151
|
@Override
|
152
|
protected void cleanup(Context context) throws IOException, InterruptedException {
|
153
|
|
154
|
}
|
155
|
|
156
|
}
|