Revision 42280
Added by Eri Katsari about 8 years ago
LodMapper.java | ||
---|---|---|
23 | 23 |
import org.joda.time.format.DateTimeFormatter; |
24 | 24 |
|
25 | 25 |
import java.io.IOException; |
26 |
import java.util.ArrayList; |
|
27 |
import java.util.List; |
|
28 |
import java.util.Map; |
|
26 |
import java.util.*; |
|
29 | 27 |
import java.util.Map.Entry; |
30 | 28 |
|
31 | 29 |
/** |
... | ... | |
153 | 151 |
String serialized = serializer.Serialize(oafBuilder.build(), DELIM); |
154 | 152 |
|
155 | 153 |
if (serialized != null && !oaf.getEntity().getId().contains("dedup")) { |
154 |
|
|
156 | 155 |
try { |
157 | 156 |
Text TextKeyOut = new Text("entities"); |
158 | 157 |
context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes())); |
158 |
|
|
159 | 159 |
//counter |
160 | 160 |
context.getCounter(type).increment(1); |
161 | 161 |
|
... | ... | |
179 | 179 |
|
180 | 180 |
final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt())); |
181 | 181 |
|
182 |
List<OafRel> relOaf=decodeRelation(oaf, context, columnMap, ld);
|
|
182 |
List<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
|
|
183 | 183 |
|
184 |
|
|
184 | 185 |
for (OafRel rel : relOaf) { |
185 | 186 |
builder.getEntityBuilder().addCachedRel(rel); |
186 | 187 |
try { |
187 | 188 |
Text TextKeyOut = new Text("relations"); |
189 |
|
|
188 | 190 |
String buff = serializer.Serialize(rel, DELIM); |
189 | 191 |
|
190 |
if (!rel.getTarget().contains("dedup")) { |
|
192 |
if (!buff.isEmpty() && !rel.getTarget().contains("dedup")) {
|
|
191 | 193 |
context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes())); |
192 | 194 |
context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1); |
193 | 195 |
} |
194 | 196 |
|
195 |
|
|
196 | 197 |
} catch (Exception e) { |
197 | 198 |
log.error("Error while writing Relation Proto to M/R output", e); |
198 | 199 |
} |
... | ... | |
211 | 212 |
} |
212 | 213 |
|
213 | 214 |
|
214 |
List<String> relationsList = new ArrayList<String>();
|
|
215 |
Set<String> relationsList = new HashSet<String>();
|
|
215 | 216 |
|
216 |
serializer.extractRelations(oaf, DELIM,relationsList);
|
|
217 |
serializer.extractRelations(oaf, DELIM, relationsList);
|
|
217 | 218 |
|
218 | 219 |
for (String rel : relationsList) { |
219 | 220 |
try { |
221 |
|
|
220 | 222 |
Text TextKeyOut = new Text("relations"); |
223 |
|
|
221 | 224 |
if (!oaf.getEntity().getId().contains("dedup")) { |
222 |
if(!rel.contains("dedup")){
|
|
223 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes())); |
|
224 |
context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1); |
|
225 |
}} |
|
226 |
else
|
|
227 |
{ |
|
225 |
if (!rel.contains("dedup")) {
|
|
226 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
|
|
227 |
context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
|
|
228 |
|
|
229 |
}
|
|
230 |
} else {
|
|
228 | 231 |
//for dedup entities write only dedup relationships: all the permutations |
229 | 232 |
// of children |
230 |
if(rel.contains("dedup")){
|
|
231 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes())); |
|
232 |
context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1); |
|
233 |
} |
|
233 |
if (rel.contains("dedup")) {
|
|
234 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
|
|
235 |
context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
|
|
236 |
}
|
|
234 | 237 |
|
235 | 238 |
} |
236 | 239 |
|
... | ... | |
255 | 258 |
final Oaf decodedOaf = decodeProto(context, e.getValue()); |
256 | 259 |
if (isValid(decodedOaf)) { |
257 | 260 |
OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel()); |
261 |
|
|
258 | 262 |
// skip dedups |
259 | 263 |
|
260 | 264 |
if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) { |
Also available in: Unified diff
Fix for "" in person IDs; fix for duplicates in relations; updated the delimiter to "!".