Revision 40699
Added by Eri Katsari over 8 years ago
LodMapper.java | ||
---|---|---|
30 | 30 |
* export |
31 | 31 |
*/ |
32 | 32 |
public class LodMapper extends TableMapper<Text, ImmutableBytesWritable> { |
33 |
private Logger log = Logger.getLogger(this.getClass());
|
|
33 |
private Logger log = Logger.getLogger(this.getClass());
|
|
34 | 34 |
|
35 |
private Serializer serializer;
|
|
35 |
private Serializer serializer;
|
|
36 | 36 |
|
37 |
private EntityConfigTable entityConfigTable;
|
|
37 |
private EntityConfigTable entityConfigTable;
|
|
38 | 38 |
|
39 | 39 |
|
40 |
private long threshold = 100000;
|
|
40 |
private long threshold = 100000;
|
|
41 | 41 |
|
42 | 42 |
|
43 |
@Override
|
|
44 |
protected void setup(Context context) throws IOException, InterruptedException {
|
|
43 |
@Override
|
|
44 |
protected void setup(Context context) throws IOException, InterruptedException {
|
|
45 | 45 |
|
46 |
loadEntityConfig(context); |
|
46 |
loadEntityConfig(context); |
|
47 |
serializer = new Serializer(context.getConfiguration().get("lod.delim"), context.getConfiguration().get("lod.enclosing")); |
|
47 | 48 |
|
48 |
serializer = new Serializer(); |
|
49 |
serializer.setDELIM(context.getConfiguration().get("lod.delim")); |
|
50 |
serializer.setENCLOSED(context.getConfiguration().get("lod.enclosing")); |
|
49 |
} |
|
51 | 50 |
|
52 |
} |
|
53 | 51 |
|
54 |
@Override
|
|
55 |
protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
|
|
52 |
@Override
|
|
53 |
protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
|
|
56 | 54 |
|
57 |
final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
|
|
58 |
final Type type = decoder.getType();
|
|
59 |
final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
|
|
55 |
final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
|
|
56 |
final Type type = decoder.getType();
|
|
57 |
final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
|
|
60 | 58 |
|
61 |
if (isValid(oaf)) {
|
|
62 |
// TODO set this only when the configuration file has include dups
|
|
63 |
// to fals for Results
|
|
64 |
// or else were gonna get deleted by inference entries
|
|
65 |
// if (!deletedByInference(oaf) ||
|
|
66 |
// entityConfigTable.includeDuplicates(type)) {
|
|
67 |
if (!deletedByInference(oaf)) {
|
|
68 |
emitProtos(context, result, oaf);
|
|
69 |
}
|
|
70 |
}
|
|
59 |
if (isValid(oaf)) {
|
|
60 |
// TODO set this only when the configuration file has include dups
|
|
61 |
// to fals for Results
|
|
62 |
// or else were gonna get deleted by inference entries
|
|
63 |
// if (!deletedByInference(oaf) ||
|
|
64 |
// entityConfigTable.includeDuplicates(type)) {
|
|
65 |
if (!deletedByInference(oaf)) {
|
|
66 |
emitProtos(context, result, oaf);
|
|
67 |
}
|
|
68 |
}
|
|
71 | 69 |
|
72 |
}
|
|
70 |
}
|
|
73 | 71 |
|
74 |
private boolean isValid(Oaf oaf) {
|
|
75 |
try {
|
|
72 |
private boolean isValid(Oaf oaf) {
|
|
73 |
try {
|
|
76 | 74 |
|
77 |
if (oaf != null && oaf.isInitialized()) {
|
|
78 |
return true;
|
|
79 |
}
|
|
75 |
if (oaf != null && oaf.isInitialized()) {
|
|
76 |
return true;
|
|
77 |
}
|
|
80 | 78 |
|
81 |
} catch (Exception e) {
|
|
82 |
// log.error("OAF NOT INITIALIZED ");
|
|
83 |
}
|
|
79 |
} catch (Exception e) {
|
|
80 |
// log.error("OAF NOT INITIALIZED ");
|
|
81 |
}
|
|
84 | 82 |
|
85 |
return false;
|
|
86 |
}
|
|
83 |
return false;
|
|
84 |
}
|
|
87 | 85 |
|
88 |
private void emitProtos(Context context, Result result, Oaf oaf) {
|
|
89 |
//Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo()).setTimestamp(oaf.getTimestamp());
|
|
90 |
Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
|
|
91 |
Type type = oaf.getEntity().getType();
|
|
92 |
oafBuilder.setEntity(oaf.getEntity());
|
|
86 |
private void emitProtos(Context context, Result result, Oaf oaf) {
|
|
87 |
//Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo()).setTimestamp(oaf.getTimestamp());
|
|
88 |
Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
|
|
89 |
Type type = oaf.getEntity().getType();
|
|
90 |
oafBuilder.setEntity(oaf.getEntity());
|
|
93 | 91 |
|
94 |
// emit relation first so we can cache them to entity protos
|
|
95 |
emitRelation(context, result, oaf, type, oafBuilder);
|
|
96 |
emitEntity(context, oaf, type, oafBuilder);
|
|
92 |
// emit relation first so we can cache them to entity protos
|
|
93 |
emitRelation(context, result, oaf, type, oafBuilder);
|
|
94 |
emitEntity(context, oaf, type, oafBuilder);
|
|
97 | 95 |
|
98 |
}
|
|
96 |
}
|
|
99 | 97 |
|
100 |
private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
|
|
98 |
private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
|
|
101 | 99 |
|
102 | 100 |
|
103 |
String serialized = serializer.serialize(oafBuilder.build());
|
|
104 |
if (serialized != null && !oaf.getEntity().getId().startsWith("dedup")) {
|
|
105 |
try {
|
|
106 |
Text TextKeyOut = new Text("entities");
|
|
107 |
context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
|
|
108 |
} catch (IOException e) {
|
|
109 |
log.error("Error writing entity to M/R output", e);
|
|
110 |
} catch (InterruptedException e) {
|
|
111 |
log.error("Error writing entity to M/R output: Job Interrupted.", e);
|
|
112 |
}
|
|
113 |
}
|
|
101 |
String serialized = serializer.serialize(oafBuilder.build());
|
|
102 |
if (serialized != null && !oaf.getEntity().getId().startsWith("dedup")) {
|
|
103 |
try {
|
|
104 |
Text TextKeyOut = new Text("entities");
|
|
105 |
context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
|
|
106 |
} catch (IOException e) {
|
|
107 |
log.error("Error writing entity to M/R output", e);
|
|
108 |
} catch (InterruptedException e) {
|
|
109 |
log.error("Error writing entity to M/R output: Job Interrupted.", e);
|
|
110 |
}
|
|
111 |
}
|
|
114 | 112 |
|
115 |
}
|
|
113 |
}
|
|
116 | 114 |
|
117 |
// may have multiple relations per each field
|
|
118 |
private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
|
|
119 |
// derived relations; ResultLanguages, resultConcepts etc are
|
|
120 |
// created here
|
|
115 |
// may have multiple relations per each field
|
|
116 |
private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
|
|
117 |
// derived relations; ResultLanguages, resultConcepts etc are
|
|
118 |
// created here
|
|
121 | 119 |
|
122 |
if (!oaf.getEntity().getId().startsWith("dedup")) {
|
|
123 |
// Existing Hbase relations are generated here
|
|
124 |
if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
|
|
125 |
for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
|
|
120 |
if (!oaf.getEntity().getId().startsWith("dedup")) {
|
|
121 |
// Existing Hbase relations are generated here
|
|
122 |
if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
|
|
123 |
for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
|
|
126 | 124 |
|
127 |
try {
|
|
125 |
try {
|
|
128 | 126 |
|
129 |
final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
|
|
127 |
final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
|
|
130 | 128 |
|
131 |
ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
|
|
129 |
ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
|
|
132 | 130 |
|
133 |
for (OafRel rel : relOaf) {
|
|
134 |
builder.getEntityBuilder().addCachedRel(rel);
|
|
131 |
for (OafRel rel : relOaf) {
|
|
132 |
builder.getEntityBuilder().addCachedRel(rel);
|
|
135 | 133 |
|
136 |
try {
|
|
137 |
Text TextKeyOut = new Text("relations");
|
|
138 |
String buff = serializer.serialize(rel);
|
|
139 |
context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
|
|
134 |
try {
|
|
135 |
Text TextKeyOut = new Text("relations");
|
|
136 |
String buff = serializer.serialize(rel);
|
|
137 |
context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
|
|
140 | 138 |
|
141 |
} catch (Exception e) {
|
|
142 |
log.error("Error while writing Relation Proto to M/R output", e);
|
|
143 |
}
|
|
139 |
} catch (Exception e) {
|
|
140 |
log.error("Error while writing Relation Proto to M/R output", e);
|
|
141 |
}
|
|
144 | 142 |
|
145 |
}
|
|
146 |
} catch (Exception e) {
|
|
147 |
log.error("Error while decoding Relation Proto from HBase Body", e);
|
|
143 |
}
|
|
144 |
} catch (Exception e) {
|
|
145 |
log.error("Error while decoding Relation Proto from HBase Body", e);
|
|
148 | 146 |
|
149 |
}
|
|
147 |
}
|
|
150 | 148 |
|
151 |
}
|
|
152 |
}
|
|
153 |
}
|
|
149 |
}
|
|
150 |
}
|
|
151 |
}
|
|
154 | 152 |
|
155 |
List<String> relationsList = serializer.extractRelations(oaf);
|
|
153 |
List<String> relationsList = serializer.extractRelations(oaf);
|
|
156 | 154 |
|
157 |
for (String rel : relationsList) {
|
|
158 |
try {
|
|
159 |
Text TextKeyOut = new Text("relations");
|
|
155 |
for (String rel : relationsList) {
|
|
156 |
try {
|
|
157 |
Text TextKeyOut = new Text("relations");
|
|
160 | 158 |
|
161 |
if (oaf.getEntity().getId().startsWith("dedup")) {
|
|
159 |
if (oaf.getEntity().getId().startsWith("dedup")) {
|
|
162 | 160 |
|
163 |
if (rel.startsWith("Dedup")) {
|
|
164 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
|
|
165 |
}
|
|
166 |
} else {
|
|
167 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
|
|
168 |
}
|
|
161 |
if (rel.startsWith("Dedup")) {
|
|
162 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
|
|
163 |
}
|
|
164 |
} else {
|
|
165 |
context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
|
|
166 |
}
|
|
169 | 167 |
|
170 |
} catch (Exception e) {
|
|
171 |
log.error("Error writing relations to output : " + rel);
|
|
172 |
}
|
|
173 |
}
|
|
168 |
} catch (Exception e) {
|
|
169 |
log.error("Error writing relations to output : " + rel);
|
|
170 |
}
|
|
171 |
}
|
|
174 | 172 |
|
175 | 173 |
|
176 |
}
|
|
174 |
}
|
|
177 | 175 |
|
178 |
private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
|
|
176 |
private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
|
|
179 | 177 |
|
180 |
ArrayList<OafRel> rels = new ArrayList<OafRel>();
|
|
178 |
ArrayList<OafRel> rels = new ArrayList<OafRel>();
|
|
181 | 179 |
|
182 |
if (hasData(columnMap)) {
|
|
180 |
if (hasData(columnMap)) {
|
|
183 | 181 |
|
184 |
for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
|
|
182 |
for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
|
|
185 | 183 |
|
186 |
final Oaf decodedOaf = decodeProto(context, e.getValue());
|
|
187 |
if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {
|
|
188 |
OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
|
|
189 |
// skip dedups
|
|
190 |
if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
|
|
191 |
// OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
|
|
192 |
// relBuilder.setCachedTarget(body.getEntity());
|
|
193 |
// rels.put("dedups", oafRel);
|
|
184 |
final Oaf decodedOaf = decodeProto(context, e.getValue());
|
|
185 |
if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {
|
|
186 |
OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
|
|
187 |
// skip dedups
|
|
188 |
if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
|
|
189 |
// OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
|
|
190 |
// relBuilder.setCachedTarget(body.getEntity());
|
|
191 |
// rels.put("dedups", oafRel);
|
|
194 | 192 |
|
195 |
} else {
|
|
196 |
OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
|
|
197 |
relBuilder.setCachedTarget(body.getEntity());
|
|
198 |
rels.add(oafRel);
|
|
199 |
}
|
|
200 |
}
|
|
193 |
} else {
|
|
194 |
OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
|
|
195 |
relBuilder.setCachedTarget(body.getEntity());
|
|
196 |
rels.add(oafRel);
|
|
197 |
}
|
|
198 |
}
|
|
201 | 199 |
|
202 |
}
|
|
203 |
}
|
|
200 |
}
|
|
201 |
}
|
|
204 | 202 |
|
205 |
return rels;
|
|
206 |
}
|
|
203 |
return rels;
|
|
204 |
}
|
|
207 | 205 |
|
208 |
private Oaf decodeProto(final Context context, final byte[] body) {
|
|
209 |
try {
|
|
210 |
return Oaf.parseFrom(body);
|
|
211 |
} catch (InvalidProtocolBufferException e) {
|
|
212 |
// log.error("Corrupted proto ", e);
|
|
206 |
private Oaf decodeProto(final Context context, final byte[] body) {
|
|
207 |
try {
|
|
208 |
return Oaf.parseFrom(body);
|
|
209 |
} catch (InvalidProtocolBufferException e) {
|
|
210 |
// log.error("Corrupted proto ", e);
|
|
213 | 211 |
|
214 |
}
|
|
215 |
return null;
|
|
216 |
}
|
|
212 |
}
|
|
213 |
return null;
|
|
214 |
}
|
|
217 | 215 |
|
218 | 216 |
|
219 |
private void loadEntityConfig(Context context) {
|
|
220 |
String indexConf = context.getConfiguration().get("index.conf");
|
|
217 |
private void loadEntityConfig(Context context) {
|
|
218 |
String indexConf = context.getConfiguration().get("index.conf");
|
|
221 | 219 |
|
222 |
if (indexConf == null || indexConf.isEmpty()) {
|
|
223 |
log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER : ");
|
|
220 |
if (indexConf == null || indexConf.isEmpty()) {
|
|
221 |
log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER : ");
|
|
224 | 222 |
|
225 |
}
|
|
223 |
}
|
|
226 | 224 |
|
227 |
entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
|
|
225 |
entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
|
|
228 | 226 |
|
229 |
}
|
|
227 |
}
|
|
230 | 228 |
|
231 |
private boolean isDedupSelf(final OafRelOrBuilder rel) {
|
|
232 |
return rel.getSource().contains(rel.getTarget());
|
|
233 |
}
|
|
229 |
private boolean isDedupSelf(final OafRelOrBuilder rel) {
|
|
230 |
return rel.getSource().contains(rel.getTarget());
|
|
231 |
}
|
|
234 | 232 |
|
235 |
private boolean hasData(final Map<byte[], byte[]> columnMap) {
|
|
236 |
return columnMap != null && !columnMap.isEmpty();
|
|
237 |
}
|
|
233 |
private boolean hasData(final Map<byte[], byte[]> columnMap) {
|
|
234 |
return columnMap != null && !columnMap.isEmpty();
|
|
235 |
}
|
|
238 | 236 |
|
239 |
private boolean deletedByInference(final Oaf oaf) {
|
|
240 |
return oaf.getDataInfo().getDeletedbyinference();
|
|
241 |
}
|
|
237 |
private boolean deletedByInference(final Oaf oaf) {
|
|
238 |
return oaf.getDataInfo().getDeletedbyinference();
|
|
239 |
}
|
|
242 | 240 |
|
243 |
@Override
|
|
244 |
protected void cleanup(Context context) throws IOException, InterruptedException {
|
|
241 |
@Override
|
|
242 |
protected void cleanup(Context context) throws IOException, InterruptedException {
|
|
245 | 243 |
|
246 |
super.cleanup(context);
|
|
247 |
}
|
|
244 |
super.cleanup(context);
|
|
245 |
}
|
|
248 | 246 |
|
249 |
public Serializer getSerializer() {
|
|
250 |
return serializer;
|
|
251 |
}
|
|
247 |
public Serializer getSerializer() {
|
|
248 |
return serializer;
|
|
249 |
}
|
|
252 | 250 |
|
253 |
public void setSerializer(Serializer serializer) {
|
|
254 |
this.serializer = serializer;
|
|
255 |
}
|
|
251 |
public void setSerializer(Serializer serializer) {
|
|
252 |
this.serializer = serializer;
|
|
253 |
}
|
|
256 | 254 |
|
257 |
public EntityConfigTable getEntityConfigTable() {
|
|
258 |
return entityConfigTable;
|
|
259 |
}
|
|
255 |
public EntityConfigTable getEntityConfigTable() {
|
|
256 |
return entityConfigTable;
|
|
257 |
}
|
|
260 | 258 |
|
261 |
public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
|
|
262 |
this.entityConfigTable = entityConfigTable;
|
|
263 |
}
|
|
259 |
public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
|
|
260 |
this.entityConfigTable = entityConfigTable;
|
|
261 |
}
|
|
264 | 262 |
|
265 | 263 |
|
266 |
public long getThreshold() {
|
|
267 |
return threshold;
|
|
268 |
}
|
|
264 |
public long getThreshold() {
|
|
265 |
return threshold;
|
|
266 |
}
|
|
269 | 267 |
|
270 |
public void setThreshold(long threshold) {
|
|
271 |
this.threshold = threshold;
|
|
272 |
}
|
|
268 |
public void setThreshold(long threshold) {
|
|
269 |
this.threshold = threshold;
|
|
270 |
}
|
|
273 | 271 |
|
274 | 272 |
} |
Also available in: Unified diff
fixes for delims in funding parser; fix for relation name in relations file