Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
2

    
3
import org.aksw.limes.core.execution.engine.ExecutionEngine;
4
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
5
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
6
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
7
import org.aksw.limes.core.execution.planning.planner.IPlanner;
8
import org.aksw.limes.core.execution.rewriter.Rewriter;
9
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
10
import org.aksw.limes.core.io.cache.MemoryCache;
11
import org.aksw.limes.core.io.config.KBInfo;
12
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
13
import org.aksw.limes.core.io.ls.LinkSpecification;
14
import org.aksw.limes.core.io.mapping.Mapping;
15
import org.aksw.limes.core.io.preprocessing.Preprocessor;
16
import org.apache.hadoop.conf.Configuration;
17
import org.apache.hadoop.fs.FileSystem;
18
import org.apache.hadoop.fs.Path;
19
import org.apache.hadoop.io.Text;
20
import org.apache.hadoop.mapreduce.Reducer;
21
import org.apache.log4j.Logger;
22
import redis.clients.jedis.Jedis;
23
import redis.clients.jedis.JedisPool;
24
import redis.clients.jedis.exceptions.JedisException;
25

    
26
import java.io.BufferedReader;
27
import java.io.IOException;
28
import java.io.InputStreamReader;
29
import java.util.Iterator;
30

    
31
public class LimesReducer extends Reducer<Text, Text, Text, Text> {
32

    
33
	private Logger log = Logger.getLogger(LimesReducer.class);
34
	//address of your redis server
35
	private String redisHost;
36
	private Integer redisPort;
37

    
38
	private org.aksw.limes.core.io.config.Configuration config ;
39
	//the jedis connection pool..
40
	private static JedisPool pool = null;
41

    
42
	public static enum ENTITIES_COUNTER {
43
		TARGET_ENTITIES,
44
		SOURCE_ENTITIES,
45
		WRITTEN_OUT_ENTITIES
46
	}
47

    
48

    
49
	@Override
50
	protected void setup(Context context) throws IOException, InterruptedException {
51
		redisHost = context.getConfiguration().get("lod.redisHost");
52
		redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
53

    
54
		pool = new JedisPool(redisHost, redisPort);
55

    
56
		String configXML = context.getConfiguration().get("lod.configXML");
57
		String limesDTD = context.getConfiguration().get("lod.limesDTD");
58

    
59

    
60
		XMLConfigurationReader reader = new XMLConfigurationReader();
61

    
62

    
63

    
64
		config = reader.read(configXML, limesDTD);
65

    
66
		System.out.println("Lod config" + config.toString());
67

    
68

    
69
	}
70

    
71

    
72
	@Override
73
	protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
74

    
75

    
76
		MemoryCache sourceCache = new MemoryCache();
77
		MemoryCache targetCache = new MemoryCache();
78
		KBInfo sourceKb = config.getSourceInfo();
79
		KBInfo targetKb = config.getTargetInfo();
80

    
81
		System.out.println("sourceKb " + config.getSourceInfo());
82

    
83
		System.out.println("sourceKb  func" + config.getSourceInfo().getFunctions());
84

    
85
		System.out.println("targetKb  func" + config.getTargetInfo());
86

    
87
		System.out.println("sourceKb  func" + config.getTargetInfo().getFunctions());
88

    
89

    
90

    
91
		Iterator<Text> it = values.iterator();
92

    
93

    
94
		String[] recordIds = key.toString().split(",");
95

    
96

    
97

    
98
		//System.out.println("recordIds" + key);
99
		try {
100
			//each list is a block
101

    
102
			//while (it.hasNext())
103
			for (int i = 1; i < recordIds.length; i++)
104

    
105
			{
106

    
107

    
108

    
109
				String recordId = recordIds[i];
110
				//System.out.println("recordId " +recordId);
111

    
112

    
113

    
114

    
115
					String[] Fields = getFromRedis(recordId).split(",");
116

    
117
					String subject = recordId.substring(recordId.indexOf("_") + 1);
118

    
119

    
120
					for (int j = 0; j < Fields.length; j++) {
121

    
122
						String[] split = Fields[j].split("\t");
123
					if(sourceKb.getProperties().contains(split[0])){
124
						if (recordId.startsWith("source_")) {
125
							for(String propertyDub : sourceKb.getFunctions().get(split[0]).keySet()) {
126
								String processedValue = Preprocessor.process(split[1], sourceKb.getFunctions().get(split[0]).get(propertyDub));
127
								sourceCache.addTriple(subject, propertyDub, processedValue);
128
							}
129

    
130
							//System.out.println(subject + "," + split[0] + "," + split[1]);
131
							//log.info(subject + "," + split[0] + "," + split[1]);
132
							context.getCounter(ENTITIES_COUNTER.SOURCE_ENTITIES).increment(1);
133
						} else {
134
							
135
							for(String propertyDub : targetKb.getFunctions().get(split[0]).keySet()) {
136
									String processedValue = Preprocessor.process(split[1], targetKb.getFunctions().get(split[0]).get(propertyDub));
137
									targetCache.addTriple(subject, propertyDub, processedValue);
138
								}
139
							context.getCounter(ENTITIES_COUNTER.TARGET_ENTITIES).increment(1);
140
						}
141
					}
142
						//		System.out.println(subject + "," + split[0] + "," + split[1]);
143
						//context.write(new Text(subject), new Text(split[0] + "," + split[1]));
144

    
145
					}
146

    
147

    
148
			}
149

    
150

    
151

    
152
			//link specification
153
			Rewriter rw = RewriterFactory.getRewriter("Default");
154
			LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
155
			LinkSpecification rwLs = rw.rewrite(ls);
156

    
157
			//planning
158
			IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
159
			assert planner != null;
160
			NestedPlan plan = planner.plan(rwLs);
161

    
162
			ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache, config.getSourceInfo().getVar(), config.getTargetInfo().getVar());
163

    
164
			Mapping verificationMapping = engine.execute(plan); //mappings for verification
165
			Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); //mappings for acceptance (auta theloume)
166

    
167
			//output
168
			for (String source : acceptanceMapping.getMap().keySet()) {//gia kathe source blepoume ta targets
169
				for (String target : acceptanceMapping.getMap().get(source).keySet()) {//gia kathe target blepoume to confidence
170
					//System.out.println(" ouput "+ source + " " + acceptanceMapping.getConfidence(source, target) + " " + target);
171

    
172

    
173
					context.write(new Text(source), new Text(target + "," + acceptanceMapping.getConfidence(source, target)));
174
					//mporeis na grapseis source-target se osous syndiasmous vgei! Edw sou deixnw pws grafeis kai confidence mesa gia na kseroume
175
					//ayta apo olous tous reducers kalo 8a itan na grafonte se ena arxeio..LinkSet 8a to onomazoume kai 8a exei ola ta links pou 8a vgazei to limes
176
					context.getCounter(ENTITIES_COUNTER.WRITTEN_OUT_ENTITIES).increment(1);
177

    
178
				}
179
			}
180

    
181

    
182
		} catch (Exception e) {
183
			log.error(e.toString(), e);
184
			//throw new InterruptedException(e.toString());
185
		}
186

    
187
	}
188

    
189
	private String getFromRedis(String key) throws Exception {
190
		//get a jedis connection jedis connection pool
191

    
192
		Jedis jedis = pool.getResource();
193
		try {
194

    
195
			Iterator iterator = jedis.smembers(key).iterator();
196
			if (iterator.hasNext()) {
197
				String fields = (String) iterator.next();
198
				//System.out.println("Redis fields " + fields);
199
				return fields;
200
			}
201

    
202
			return "";
203
		} catch (JedisException e) {
204

    
205
			log.error("Error writing entity to Redis", e);
206
			if (null != jedis) {
207
				pool.returnBrokenResource(jedis);
208
				jedis = null;
209
			}
210
			throw new RuntimeException("Error writing to Redis  ", e);
211
		} finally
212

    
213
		{
214
			if (null != jedis) pool.returnResource(jedis);
215
		}
216

    
217
	}
218

    
219
	private String readFile(String path) throws IOException {
220

    
221

    
222
		FileSystem fs = FileSystem.get(new Configuration());
223
		Path pt = new Path(path);
224
		BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(pt)));
225

    
226
		String line = br.readLine();
227

    
228
		while (line != null) {
229
			line += br.readLine();
230
		}
231

    
232
		br.close();
233
		return line;
234
	}
235

    
236
	@Override
237
	protected void cleanup(Context context) throws IOException, InterruptedException {
238
		log.info("Cleaning up reducer...");
239

    
240

    
241
	}
242

    
243
	public String getRedisHost() {
244
		return redisHost;
245
	}
246

    
247
	public void setRedisHost(String redisHost) {
248
		this.redisHost = redisHost;
249
	}
250

    
251
	public Integer getRedisPort() {
252
		return redisPort;
253
	}
254

    
255
	public void setRedisPort(Integer redisPort) {
256
		this.redisPort = redisPort;
257
	}
258

    
259
	public static JedisPool getPool() {
260
		return pool;
261
	}
262

    
263
	public static void setPool(JedisPool pool) {
264
		LimesReducer.pool = pool;
265
	}
266
}
(1-1/2)