1
|
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
|
2
|
|
3
|
import org.aksw.limes.core.execution.engine.ExecutionEngine;
|
4
|
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
|
5
|
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
|
6
|
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
|
7
|
import org.aksw.limes.core.execution.planning.planner.IPlanner;
|
8
|
import org.aksw.limes.core.execution.rewriter.Rewriter;
|
9
|
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
|
10
|
import org.aksw.limes.core.io.cache.MemoryCache;
|
11
|
import org.aksw.limes.core.io.config.KBInfo;
|
12
|
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
|
13
|
import org.aksw.limes.core.io.ls.LinkSpecification;
|
14
|
import org.aksw.limes.core.io.mapping.Mapping;
|
15
|
import org.aksw.limes.core.io.preprocessing.Preprocessor;
|
16
|
import org.apache.hadoop.conf.Configuration;
|
17
|
import org.apache.hadoop.fs.FileSystem;
|
18
|
import org.apache.hadoop.fs.Path;
|
19
|
import org.apache.hadoop.io.Text;
|
20
|
import org.apache.hadoop.mapreduce.Reducer;
|
21
|
import org.apache.log4j.Logger;
|
22
|
import redis.clients.jedis.Jedis;
|
23
|
import redis.clients.jedis.JedisPool;
|
24
|
import redis.clients.jedis.exceptions.JedisException;
|
25
|
|
26
|
import java.io.BufferedReader;
|
27
|
import java.io.IOException;
|
28
|
import java.io.InputStreamReader;
|
29
|
import java.util.Iterator;
|
30
|
|
31
|
public class LimesReducer extends Reducer<Text, Text, Text, Text> {
|
32
|
|
33
|
private Logger log = Logger.getLogger(LimesReducer.class);
|
34
|
//address of your redis server
|
35
|
private String redisHost;
|
36
|
private Integer redisPort;
|
37
|
|
38
|
private org.aksw.limes.core.io.config.Configuration config ;
|
39
|
//the jedis connection pool..
|
40
|
private static JedisPool pool = null;
|
41
|
|
42
|
public static enum ENTITIES_COUNTER {
|
43
|
TARGET_ENTITIES,
|
44
|
SOURCE_ENTITIES,
|
45
|
WRITTEN_OUT_ENTITIES
|
46
|
}
|
47
|
|
48
|
|
49
|
@Override
|
50
|
protected void setup(Context context) throws IOException, InterruptedException {
|
51
|
redisHost = context.getConfiguration().get("lod.redisHost");
|
52
|
redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
|
53
|
|
54
|
pool = new JedisPool(redisHost, redisPort);
|
55
|
|
56
|
String configXML = context.getConfiguration().get("lod.configXML");
|
57
|
String limesDTD = context.getConfiguration().get("lod.limesDTD");
|
58
|
|
59
|
|
60
|
XMLConfigurationReader reader = new XMLConfigurationReader();
|
61
|
|
62
|
|
63
|
|
64
|
config = reader.read(configXML, limesDTD);
|
65
|
|
66
|
System.out.println("Lod config" + config.toString());
|
67
|
|
68
|
|
69
|
}
|
70
|
|
71
|
|
72
|
@Override
|
73
|
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
|
74
|
|
75
|
|
76
|
MemoryCache sourceCache = new MemoryCache();
|
77
|
MemoryCache targetCache = new MemoryCache();
|
78
|
KBInfo sourceKb = config.getSourceInfo();
|
79
|
KBInfo targetKb = config.getTargetInfo();
|
80
|
|
81
|
System.out.println("sourceKb " + config.getSourceInfo());
|
82
|
|
83
|
System.out.println("sourceKb func" + config.getSourceInfo().getFunctions());
|
84
|
|
85
|
System.out.println("targetKb func" + config.getTargetInfo());
|
86
|
|
87
|
System.out.println("sourceKb func" + config.getTargetInfo().getFunctions());
|
88
|
|
89
|
|
90
|
|
91
|
Iterator<Text> it = values.iterator();
|
92
|
|
93
|
|
94
|
String[] recordIds = key.toString().split(",");
|
95
|
|
96
|
|
97
|
|
98
|
//System.out.println("recordIds" + key);
|
99
|
try {
|
100
|
//each list is a block
|
101
|
|
102
|
//while (it.hasNext())
|
103
|
for (int i = 1; i < recordIds.length; i++)
|
104
|
|
105
|
{
|
106
|
|
107
|
|
108
|
|
109
|
String recordId = recordIds[i];
|
110
|
//System.out.println("recordId " +recordId);
|
111
|
|
112
|
|
113
|
|
114
|
|
115
|
String[] Fields = getFromRedis(recordId).split(",");
|
116
|
|
117
|
String subject = recordId.substring(recordId.indexOf("_") + 1);
|
118
|
|
119
|
|
120
|
for (int j = 0; j < Fields.length; j++) {
|
121
|
|
122
|
String[] split = Fields[j].split("\t");
|
123
|
if(sourceKb.getProperties().contains(split[0])){
|
124
|
if (recordId.startsWith("source_")) {
|
125
|
for(String propertyDub : sourceKb.getFunctions().get(split[0]).keySet()) {
|
126
|
String processedValue = Preprocessor.process(split[1], sourceKb.getFunctions().get(split[0]).get(propertyDub));
|
127
|
sourceCache.addTriple(subject, propertyDub, processedValue);
|
128
|
}
|
129
|
|
130
|
//System.out.println(subject + "," + split[0] + "," + split[1]);
|
131
|
//log.info(subject + "," + split[0] + "," + split[1]);
|
132
|
context.getCounter(ENTITIES_COUNTER.SOURCE_ENTITIES).increment(1);
|
133
|
} else {
|
134
|
|
135
|
for(String propertyDub : targetKb.getFunctions().get(split[0]).keySet()) {
|
136
|
String processedValue = Preprocessor.process(split[1], targetKb.getFunctions().get(split[0]).get(propertyDub));
|
137
|
targetCache.addTriple(subject, propertyDub, processedValue);
|
138
|
}
|
139
|
context.getCounter(ENTITIES_COUNTER.TARGET_ENTITIES).increment(1);
|
140
|
}
|
141
|
}
|
142
|
// System.out.println(subject + "," + split[0] + "," + split[1]);
|
143
|
//context.write(new Text(subject), new Text(split[0] + "," + split[1]));
|
144
|
|
145
|
}
|
146
|
|
147
|
|
148
|
}
|
149
|
|
150
|
|
151
|
|
152
|
//link specification
|
153
|
Rewriter rw = RewriterFactory.getRewriter("Default");
|
154
|
LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
|
155
|
LinkSpecification rwLs = rw.rewrite(ls);
|
156
|
|
157
|
//planning
|
158
|
IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
|
159
|
assert planner != null;
|
160
|
NestedPlan plan = planner.plan(rwLs);
|
161
|
|
162
|
ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache, config.getSourceInfo().getVar(), config.getTargetInfo().getVar());
|
163
|
|
164
|
Mapping verificationMapping = engine.execute(plan); //mappings for verification
|
165
|
Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); //mappings for acceptance (auta theloume)
|
166
|
|
167
|
//output
|
168
|
for (String source : acceptanceMapping.getMap().keySet()) {//gia kathe source blepoume ta targets
|
169
|
for (String target : acceptanceMapping.getMap().get(source).keySet()) {//gia kathe target blepoume to confidence
|
170
|
//System.out.println(" ouput "+ source + " " + acceptanceMapping.getConfidence(source, target) + " " + target);
|
171
|
|
172
|
|
173
|
context.write(new Text(source), new Text(target + "," + acceptanceMapping.getConfidence(source, target)));
|
174
|
//mporeis na grapseis source-target se osous syndiasmous vgei! Edw sou deixnw pws grafeis kai confidence mesa gia na kseroume
|
175
|
//ayta apo olous tous reducers kalo 8a itan na grafonte se ena arxeio..LinkSet 8a to onomazoume kai 8a exei ola ta links pou 8a vgazei to limes
|
176
|
context.getCounter(ENTITIES_COUNTER.WRITTEN_OUT_ENTITIES).increment(1);
|
177
|
|
178
|
}
|
179
|
}
|
180
|
|
181
|
|
182
|
} catch (Exception e) {
|
183
|
log.error(e.toString(), e);
|
184
|
//throw new InterruptedException(e.toString());
|
185
|
}
|
186
|
|
187
|
}
|
188
|
|
189
|
private String getFromRedis(String key) throws Exception {
|
190
|
//get a jedis connection jedis connection pool
|
191
|
|
192
|
Jedis jedis = pool.getResource();
|
193
|
try {
|
194
|
|
195
|
Iterator iterator = jedis.smembers(key).iterator();
|
196
|
if (iterator.hasNext()) {
|
197
|
String fields = (String) iterator.next();
|
198
|
//System.out.println("Redis fields " + fields);
|
199
|
return fields;
|
200
|
}
|
201
|
|
202
|
return "";
|
203
|
} catch (JedisException e) {
|
204
|
|
205
|
log.error("Error writing entity to Redis", e);
|
206
|
if (null != jedis) {
|
207
|
pool.returnBrokenResource(jedis);
|
208
|
jedis = null;
|
209
|
}
|
210
|
throw new RuntimeException("Error writing to Redis ", e);
|
211
|
} finally
|
212
|
|
213
|
{
|
214
|
if (null != jedis) pool.returnResource(jedis);
|
215
|
}
|
216
|
|
217
|
}
|
218
|
|
219
|
private String readFile(String path) throws IOException {
|
220
|
|
221
|
|
222
|
FileSystem fs = FileSystem.get(new Configuration());
|
223
|
Path pt = new Path(path);
|
224
|
BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
|
225
|
|
226
|
String line = br.readLine();
|
227
|
|
228
|
while (line != null) {
|
229
|
line += br.readLine();
|
230
|
}
|
231
|
|
232
|
br.close();
|
233
|
return line;
|
234
|
}
|
235
|
|
236
|
@Override
|
237
|
protected void cleanup(Context context) throws IOException, InterruptedException {
|
238
|
log.info("Cleaning up reducer...");
|
239
|
|
240
|
|
241
|
}
|
242
|
|
243
|
public String getRedisHost() {
|
244
|
return redisHost;
|
245
|
}
|
246
|
|
247
|
public void setRedisHost(String redisHost) {
|
248
|
this.redisHost = redisHost;
|
249
|
}
|
250
|
|
251
|
public Integer getRedisPort() {
|
252
|
return redisPort;
|
253
|
}
|
254
|
|
255
|
public void setRedisPort(Integer redisPort) {
|
256
|
this.redisPort = redisPort;
|
257
|
}
|
258
|
|
259
|
public static JedisPool getPool() {
|
260
|
return pool;
|
261
|
}
|
262
|
|
263
|
public static void setPool(JedisPool pool) {
|
264
|
LimesReducer.pool = pool;
|
265
|
}
|
266
|
}
|