package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;

import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.FrequencyCounter;
import org.aksw.limes.core.execution.engine.ExecutionEngine;
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
import org.aksw.limes.core.execution.planning.planner.IPlanner;
import org.aksw.limes.core.execution.rewriter.Rewriter;
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
import org.aksw.limes.core.io.cache.MemoryCache;
import org.aksw.limes.core.io.config.KBInfo;
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
import org.aksw.limes.core.io.ls.LinkSpecification;
import org.aksw.limes.core.io.mapping.Mapping;
import org.aksw.limes.core.io.preprocessing.Preprocessor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
public class LimesReducerLimes extends Reducer<Text, Text, Text, Text> {
|
29
|
|
30
|
private Logger log = Logger.getLogger(LimesReducerLimes.class);
|
31
|
private MemoryCache sourceCache, targetCache;
|
32
|
private KBInfo sourceKb, targetKb;
|
33
|
private org.aksw.limes.core.io.config.Configuration config;
|
34
|
private String redisHost;
|
35
|
private Integer redisPort;
|
36
|
private RedisConnection connection;
|
37
|
private RedisClient client;
|
38
|
|
39
|
private long optimalBlockSize;
|
40
|
|
41
|
public static enum LIMES_COUNTERS {
|
42
|
TARGET_TRIPLES,
|
43
|
SOURCE_TRIPLES,
|
44
|
WRITTEN_OUT_ENTITIES,
|
45
|
LINKED_BLOCKS,
|
46
|
TOTAL_BLOCKS,
|
47
|
S_IN,
|
48
|
T_IN,
|
49
|
DISCARDED_BLOCKS
|
50
|
}
|
51
|
|
52
|
|
53
|
@Override
|
54
|
protected void setup(Context context) throws IOException, InterruptedException {
|
55
|
|
56
|
try {
|
57
|
String configXML = context.getConfiguration().get("lod.configXML");
|
58
|
String limesDTD = context.getConfiguration().get("lod.limesDTD");
|
59
|
XMLConfigurationReader reader = new XMLConfigurationReader();
|
60
|
config = reader.read(configXML, limesDTD);
|
61
|
sourceCache = new MemoryCache();
|
62
|
targetCache = new MemoryCache();
|
63
|
sourceKb = config.getSourceInfo();
|
64
|
targetKb = config.getTargetInfo();
|
65
|
redisHost = context.getConfiguration().get("lod.redisHost");
|
66
|
redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
|
67
|
client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
|
68
|
connection = client.connect();
|
69
|
|
70
|
String statsPath = context.getConfiguration().get("lod.statsOutputPath");
|
71
|
FrequencyCounter frequencyCounter = new FrequencyCounter();
|
72
|
this.optimalBlockSize = frequencyCounter.getOptimalBlockSize(statsPath);
|
73
|
//this.optimalBlockSize = 17000;
|
74
|
log.info("OPTIMAL BLOCK SIZE" + optimalBlockSize);
|
75
|
} catch (Exception e) {
|
76
|
log.error("Error computing stats" + e.toString());
|
77
|
throw new RuntimeException(e);
|
78
|
}
|
79
|
|
80
|
}
|
81
|
|
82
|
@Override
|
83
|
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
|
84
|
Iterator<Text> recordsIterator = values.iterator();
|
85
|
//each list ia a block now
|
86
|
context.getCounter(LIMES_COUNTERS.TOTAL_BLOCKS).increment(1);
|
87
|
List<String> records = new ArrayList<>();
|
88
|
int totalRecords = 0;
|
89
|
boolean hasSource = false;
|
90
|
boolean hasTarget = false;
|
91
|
|
92
|
while (recordsIterator.hasNext()) {
|
93
|
String recordId = recordsIterator.next().toString();
|
94
|
if (recordId.contains("source_")) {
|
95
|
hasSource = true;
|
96
|
context.getCounter(LIMES_COUNTERS.S_IN).increment(1);
|
97
|
}
|
98
|
if (recordId.contains("target_")) {
|
99
|
hasTarget = true;
|
100
|
context.getCounter(LIMES_COUNTERS.T_IN).increment(1);
|
101
|
}
|
102
|
records.add(recordId);
|
103
|
totalRecords++;
|
104
|
}
|
105
|
|
106
|
if (totalRecords > 1 && totalRecords < optimalBlockSize && hasSource && hasTarget) {
|
107
|
try {
|
108
|
fillLimesCache(records, context);
|
109
|
linkRecords(context);
|
110
|
} catch (Exception e) {
|
111
|
log.error(e.toString());
|
112
|
throw new IOException(e);
|
113
|
}
|
114
|
context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
|
115
|
} else {
|
116
|
context.getCounter(LIMES_COUNTERS.DISCARDED_BLOCKS).increment(1);
|
117
|
}
|
118
|
|
119
|
}
|
120
|
|
121
|
|
122
|
private void fillLimesCache(List<String> records, Context context) {
|
123
|
|
124
|
for (String record : records) {
|
125
|
String[] Fields = ((String) connection.get(record)).split(",");
|
126
|
String subject = record.substring(record.indexOf("_") + 1);
|
127
|
String[] split;
|
128
|
for (int j = 0; j < Fields.length; j++) {
|
129
|
split = Fields[j].split("\t");
|
130
|
String field = split[0].replace("\t", "").trim();
|
131
|
if (sourceKb.getProperties().contains(field) || targetKb.getProperties().contains(field)) {
|
132
|
if (record.contains("source_")) {
|
133
|
for (String propertyDub : sourceKb.getFunctions().get(field).keySet()) {
|
134
|
String processedValue = Preprocessor.process(split[1], sourceKb.getFunctions().get(field).get(propertyDub));
|
135
|
sourceCache.addTriple(subject, propertyDub, processedValue);
|
136
|
}
|
137
|
context.getCounter(LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
|
138
|
} else {
|
139
|
for (String propertyDub : targetKb.getFunctions().get(field).keySet()) {
|
140
|
String processedValue = Preprocessor.process(split[1], targetKb.getFunctions().get(field).get(propertyDub));
|
141
|
targetCache.addTriple(subject, propertyDub, processedValue);
|
142
|
}
|
143
|
context.getCounter(LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
|
144
|
}
|
145
|
}
|
146
|
|
147
|
}
|
148
|
}
|
149
|
records.clear();
|
150
|
}
|
151
|
|
152
|
private void linkRecords(Context context) throws IOException, InterruptedException {
|
153
|
//link specification
|
154
|
Rewriter rw = RewriterFactory.getRewriter("Default");
|
155
|
LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
|
156
|
LinkSpecification rwLs = rw.rewrite(ls);
|
157
|
//planning
|
158
|
IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
|
159
|
assert planner != null;
|
160
|
NestedPlan plan = planner.plan(rwLs);
|
161
|
ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache,
|
162
|
config.getSourceInfo().getVar(), config.getTargetInfo().getVar());
|
163
|
|
164
|
Mapping verificationMapping = engine.execute(plan); //mappings for verification
|
165
|
Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); //mappings for acceptance (auta theloume)
|
166
|
//output
|
167
|
for (String source : acceptanceMapping.getMap().keySet()) {//gia kathe source blepoume ta targets
|
168
|
for (String target : acceptanceMapping.getMap().get(source).keySet()) {//gia kathe target blepoume to confidence
|
169
|
context.write(new Text(source.replace("source_", "")), new Text(target.replace("target_", "") + "," + acceptanceMapping.getConfidence(source, target)));
|
170
|
context.getCounter(LIMES_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
|
171
|
}
|
172
|
}
|
173
|
|
174
|
}
|
175
|
|
176
|
@Override
|
177
|
protected void cleanup(Context context) throws IOException, InterruptedException {
|
178
|
|
179
|
}
|
180
|
|
181
|
}
|