1
|
/*
|
2
|
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
|
3
|
|
4
|
import com.lambdaworks.redis.RedisClient;
|
5
|
import com.lambdaworks.redis.RedisConnection;
|
6
|
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.FrequencyCounter;
|
7
|
import org.aksw.limes.core.execution.engine.ExecutionEngine;
|
8
|
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
|
9
|
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
|
10
|
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
|
11
|
import org.aksw.limes.core.execution.planning.planner.IPlanner;
|
12
|
import org.aksw.limes.core.execution.rewriter.Rewriter;
|
13
|
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
|
14
|
import org.aksw.limes.core.io.cache.MemoryCache;
|
15
|
import org.aksw.limes.core.io.config.KBInfo;
|
16
|
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
|
17
|
import org.aksw.limes.core.io.ls.LinkSpecification;
|
18
|
import org.aksw.limes.core.io.mapping.Mapping;
|
19
|
import org.aksw.limes.core.io.preprocessing.Preprocessor;
|
20
|
import org.apache.hadoop.io.Text;
|
21
|
import org.apache.hadoop.mapreduce.Reducer;
|
22
|
import org.apache.log4j.Logger;
|
23
|
|
24
|
import java.io.IOException;
|
25
|
import java.util.ArrayList;
|
26
|
import java.util.Iterator;
|
27
|
import java.util.List;
|
28
|
|
29
|
public class LimesReducerLimes extends Reducer<Text, Text, Text, Text> {
|
30
|
|
31
|
private Logger log = Logger.getLogger(LimesReducerLimes.class);
|
32
|
private MemoryCache sourceCache, targetCache;
|
33
|
private KBInfo sourceKb, targetKb;
|
34
|
private org.aksw.limes.core.io.config.Configuration config;
|
35
|
private String redisHost;
|
36
|
private Integer redisPort;
|
37
|
private RedisConnection connection;
|
38
|
private RedisClient client;
|
39
|
|
40
|
private long optimalBlockSize;
|
41
|
|
42
|
public static enum LIMES_COUNTERS {
|
43
|
TARGET_TRIPLES,
|
44
|
SOURCE_TRIPLES,
|
45
|
WRITTEN_OUT_ENTITIES,
|
46
|
LINKED_BLOCKS,
|
47
|
TOTAL_BLOCKS,
|
48
|
S_IN,
|
49
|
T_IN,
|
50
|
DISCARDED_BLOCKS
|
51
|
}
|
52
|
|
53
|
|
54
|
@Override
|
55
|
protected void setup(Context context) throws IOException, InterruptedException {
|
56
|
|
57
|
try {
|
58
|
String configXML = context.getConfiguration().get("lod.configXML");
|
59
|
String limesDTD = context.getConfiguration().get("lod.limesDTD");
|
60
|
XMLConfigurationReader reader = new XMLConfigurationReader();
|
61
|
config = reader.read(configXML, limesDTD);
|
62
|
sourceCache = new MemoryCache();
|
63
|
targetCache = new MemoryCache();
|
64
|
sourceKb = config.getSourceInfo();
|
65
|
targetKb = config.getTargetInfo();
|
66
|
redisHost = context.getConfiguration().get("lod.redisHost");
|
67
|
redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
|
68
|
client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
|
69
|
connection = client.connect();
|
70
|
|
71
|
String statsPath = context.getConfiguration().get("lod.statsOutputPath");
|
72
|
FrequencyCounter frequencyCounter = new FrequencyCounter();
|
73
|
this.optimalBlockSize = frequencyCounter.getOptimalBlockSize(statsPath);
|
74
|
//this.optimalBlockSize = 17000;
|
75
|
log.info("OPTIMAL BLOCK SIZE" + optimalBlockSize);
|
76
|
} catch (Exception e) {
|
77
|
log.error("Error computing stats" + e.toString());
|
78
|
throw new RuntimeException(e);
|
79
|
}
|
80
|
|
81
|
}
|
82
|
|
83
|
@Override
|
84
|
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
|
85
|
Iterator<Text> recordsIterator = values.iterator();
|
86
|
//each list ia a block now
|
87
|
context.getCounter(LIMES_COUNTERS.TOTAL_BLOCKS).increment(1);
|
88
|
List<String> records = new ArrayList<>();
|
89
|
int totalRecords = 0;
|
90
|
boolean hasSource = false;
|
91
|
boolean hasTarget = false;
|
92
|
|
93
|
while (recordsIterator.hasNext()) {
|
94
|
String recordId = recordsIterator.next().toString();
|
95
|
if (recordId.contains("source_")) {
|
96
|
hasSource = true;
|
97
|
context.getCounter(LIMES_COUNTERS.S_IN).increment(1);
|
98
|
}
|
99
|
if (recordId.contains("target_")) {
|
100
|
hasTarget = true;
|
101
|
context.getCounter(LIMES_COUNTERS.T_IN).increment(1);
|
102
|
}
|
103
|
records.add(recordId);
|
104
|
totalRecords++;
|
105
|
}
|
106
|
|
107
|
if (totalRecords > 1 && totalRecords < optimalBlockSize && hasSource && hasTarget) {
|
108
|
try {
|
109
|
fillLimesCache(records, context);
|
110
|
linkRecords(context);
|
111
|
} catch (Exception e) {
|
112
|
log.error(e.toString());
|
113
|
throw new IOException(e);
|
114
|
}
|
115
|
context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
|
116
|
} else {
|
117
|
context.getCounter(LIMES_COUNTERS.DISCARDED_BLOCKS).increment(1);
|
118
|
}
|
119
|
|
120
|
}
|
121
|
|
122
|
|
123
|
private void fillLimesCache(List<String> records, Context context) {
|
124
|
|
125
|
for (String record : records) {
|
126
|
String[] Fields = ((String) connection.get(record)).split(",");
|
127
|
String subject = record.substring(record.indexOf("_") + 1);
|
128
|
String[] split;
|
129
|
for (int j = 0; j < Fields.length; j++) {
|
130
|
split = Fields[j].split("\t");
|
131
|
String field = split[0].replace("\t", "").trim();
|
132
|
if (sourceKb.getProperties().contains(field) || targetKb.getProperties().contains(field)) {
|
133
|
if (record.contains("source_")) {
|
134
|
for (String propertyDub : sourceKb.getFunctions().get(field).keySet()) {
|
135
|
String processedValue = Preprocessor.process(split[1], sourceKb.getFunctions().get(field).get(propertyDub));
|
136
|
sourceCache.addTriple(subject, propertyDub, processedValue);
|
137
|
}
|
138
|
context.getCounter(LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
|
139
|
} else {
|
140
|
for (String propertyDub : targetKb.getFunctions().get(field).keySet()) {
|
141
|
String processedValue = Preprocessor.process(split[1], targetKb.getFunctions().get(field).get(propertyDub));
|
142
|
targetCache.addTriple(subject, propertyDub, processedValue);
|
143
|
}
|
144
|
context.getCounter(LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
|
145
|
}
|
146
|
}
|
147
|
|
148
|
}
|
149
|
}
|
150
|
records.clear();
|
151
|
}
|
152
|
|
153
|
private void linkRecords(Context context) throws IOException, InterruptedException {
|
154
|
//link specification
|
155
|
Rewriter rw = RewriterFactory.getRewriter("Default");
|
156
|
LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
|
157
|
LinkSpecification rwLs = rw.rewrite(ls);
|
158
|
//planning
|
159
|
IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
|
160
|
assert planner != null;
|
161
|
NestedPlan plan = planner.plan(rwLs);
|
162
|
ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache,
|
163
|
config.getSourceInfo().getVar(), config.getTargetInfo().getVar());
|
164
|
|
165
|
Mapping verificationMapping = engine.execute(plan); //mappings for verification
|
166
|
Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); //mappings for acceptance (auta theloume)
|
167
|
//output
|
168
|
for (String source : acceptanceMapping.getMap().keySet()) {//gia kathe source blepoume ta targets
|
169
|
for (String target : acceptanceMapping.getMap().get(source).keySet()) {//gia kathe target blepoume to confidence
|
170
|
context.write(new Text(source.replace("source_", "")), new Text(target.replace("target_", "") + "," + acceptanceMapping.getConfidence(source, target)));
|
171
|
context.getCounter(LIMES_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
|
172
|
}
|
173
|
}
|
174
|
|
175
|
}
|
176
|
|
177
|
@Override
|
178
|
protected void cleanup(Context context) throws IOException, InterruptedException {
|
179
|
|
180
|
}
|
181
|
|
182
|
}
|
183
|
*/
|