Project

General

Profile

« Previous | Next » 

Revision 45056

Added by Eri Katsari over 7 years ago

Cleaned up Limes Reducer

View differences:

LimesReducer.java
2 2

  
3 3
import com.lambdaworks.redis.RedisClient;
4 4
import com.lambdaworks.redis.RedisConnection;
5
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.FrequencyCounter;
6 5
import org.aksw.limes.core.execution.engine.ExecutionEngine;
7 6
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory;
8 7
import org.aksw.limes.core.execution.planning.plan.NestedPlan;
......
21 20
import org.apache.log4j.Logger;
22 21

  
23 22
import java.io.IOException;
24
import java.util.ArrayList;
23
import java.util.Arrays;
25 24
import java.util.Iterator;
26 25
import java.util.List;
27 26

  
......
37 36
    private RedisClient client;
38 37
    private static final String OPTIMAL_BLOCKS = "optimalBlockSize";
39 38
    private long optimalBlockSize;
39
    private static final String SEPERATOR = ",";
40 40

  
41 41
    public static enum LIMES_COUNTERS {
42 42
        TARGET_TRIPLES,
43 43
        SOURCE_TRIPLES,
44 44
        WRITTEN_OUT_ENTITIES,
45 45
        LINKED_BLOCKS,
46
        TOTAL_BLOCKS,
47
        S_IN,
48
        T_IN,
49 46
        DISCARDED_BLOCKS
50 47
    }
51 48

  
......
77 74

  
78 75
    @Override
79 76
    protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException {
80
        Iterator<Text> recordsIterator = values.iterator();
81
//each list is a block now
82
        context.getCounter(LIMES_COUNTERS.TOTAL_BLOCKS).increment(1);
83
        List<String> records = new ArrayList<>();
84
        int totalRecords = 0;
85
        boolean hasSource = false;
86
        boolean hasTarget = false;
77
        Iterator<Text> blocksIterator = values.iterator();
78
        //each item in the list is a block with the given key
87 79

  
88
        while (recordsIterator.hasNext()) {
89
            String recordId = recordsIterator.next().toString();
90
            if (recordId.contains("source_")) {
91
                hasSource = true;
92
                context.getCounter(LIMES_COUNTERS.S_IN).increment(1);
93
            }
94
            if (recordId.contains("target_")) {
95
                hasTarget = true;
96
                context.getCounter(LIMES_COUNTERS.T_IN).increment(1);
97
            }
98
            records.add(recordId);
99
            totalRecords++;
100
        }
101

  
102
        if (totalRecords > 1 && totalRecords < optimalBlockSize && hasSource && hasTarget) {
80
        while (blocksIterator.hasNext()) {
81
            List<String> records = Arrays.asList(blocksIterator.next().toString().split(SEPERATOR));
103 82
            try {
104 83
                fillLimesCache(records, context);
105 84
                linkRecords(context);
......
108 87
                throw new IOException(e);
109 88
            }
110 89
            context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1);
111
        } else {
112
            context.getCounter(LIMES_COUNTERS.DISCARDED_BLOCKS).increment(1);
113 90
        }
114

  
115 91
    }
116 92

  
117 93

  
118 94
    private void fillLimesCache(List<String> records, Context context) {
119

  
120 95
        for (String record : records) {
121 96
            String[] Fields = ((String) connection.get(record)).split(",");
122 97
            String subject = record.substring(record.indexOf("_") + 1);

Also available in: Unified diff