Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.linkage;
2

    
3
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.caching.RedisUtils;
4
import org.aksw.limes.core.execution.engine.ExecutionEngine;
5
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
6
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
7
import org.aksw.limes.core.execution.planning.planner.ExecutionPlannerFactory;
8
import org.aksw.limes.core.execution.planning.planner.IPlanner;
9
import org.aksw.limes.core.execution.rewriter.Rewriter;
10
import org.aksw.limes.core.execution.rewriter.RewriterFactory;
11
import org.aksw.limes.core.io.cache.MemoryCache;
12
import org.aksw.limes.core.io.config.KBInfo;
13
import org.aksw.limes.core.io.config.reader.xml.XMLConfigurationReader;
14
import org.aksw.limes.core.io.ls.LinkSpecification;
15
import org.aksw.limes.core.io.mapping.Mapping;
16
import org.aksw.limes.core.io.preprocessing.Preprocessor;
17
import org.apache.hadoop.io.Text;
18
import org.apache.hadoop.mapreduce.Reducer;
19
import org.apache.log4j.Logger;
20

    
21
import java.io.IOException;
22

    
23
public class LimesReducer extends Reducer<Text, Text, Text, Text> {
24

    
25
    private Logger log = Logger.getLogger(LimesReducer.class);
26
    private MemoryCache sourceCache, targetCache;
27
    private KBInfo sourceKb, targetKb;
28
    private org.aksw.limes.core.io.config.Configuration config;
29
    private RedisUtils redisUtils;
30
    private int optimalBlockSize;
31
    private static final String OPTIMAL_BLOCKS = "optimalBlockSize";
32
    private static final String SEPERATOR = ",";
33
    private static final String LINE_SEPERATOR = "\t.\t";
34
    private static final String FIELD_DELIM = "\t";
35

    
36
    public static enum LIMES_COUNTERS {
37
        TARGET_TRIPLES,
38
        SOURCE_TRIPLES,
39
        WRITTEN_OUT_ENTITIES,
40
        LINKED_BLOCKS,
41
        DISCARDED_BLOCKS
42
    }
43

    
44

    
45
    @Override
46
    protected void setup(Context context) throws IOException, InterruptedException {
47

    
48
        try {
49
            redisUtils = new RedisUtils(context);
50
            String configXML = context.getConfiguration().get("lod.configXML");
51
            String limesDTD = context.getConfiguration().get("lod.limesDTD");
52
            XMLConfigurationReader reader = new XMLConfigurationReader();
53
            config = reader.read(configXML, limesDTD);
54
            sourceKb = config.getSourceInfo();
55
            targetKb = config.getTargetInfo();
56
            optimalBlockSize = Integer.valueOf(redisUtils.getValue(OPTIMAL_BLOCKS));
57
            log.info("OPTIMAL BLOCK SIZE " + optimalBlockSize);
58
            //    optimalBlockSize = 1700;
59
            log.info("sourceKB" + sourceKb.getProperties());
60
            log.info("targetKB" + targetKb.getProperties());
61

    
62
        } catch (Exception e) {
63
            log.error("Error computing stats" + e.toString());
64
            throw new RuntimeException(e);
65
        }
66

    
67
    }
68

    
69
    @Override
70
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
71
        //each item in the list is a block with the given key
72

    
73
        for (Text block : values) {
74
            try {
75
                context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
76
                sourceCache = new MemoryCache();
77
                targetCache = new MemoryCache();
78
                fillLimesCache(block, context);
79
                linkRecords(context);
80
            } catch (Exception e) {
81
                log.error(e.toString(), e);
82
                throw new IOException(e);
83
            }
84

    
85
        }
86
    }
87

    
88

    
89
    private void fillLimesCache(Text block, Context context) throws Exception {
90

    
91
        String[] split = block.toString().split(SEPERATOR);
92
        for (String recordId : split) {
93
            String record = redisUtils.getValue(recordId);
94
            if (record == null) {
95
                log.error("Record " + recordId + " not found! ");
96
                throw new Exception("Record " + recordId + " not found! ");
97

    
98
            }
99
            record = record.substring(record.indexOf(FIELD_DELIM) + 1).trim();
100
            String[] Triples = record.split(LINE_SEPERATOR);
101
            for (String triple : Triples) {
102
                String[] Fields = triple.split(FIELD_DELIM);
103
                String subject = Fields[0];
104
                //props in sourceKB do not contain <> ,so we need to replace them.
105
                String property = Fields[1].replaceAll("[<>]", "");
106
                String value = Fields[2];
107
                if (sourceKb.getProperties().contains(property) || targetKb.getProperties().contains(property)) {
108
                    if (recordId.contains("source_")) {
109
                        for (String propertyDub : sourceKb.getFunctions().get(property).keySet()) {
110
                            String processedValue = Preprocessor.process(value, sourceKb.getFunctions().get(property).get(propertyDub));
111
                            sourceCache.addTriple(subject, propertyDub, processedValue);
112
                        }
113
                        context.getCounter(LIMES_COUNTERS.SOURCE_TRIPLES).increment(1);
114
                    } else {
115
                        for (String propertyDub : targetKb.getFunctions().get(property).keySet()) {
116
                            String processedValue = Preprocessor.process(value, targetKb.getFunctions().get(property).get(propertyDub));
117
                            targetCache.addTriple(subject, propertyDub, processedValue);
118
                        }
119
                        context.getCounter(LIMES_COUNTERS.TARGET_TRIPLES).increment(1);
120
                    }
121

    
122
                }
123
            }
124

    
125
        }
126
    }
127

    
128
    private void linkRecords(Context context) throws IOException, InterruptedException {
129
        //link specification
130
        Rewriter rw = RewriterFactory.getRewriter("Default");
131
        LinkSpecification ls = new LinkSpecification(config.getMetricExpression(), config.getVerificationThreshold());
132
        LinkSpecification rwLs = rw.rewrite(ls);
133
        //planning
134
        IPlanner planner = ExecutionPlannerFactory.getPlanner(config.getExecutionPlan(), sourceCache, targetCache);
135
        assert planner != null;
136
        NestedPlan plan = planner.plan(rwLs);
137
        ExecutionEngine engine = ExecutionEngineFactory.getEngine("Default", sourceCache, targetCache,
138
                config.getSourceInfo().getVar(), config.getTargetInfo().getVar());
139

    
140
        Mapping verificationMapping = engine.execute(plan); //mappings for verification
141
        Mapping acceptanceMapping = verificationMapping.getSubMap(config.getAcceptanceThreshold()); //mappings for acceptance (auta theloume)
142
        //output
143
        for (String source : acceptanceMapping.getMap().keySet()) {//gia kathe source blepoume ta targets
144
            for (String target : acceptanceMapping.getMap().get(source).keySet()) {//gia kathe target blepoume to confidence
145
                context.write(new Text(source), new Text(target + "," + acceptanceMapping.getConfidence(source, target)));
146
                context.getCounter(LIMES_COUNTERS.WRITTEN_OUT_ENTITIES).increment(1);
147
            }
148
        }
149
    }
150

    
151
    @Override
152
    protected void cleanup(Context context) throws IOException, InterruptedException {
153

    
154
    }
155

    
156
}
(1-1/5)