/*
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;

import com.lambdaworks.redis.RedisClient;
import com.lambdaworks.redis.RedisConnection;
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.FrequencyCounter;
import org.aksw.limes.core.execution.engine.ExecutionEngine;
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
import org.aksw.limes.core.execution.planning.planner.IPlanner;
import org.aksw.limes.core.execution.rewriter.Rewriter;
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
import org.aksw.limes.core.io.cache.MemoryCache;
import org.aksw.limes.core.io.config.KBInfo;
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
import org.aksw.limes.core.io.ls.LinkSpecification;
import org.aksw.limes.core.io.mapping.Mapping;
import org.aksw.limes.core.io.preprocessing.Preprocessor;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.log4j.Logger;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
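
// Reducer for the LOD-export linkage phase. Each reduce key identifies one block
// of record ids produced by the blocking step; the reducer fetches the records'
// field data from Redis, fills the LIMES source/target caches, and runs the
// configured LIMES link specification, writing out the accepted links.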
public class LimesReducerLimes extends Reducer<Text, Text, Text, Text> {

    private Logger log = Logger.getLogger(LimesReducerLimes.class);
    private MemoryCache sourceCache, targetCache;
    private KBInfo sourceKb, targetKb;
    private org.aksw.limes.core.io.config.Configuration config;
    private String redisHost;
    private Integer redisPort;
    private RedisConnection<String, String> connection;
    private RedisClient client;

    // Upper bound on the number of records a block may contain before it is
    // discarded; computed from the blocking statistics in setup().
    private long optimalBlockSize;

    // Hadoop counters tracking block and triple statistics for the linking job.
    public enum LIMES_COUNTERS {
        TARGET_TRIPLES,
        SOURCE_TRIPLES,
        WRITTEN_OUT_ENTITIES,
        LINKED_BLOCKS,
        TOTAL_BLOCKS,
        S_IN,
        T_IN,
        DISCARDED_BLOCKS
    }
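
    // setup(): parses the LIMES XML configuration shipped with the job, opens the
    // Redis connection that holds the record payloads, and derives the maximal
    // usable block size from the blocking statistics.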
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            String configXML = context.getConfiguration().get("lod.configXML");
            String limesDTD = context.getConfiguration().get("lod.limesDTD");
            XMLConfigurationReader reader = new XMLConfigurationReader();
            config = reader.read(configXML, limesDTD);
            sourceCache = new MemoryCache();
            targetCache = new MemoryCache();
            sourceKb = config.getSourceInfo();
            targetKb = config.getTargetInfo();
            redisHost = context.getConfiguration().get("lod.redisHost");
            redisPort = Integer.parseInt(context.getConfiguration().get("lod.redisPort"));
            client = RedisClient.create("redis://" + redisHost + ":" + redisPort);
            connection = client.connect();

            String statsPath = context.getConfiguration().get("lod.statsOutputPath");
            FrequencyCounter frequencyCounter = new FrequencyCounter();
            this.optimalBlockSize = frequencyCounter.getOptimalBlockSize(statsPath);
            //this.optimalBlockSize = 17000;
            log.info("Optimal block size: " + optimalBlockSize);
        } catch (Exception e) {
            log.error("Error computing stats: " + e.toString());
            throw new RuntimeException(e);
        }
    }
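
    // reduce(): called once per block; counts the block's source and target
    // members and links the block only when it contains records from both
    // datasets and is within the optimal size bound.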
    @Override
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
        // Each value list is one block of record ids.
        Iterator<Text> recordsIterator = values.iterator();
        context.getCounter(LIMES_COUNTERS.TOTAL_BLOCKS).increment(1);
        List<String> records = new ArrayList<>();
        int totalRecords = 0;
        boolean hasSource = false;
        boolean hasTarget = false;

        while (recordsIterator.hasNext()) {
            String recordId = recordsIterator.next().toString();
            if (recordId.contains("source_")) {
                hasSource = true;
                context.getCounter(LIMES_COUNTERS.S_IN).increment(1);
            }
            if (recordId.contains("target_")) {
                hasTarget = true;
                context.getCounter(LIMES_COUNTERS.T_IN).increment(1);
            }
            records.add(recordId);
            totalRecords++;
        }

        // Link only mixed blocks that are small enough to process; discard the rest.
        if (totalRecords > 1 && totalRecords < optimalBlockSize && hasSource && hasTarget) {
            try {
                fillLimesCache(records, context);
                linkRecords(context);
            } catch (Exception e) {
                log.error(e.toString());
                throw new IOException(e);
            }
            context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
        } else {
            context.getCounter(LIMES_COUNTERS.DISCARDED_BLOCKS).increment(1);
        }
    }
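
    // fillLimesCache(): fetches each record's serialized fields from Redis and
    // loads the values of the configured properties into the LIMES caches.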
    private void fillLimesCache(List<String> records, Context context) {
        // Re-create the caches for every block: the reducer instance is reused
        // across reduce() calls, so triples must not leak between blocks.
        sourceCache = new MemoryCache();
        targetCache = new MemoryCache();

        for (String record : records) {
            String[] fields = connection.get(record).split(",");
            String subject = record.substring(record.indexOf("_") + 1);
            String[] split;
            for (int j = 0; j < fields.length; j++) {
                split = fields[j].split("\t");
                String field = split[0].replace("\t", "").trim();
                if (sourceKb.getProperties().contains(field) || targetKb.getProperties().contains(field)) {
                    if (record.contains("source_")) {
                        for (String propertyDub : sourceKb.getFunctions().get(field).keySet()) {
                            String processedValue = Preprocessor.process(split[1], sourceKb.getFunctions().get(field).get(propertyDub));
                            sourceCache.addTriple(subject, propertyDub, processedValue);
                        }
                        context.getCounter(LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
                    } else {
                        for (String propertyDub : targetKb.getFunctions().get(field).keySet()) {
                            String processedValue = Preprocessor.process(split[1], targetKb.getFunctions().get(field).get(propertyDub));
                            targetCache.addTriple(subject, propertyDub, processedValue);
                        }
                        context.getCounter(LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
                    }
                }
            }
        }
        records.clear();
    }
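
    // linkRecords(): runs the LIMES pipeline (rewrite -> plan -> execute) over the
    // two caches and emits every accepted (source, target, confidence) link.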
    private void linkRecords(Context context) throws IOException, InterruptedException {
        // Link specification.
        Rewriter rw = RewriterFactory.getRewriter("Default");
        LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
        LinkSpecification rwLs = rw.rewrite(ls);
        // Planning.
        IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
        assert planner != null;
        NestedPlan plan = planner.plan(rwLs);
        ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache,
                config.getSourceInfo().getVar(), config.getTargetInfo().getVar());

        Mapping verificationMapping = engine.execute(plan); // mappings above the verification threshold
        Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); // mappings above the acceptance threshold (the links we keep)
        // Output: for each source, emit every linked target together with its confidence.
        for (String source : acceptanceMapping.getMap().keySet()) {
            for (String target : acceptanceMapping.getMap().get(source).keySet()) {
                context.write(new Text(source.replace("source_", "")), new Text(target.replace("target_", "") + "," + acceptanceMapping.getConfidence(source, target)));
                context.getCounter(LIMES_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
            }
        }
    }
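
    // cleanup(): releases the Redis resources acquired in setup().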
    @Override
    protected void cleanup(Context context) throws IOException, InterruptedException {
        connection.close();
        client.shutdown();
    }
}
*/