Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
2

    
3
import com.lambdaworks.redis.RedisClient;
4
import com.lambdaworks.redis.RedisConnection;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.FrequencyCounter;
6
import org.aksw.limes.core.execution.engine.ExecutionEngine;
7
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
8
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
9
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
10
import org.aksw.limes.core.execution.planning.planner.IPlanner;
11
import org.aksw.limes.core.execution.rewriter.Rewriter;
12
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
13
import org.aksw.limes.core.io.cache.MemoryCache;
14
import org.aksw.limes.core.io.config.KBInfo;
15
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
16
import org.aksw.limes.core.io.ls.LinkSpecification;
17
import org.aksw.limes.core.io.mapping.Mapping;
18
import org.aksw.limes.core.io.preprocessing.Preprocessor;
19
import org.apache.hadoop.io.Text;
20
import org.apache.hadoop.mapreduce.Reducer;
21
import org.apache.log4j.Logger;
22

    
23
import java.io.IOException;
24
import java.util.ArrayList;
25
import java.util.Iterator;
26
import java.util.List;
27

    
28
public class LimesReducerLimes extends Reducer<Text, Text, Text, Text> {
29

    
30
    private Logger log = Logger.getLogger(LimesReducerLimes.class);
31
    private MemoryCache sourceCache, targetCache;
32
    private KBInfo sourceKb, targetKb;
33
    private org.aksw.limes.core.io.config.Configuration config;
34
    private String redisHost;
35
    private Integer redisPort;
36
    private RedisConnection connection;
37
    private RedisClient client;
38

    
39
    private long optimalBlockSize;
40

    
41
    public static enum LIMES_COUNTERS {
42
        TARGET_TRIPLES,
43
        SOURCE_TRIPLES,
44
        WRITTEN_OUT_ENTITIES,
45
        LINKED_BLOCKS,
46
        TOTAL_BLOCKS,
47
        S_IN,
48
        T_IN,
49
        DISCARDED_BLOCKS
50
    }
51

    
52

    
53
    @Override
54
    protected void setup(Context context) throws IOException, InterruptedException {
55

    
56
        try {
57
            String configXML = context.getConfiguration().get("lod.configXML");
58
            String limesDTD = context.getConfiguration().get("lod.limesDTD");
59
            XMLConfigurationReader reader = new XMLConfigurationReader();
60
            config = reader.read(configXML, limesDTD);
61
            sourceCache = new MemoryCache();
62
            targetCache = new MemoryCache();
63
            sourceKb = config.getSourceInfo();
64
            targetKb = config.getTargetInfo();
65
            redisHost = context.getConfiguration().get("lod.redisHost");
66
            redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
67
            client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
68
            connection = client.connect();
69

    
70
            String statsPath = context.getConfiguration().get("lod.statsOutputPath");
71
            FrequencyCounter frequencyCounter = new FrequencyCounter();
72
            this.optimalBlockSize = frequencyCounter.getOptimalBlockSize(statsPath);
73
            //this.optimalBlockSize = 17000;
74
            log.info("OPTIMAL BLOCK SIZE" + optimalBlockSize);
75
        } catch (Exception e) {
76
            log.error("Error computing stats" + e.toString());
77
            throw new RuntimeException(e);
78
        }
79

    
80
    }
81

    
82
    @Override
83
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
84
        Iterator<Text> recordsIterator = values.iterator();
85
//each list ia a block now
86
        context.getCounter(LIMES_COUNTERS.TOTAL_BLOCKS).increment(1);
87
        List<String> records = new ArrayList<>();
88
        int totalRecords = 0;
89
        boolean hasSource = false;
90
        boolean hasTarget = false;
91

    
92
        while (recordsIterator.hasNext()) {
93
            String recordId = recordsIterator.next().toString();
94
            if (recordId.contains("source_")) {
95
                hasSource = true;
96
                context.getCounter(LIMES_COUNTERS.S_IN).increment(1);
97
            }
98
            if (recordId.contains("target_")) {
99
                hasTarget = true;
100
                context.getCounter(LIMES_COUNTERS.T_IN).increment(1);
101
            }
102
            records.add(recordId);
103
            totalRecords++;
104
        }
105

    
106
        if (totalRecords > 1 && totalRecords < optimalBlockSize && hasSource && hasTarget) {
107
            try {
108
                fillLimesCache(records, context);
109
                linkRecords(context);
110
            } catch (Exception e) {
111
                log.error(e.toString());
112
                throw new IOException(e);
113
            }
114
            context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
115
        } else {
116
            context.getCounter(LIMES_COUNTERS.DISCARDED_BLOCKS).increment(1);
117
        }
118

    
119
    }
120

    
121

    
122
    private void fillLimesCache(List<String> records, Context context) {
123

    
124
        for (String record : records) {
125
            String[] Fields = ((String) connection.get(record)).split(",");
126
            String subject = record.substring(record.indexOf("_") + 1);
127
            String[] split;
128
            for (int j = 0; j < Fields.length; j++) {
129
                split = Fields[j].split("\t");
130
                String field = split[0].replace("\t", "").trim();
131
                if (sourceKb.getProperties().contains(field) || targetKb.getProperties().contains(field)) {
132
                    if (record.contains("source_")) {
133
                        for (String propertyDub : sourceKb.getFunctions().get(field).keySet()) {
134
                            String processedValue = Preprocessor.process(split[1], sourceKb.getFunctions().get(field).get(propertyDub));
135
                            sourceCache.addTriple(subject, propertyDub, processedValue);
136
                        }
137
                        context.getCounter(LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
138
                    } else {
139
                        for (String propertyDub : targetKb.getFunctions().get(field).keySet()) {
140
                            String processedValue = Preprocessor.process(split[1], targetKb.getFunctions().get(field).get(propertyDub));
141
                            targetCache.addTriple(subject, propertyDub, processedValue);
142
                        }
143
                        context.getCounter(LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
144
                    }
145
                }
146

    
147
            }
148
        }
149
        records.clear();
150
        }
151

    
152
    private void linkRecords(Context context) throws IOException, InterruptedException {
153
        //link specification
154
        Rewriter rw = RewriterFactory.getRewriter("Default");
155
        LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
156
        LinkSpecification rwLs = rw.rewrite(ls);
157
        //planning
158
        IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
159
        assert planner != null;
160
        NestedPlan plan = planner.plan(rwLs);
161
        ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache,
162
                config.getSourceInfo().getVar(), config.getTargetInfo().getVar());
163

    
164
        Mapping verificationMapping = engine.execute(plan); //mappings for verification
165
        Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); //mappings for acceptance (auta theloume)
166
        //output
167
        for (String source : acceptanceMapping.getMap().keySet()) {//gia kathe source blepoume ta targets
168
            for (String target : acceptanceMapping.getMap().get(source).keySet()) {//gia kathe target blepoume to confidence
169
                context.write(new Text(source.replace("source_", "")), new Text(target.replace("target_", "") + "," + acceptanceMapping.getConfidence(source, target)));
170
                context.getCounter(LIMES_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
171
            }
172
        }
173

    
174
    }
175

    
176
    @Override
177
    protected void cleanup(Context context) throws IOException, InterruptedException {
178

    
179
    }
180

    
181
}
(2-2/5)