Revision 45056
Added by Eri Katsari over 7 years ago
LimesReducer.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import com.lambdaworks.redis.RedisClient; |
4 | 4 |
import com.lambdaworks.redis.RedisConnection; |
5 |
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.FrequencyCounter; |
|
6 | 5 |
import org.aksw.limes.core.execution.engine.ExecutionEngine; |
7 | 6 |
import org.aksw.limes.core.execution.engine.ExecutionEngineFactory; |
8 | 7 |
import org.aksw.limes.core.execution.planning.plan.NestedPlan; |
... | ... | |
21 | 20 |
import org.apache.log4j.Logger; |
22 | 21 |
|
23 | 22 |
import java.io.IOException; |
24 |
import java.util.ArrayList;
|
|
23 |
import java.util.Arrays;
|
|
25 | 24 |
import java.util.Iterator; |
26 | 25 |
import java.util.List; |
27 | 26 |
|
... | ... | |
37 | 36 |
private RedisClient client; |
38 | 37 |
private static final String OPTIMAL_BLOCKS = "optimalBlockSize"; |
39 | 38 |
private long optimalBlockSize; |
39 |
private static final String SEPERATOR = ","; |
|
40 | 40 |
|
41 | 41 |
public static enum LIMES_COUNTERS { |
42 | 42 |
TARGET_TRIPLES, |
43 | 43 |
SOURCE_TRIPLES, |
44 | 44 |
WRITTEN_OUT_ENTITIES, |
45 | 45 |
LINKED_BLOCKS, |
46 |
TOTAL_BLOCKS, |
|
47 |
S_IN, |
|
48 |
T_IN, |
|
49 | 46 |
DISCARDED_BLOCKS |
50 | 47 |
} |
51 | 48 |
|
... | ... | |
77 | 74 |
|
78 | 75 |
@Override |
79 | 76 |
protected void reduce(final Text key, final Iterable<Text> values, final Context context) throws IOException, InterruptedException { |
80 |
Iterator<Text> recordsIterator = values.iterator(); |
|
81 |
//each list ia a block now |
|
82 |
context.getCounter(LIMES_COUNTERS.TOTAL_BLOCKS).increment(1); |
|
83 |
List<String> records = new ArrayList<>(); |
|
84 |
int totalRecords = 0; |
|
85 |
boolean hasSource = false; |
|
86 |
boolean hasTarget = false; |
|
77 |
Iterator<Text> blocksIterator = values.iterator(); |
|
78 |
//each item in the list is a block with the given key |
|
87 | 79 |
|
88 |
while (recordsIterator.hasNext()) { |
|
89 |
String recordId = recordsIterator.next().toString(); |
|
90 |
if (recordId.contains("source_")) { |
|
91 |
hasSource = true; |
|
92 |
context.getCounter(LIMES_COUNTERS.S_IN).increment(1); |
|
93 |
} |
|
94 |
if (recordId.contains("target_")) { |
|
95 |
hasTarget = true; |
|
96 |
context.getCounter(LIMES_COUNTERS.T_IN).increment(1); |
|
97 |
} |
|
98 |
records.add(recordId); |
|
99 |
totalRecords++; |
|
100 |
} |
|
101 |
|
|
102 |
if (totalRecords > 1 && totalRecords < optimalBlockSize && hasSource && hasTarget) { |
|
80 |
while (blocksIterator.hasNext()) { |
|
81 |
List<String> records = Arrays.asList(blocksIterator.next().toString().split(SEPERATOR)); |
|
103 | 82 |
try { |
104 | 83 |
fillLimesCache(records, context); |
105 | 84 |
linkRecords(context); |
... | ... | |
108 | 87 |
throw new IOException(e); |
109 | 88 |
} |
110 | 89 |
context.getCounter(LIMES_COUNTERS.LINKED_BLOCKS).increment(1); |
111 |
} else { |
|
112 |
context.getCounter(LIMES_COUNTERS.DISCARDED_BLOCKS).increment(1); |
|
113 | 90 |
} |
114 |
|
|
115 | 91 |
} |
116 | 92 |
|
117 | 93 |
|
118 | 94 |
private void fillLimesCache(List<String> records, Context context) { |
119 |
|
|
120 | 95 |
for (String record : records) { |
121 | 96 |
String[] Fields = ((String) connection.get(record)).split(","); |
122 | 97 |
String subject = record.substring(record.indexOf("_") + 1); |
Also available in: Unified diff
Cleaned up Limes Reducer