Project

General

Profile

« Previous | Next » 

Revision 40699

Added by Eri Katsari over 8 years ago

fixes for delims in funding parser; fix for relation name in relations file

View differences:

LodMapper.java
30 30
 * export
31 31
 */
32 32
public class LodMapper extends TableMapper<Text, ImmutableBytesWritable> {
33
	private Logger log = Logger.getLogger(this.getClass());
33
    private Logger log = Logger.getLogger(this.getClass());
34 34

  
35
	private Serializer serializer;
35
    private Serializer serializer;
36 36

  
37
	private EntityConfigTable entityConfigTable;
37
    private EntityConfigTable entityConfigTable;
38 38

  
39 39

  
40
	private long threshold = 100000;
40
    private long threshold = 100000;
41 41

  
42 42

  
43
	@Override
44
	protected void setup(Context context) throws IOException, InterruptedException {
43
    @Override
44
    protected void setup(Context context) throws IOException, InterruptedException {
45 45

  
46
		loadEntityConfig(context);
46
        loadEntityConfig(context);
47
        serializer = new Serializer(context.getConfiguration().get("lod.delim"), context.getConfiguration().get("lod.enclosing"));
47 48

  
48
		serializer = new Serializer();
49
		serializer.setDELIM(context.getConfiguration().get("lod.delim"));
50
		serializer.setENCLOSED(context.getConfiguration().get("lod.enclosing"));
49
    }
51 50

  
52
	}
53 51

  
54
	@Override
55
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
52
    @Override
53
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
56 54

  
57
		final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
58
		final Type type = decoder.getType();
59
		final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
55
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
56
        final Type type = decoder.getType();
57
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
60 58

  
61
		if (isValid(oaf)) {
62
			// TODO set this only when the configuration file has include dups
63
			// to fals for Results
64
			// or else were gonna get deleted by inference entries
65
			// if (!deletedByInference(oaf) ||
66
			// entityConfigTable.includeDuplicates(type)) {
67
			if (!deletedByInference(oaf)) {
68
				emitProtos(context, result, oaf);
69
			}
70
		}
59
        if (isValid(oaf)) {
60
            // TODO set this only when the configuration file has include dups
61
            // to fals for Results
62
            // or else were gonna get deleted by inference entries
63
            // if (!deletedByInference(oaf) ||
64
            // entityConfigTable.includeDuplicates(type)) {
65
            if (!deletedByInference(oaf)) {
66
                emitProtos(context, result, oaf);
67
            }
68
        }
71 69

  
72
	}
70
    }
73 71

  
74
	private boolean isValid(Oaf oaf) {
75
		try {
72
    private boolean isValid(Oaf oaf) {
73
        try {
76 74

  
77
			if (oaf != null && oaf.isInitialized()) {
78
				return true;
79
			}
75
            if (oaf != null && oaf.isInitialized()) {
76
                return true;
77
            }
80 78

  
81
		} catch (Exception e) {
82
			// log.error("OAF NOT INITIALIZED ");
83
		}
79
        } catch (Exception e) {
80
            // log.error("OAF NOT INITIALIZED ");
81
        }
84 82

  
85
		return false;
86
	}
83
        return false;
84
    }
87 85

  
88
	private void emitProtos(Context context, Result result, Oaf oaf) {
89
		//Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo()).setTimestamp(oaf.getTimestamp());
90
		Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
91
		Type type = oaf.getEntity().getType();
92
		oafBuilder.setEntity(oaf.getEntity());
86
    private void emitProtos(Context context, Result result, Oaf oaf) {
87
        //Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo()).setTimestamp(oaf.getTimestamp());
88
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
89
        Type type = oaf.getEntity().getType();
90
        oafBuilder.setEntity(oaf.getEntity());
93 91

  
94
		// emit relation first so we can cache them to entity protos
95
		emitRelation(context, result, oaf, type, oafBuilder);
96
		emitEntity(context, oaf, type, oafBuilder);
92
        // emit relation first so we can cache them to entity protos
93
        emitRelation(context, result, oaf, type, oafBuilder);
94
        emitEntity(context, oaf, type, oafBuilder);
97 95

  
98
	}
96
    }
99 97

  
100
	private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
98
    private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
101 99

  
102 100

  
103
		String serialized = serializer.serialize(oafBuilder.build());
104
		if (serialized != null && !oaf.getEntity().getId().startsWith("dedup")) {
105
			try {
106
				Text TextKeyOut = new Text("entities");
107
				context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
108
			} catch (IOException e) {
109
				log.error("Error writing entity to M/R output", e);
110
			} catch (InterruptedException e) {
111
				log.error("Error writing entity to M/R output: Job Interrupted.", e);
112
			}
113
		}
101
        String serialized = serializer.serialize(oafBuilder.build());
102
        if (serialized != null && !oaf.getEntity().getId().startsWith("dedup")) {
103
            try {
104
                Text TextKeyOut = new Text("entities");
105
                context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
106
            } catch (IOException e) {
107
                log.error("Error writing entity to M/R output", e);
108
            } catch (InterruptedException e) {
109
                log.error("Error writing entity to M/R output: Job Interrupted.", e);
110
            }
111
        }
114 112

  
115
	}
113
    }
116 114

  
117
	// may have multiple relations per each field
118
	private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
119
		// derived relations; ResultLanguages, resultConcepts etc are
120
		// created here
115
    // may have multiple relations per each field
116
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
117
        // derived relations; ResultLanguages, resultConcepts etc are
118
        // created here
121 119

  
122
		if (!oaf.getEntity().getId().startsWith("dedup")) {
123
			// Existing Hbase relations are generated here
124
			if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
125
				for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
120
        if (!oaf.getEntity().getId().startsWith("dedup")) {
121
            // Existing Hbase relations are generated here
122
            if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
123
                for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
126 124

  
127
					try {
125
                    try {
128 126

  
129
						final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
127
                        final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
130 128

  
131
						ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
129
                        ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
132 130

  
133
						for (OafRel rel : relOaf) {
134
							builder.getEntityBuilder().addCachedRel(rel);
131
                        for (OafRel rel : relOaf) {
132
                            builder.getEntityBuilder().addCachedRel(rel);
135 133

  
136
							try {
137
								Text TextKeyOut = new Text("relations");
138
								String buff = serializer.serialize(rel);
139
								context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
134
                            try {
135
                                Text TextKeyOut = new Text("relations");
136
                                String buff = serializer.serialize(rel);
137
                                context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
140 138

  
141
							} catch (Exception e) {
142
								log.error("Error while writing Relation Proto to M/R output", e);
143
							}
139
                            } catch (Exception e) {
140
                                log.error("Error while writing Relation Proto to M/R output", e);
141
                            }
144 142

  
145
						}
146
					} catch (Exception e) {
147
						log.error("Error while decoding Relation Proto from HBase Body", e);
143
                        }
144
                    } catch (Exception e) {
145
                        log.error("Error while decoding Relation Proto from HBase Body", e);
148 146

  
149
					}
147
                    }
150 148

  
151
				}
152
			}
153
		}
149
                }
150
            }
151
        }
154 152

  
155
		List<String> relationsList = serializer.extractRelations(oaf);
153
        List<String> relationsList = serializer.extractRelations(oaf);
156 154

  
157
		for (String rel : relationsList) {
158
			try {
159
				Text TextKeyOut = new Text("relations");
155
        for (String rel : relationsList) {
156
            try {
157
                Text TextKeyOut = new Text("relations");
160 158

  
161
				if (oaf.getEntity().getId().startsWith("dedup")) {
159
                if (oaf.getEntity().getId().startsWith("dedup")) {
162 160

  
163
					if (rel.startsWith("Dedup")) {
164
						context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
165
					}
166
				} else {
167
					context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
168
				}
161
                    if (rel.startsWith("Dedup")) {
162
                        context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
163
                    }
164
                } else {
165
                    context.write((TextKeyOut), new ImmutableBytesWritable(rel.getBytes()));
166
                }
169 167

  
170
			} catch (Exception e) {
171
				log.error("Error writing relations to output : " + rel);
172
			}
173
		}
168
            } catch (Exception e) {
169
                log.error("Error writing relations to output : " + rel);
170
            }
171
        }
174 172

  
175 173

  
176
	}
174
    }
177 175

  
178
	private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
176
    private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
179 177

  
180
		ArrayList<OafRel> rels = new ArrayList<OafRel>();
178
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
181 179

  
182
		if (hasData(columnMap)) {
180
        if (hasData(columnMap)) {
183 181

  
184
			for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
182
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
185 183

  
186
				final Oaf decodedOaf = decodeProto(context, e.getValue());
187
				if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {
188
					OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
189
					// skip dedups
190
					if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
191
						//		OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
192
						//		relBuilder.setCachedTarget(body.getEntity());
193
						//	rels.put("dedups", oafRel);
184
                final Oaf decodedOaf = decodeProto(context, e.getValue());
185
                if (isValid(decodedOaf) && !deletedByInference(decodedOaf)) {
186
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
187
                    // skip dedups
188
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
189
                        //		OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
190
                        //		relBuilder.setCachedTarget(body.getEntity());
191
                        //	rels.put("dedups", oafRel);
194 192

  
195
					} else {
196
						OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
197
						relBuilder.setCachedTarget(body.getEntity());
198
						rels.add(oafRel);
199
					}
200
				}
193
                    } else {
194
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
195
                        relBuilder.setCachedTarget(body.getEntity());
196
                        rels.add(oafRel);
197
                    }
198
                }
201 199

  
202
			}
203
		}
200
            }
201
        }
204 202

  
205
		return rels;
206
	}
203
        return rels;
204
    }
207 205

  
208
	private Oaf decodeProto(final Context context, final byte[] body) {
209
		try {
210
			return Oaf.parseFrom(body);
211
		} catch (InvalidProtocolBufferException e) {
212
			// log.error("Corrupted proto ", e);
206
    private Oaf decodeProto(final Context context, final byte[] body) {
207
        try {
208
            return Oaf.parseFrom(body);
209
        } catch (InvalidProtocolBufferException e) {
210
            // log.error("Corrupted proto ", e);
213 211

  
214
		}
215
		return null;
216
	}
212
        }
213
        return null;
214
    }
217 215

  
218 216

  
219
	private void loadEntityConfig(Context context) {
220
		String indexConf = context.getConfiguration().get("index.conf");
217
    private void loadEntityConfig(Context context) {
218
        String indexConf = context.getConfiguration().get("index.conf");
221 219

  
222
		if (indexConf == null || indexConf.isEmpty()) {
223
			log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
220
        if (indexConf == null || indexConf.isEmpty()) {
221
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
224 222

  
225
		}
223
        }
226 224

  
227
		entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
225
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
228 226

  
229
	}
227
    }
230 228

  
231
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
232
		return rel.getSource().contains(rel.getTarget());
233
	}
229
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
230
        return rel.getSource().contains(rel.getTarget());
231
    }
234 232

  
235
	private boolean hasData(final Map<byte[], byte[]> columnMap) {
236
		return columnMap != null && !columnMap.isEmpty();
237
	}
233
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
234
        return columnMap != null && !columnMap.isEmpty();
235
    }
238 236

  
239
	private boolean deletedByInference(final Oaf oaf) {
240
		return oaf.getDataInfo().getDeletedbyinference();
241
	}
237
    private boolean deletedByInference(final Oaf oaf) {
238
        return oaf.getDataInfo().getDeletedbyinference();
239
    }
242 240

  
243
	@Override
244
	protected void cleanup(Context context) throws IOException, InterruptedException {
241
    @Override
242
    protected void cleanup(Context context) throws IOException, InterruptedException {
245 243

  
246
		super.cleanup(context);
247
	}
244
        super.cleanup(context);
245
    }
248 246

  
249
	public Serializer getSerializer() {
250
		return serializer;
251
	}
247
    public Serializer getSerializer() {
248
        return serializer;
249
    }
252 250

  
253
	public void setSerializer(Serializer serializer) {
254
		this.serializer = serializer;
255
	}
251
    public void setSerializer(Serializer serializer) {
252
        this.serializer = serializer;
253
    }
256 254

  
257
	public EntityConfigTable getEntityConfigTable() {
258
		return entityConfigTable;
259
	}
255
    public EntityConfigTable getEntityConfigTable() {
256
        return entityConfigTable;
257
    }
260 258

  
261
	public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
262
		this.entityConfigTable = entityConfigTable;
263
	}
259
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
260
        this.entityConfigTable = entityConfigTable;
261
    }
264 262

  
265 263

  
266
	public long getThreshold() {
267
		return threshold;
268
	}
264
    public long getThreshold() {
265
        return threshold;
266
    }
269 267

  
270
	public void setThreshold(long threshold) {
271
		this.threshold = threshold;
272
	}
268
    public void setThreshold(long threshold) {
269
        this.threshold = threshold;
270
    }
273 271

  
274 272
}

Also available in: Unified diff