Project

General

Profile

« Previous | Next » 

Revision 53915

handle dates without time

View differences:

LodMapper.java
20 20
import org.apache.log4j.Logger;
21 21

  
22 22
import java.io.IOException;
23
import java.text.ParseException;
23 24
import java.text.SimpleDateFormat;
24 25
import java.util.*;
25 26
import java.util.Map.Entry;
......
29 30
 * export
30 31
 */
31 32
public class LodMapper extends TableMapper<Text, Text> {
32
    private static final String DATE_OF_TRANSFORMATION_PATTERN = "yyyy-MM-dd'T'HH:mm:ss";
33
    private static final String LAST_EXECUTION_DATE_PATTERN = "yyyy-MM-dd";
34
    private Logger log = Logger.getLogger(this.getClass());
35
    private EntityConfigTable entityConfigTable;
36
    private String lastExecutionDate = "";
33
	private static final String DATE_OF_TRANSFORMATION_PATTERN = "yyyy-MM-dd'T'HH:mm:ss";
34
	private static final String LAST_EXECUTION_DATE_PATTERN = "yyyy-MM-dd";
35
	private Logger log = Logger.getLogger(this.getClass());
36
	private EntityConfigTable entityConfigTable;
37
	private String lastExecutionDate = "";
37 38

  
39
	public static enum ENTITIES_COUNTER {
40
		RESULT, PROJECT, DATASOURCE, PERSON, ORGANIZATION, RESULT_NO_DATE, PROJECT_NO_DATE, DATASOURCE_NO_DATE,
41
		PERSON_NO_DATE, RGANIZATION_NO_DATE, DELETED_BY_INFERENCE, NOT_DELETED_BY_INFERENCE, TOTAL_ENTITIES,
42
		TOTAL_RELATIONS, UPDATED, NOT_UPDATED, NO_DATE
38 43

  
39
    public static enum ENTITIES_COUNTER {
40
        RESULT,
41
        PROJECT,
42
        DATASOURCE,
43
        PERSON,
44
        ORGANIZATION,
45
        DELETED_BY_INFERENCE,
46
        NOT_DELETED_BY_INFERENCE,
47
        TOTAL_ENTITIES,
48
        TOTAL_RELATIONS,
49
        UPDATED,
50
        NOT_UPDATED
44
	}
51 45

  
52
    }
46
	;
53 47

  
54
    ;
48
	private String DELIM;
55 49

  
56
    private String DELIM;
50
	@Override
51
	protected void setup(Context context) throws IOException, InterruptedException {
52
		loadEntityConfig(context);
53
		DELIM = context.getConfiguration().get("lod.delim");
54
		lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
57 55

  
58
    @Override
59
    protected void setup(Context context) throws IOException, InterruptedException {
60
        loadEntityConfig(context);
61
        DELIM = context.getConfiguration().get("lod.delim");
62
        lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
56
	}
63 57

  
64
    }
58
	@Override
59
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context)
60
			throws IOException {
65 61

  
62
		final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
63
		final Type type = decoder.getType();
64
		final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
66 65

  
67
    @Override
68
    protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
66
		if (isValid(oaf)) {
69 67

  
70
        final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
71
        final Type type = decoder.getType();
72
        final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
68
			if (deletedByInference(oaf)) {
69
				context.getCounter(ENTITIES_COUNTER.DELETED_BY_INFERENCE).increment(1);
70
			} else {
71
				context.getCounter(ENTITIES_COUNTER.NOT_DELETED_BY_INFERENCE).increment(1);
72
			}
73 73

  
74
        if (isValid(oaf)) {
74
			context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
75
			try {
76
				emitProtos(context, result, oaf);
77
			} catch (ParseException e) {
78
				// TODO Auto-generated catch block
79
				e.printStackTrace();
80
			}
81
		}
75 82

  
76
            if (deletedByInference(oaf)) {
77
                context.getCounter(ENTITIES_COUNTER.DELETED_BY_INFERENCE).increment(1);
78
            } else {
79
                context.getCounter(ENTITIES_COUNTER.NOT_DELETED_BY_INFERENCE).increment(1);
80
            }
83
	}
81 84

  
82
            context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
83
            emitProtos(context, result, oaf);
84
        }
85
	private boolean isValid(Oaf oaf) {
86
		try {
87
			if (oaf != null && oaf.isInitialized()) {
88
				return true;
89
			}
85 90

  
86
    }
91
		} catch (Exception e) {
92
			log.error("invalid proto", e);
93
		}
87 94

  
88
    private boolean isValid(Oaf oaf) {
89
        try {
90
            if (oaf != null && oaf.isInitialized()) {
91
                return true;
92
            }
95
		return false;
96
	}
93 97

  
94
        } catch (Exception e) {
95
            log.error("invalid proto", e);
96
        }
98
	private boolean isUpdated(Oaf oaf, Context context) throws IOException, ParseException {
99
		String dateOfTransformationString = "";
100
		try {
101
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_OF_TRANSFORMATION_PATTERN,
102
					Locale.getDefault());
103
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
104
			dateOfTransformationString = oaf.getEntity().getDateoftransformation();
105
			if (dateOfTransformationString == null || dateOfTransformationString.isEmpty()
106
					|| dateOfTransformationString.equals(" ")) {
107
				context.getCounter(ENTITIES_COUNTER.NO_DATE).increment(1);
108
				return true;
109
			}
97 110

  
98
        return false;
99
    }
111
			Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
100 112

  
101
    private boolean isUpdated(Oaf oaf) throws IOException {
102
        String dateOfTransformationString = "";
103
        try {
104
            SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_OF_TRANSFORMATION_PATTERN, Locale.getDefault());
105
            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
106
            dateOfTransformationString = oaf.getEntity().getDateoftransformation();
107
            if (dateOfTransformationString == null || dateOfTransformationString.isEmpty() || dateOfTransformationString.equals(" ")) {
108
                return true;
109
            }
113
			SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
114
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
115
			Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
110 116

  
111
            Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
117
			if (lastExecDate.before(dateOfTransformation)) {
118
				context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
119
				return true;
120
			}
121
		} catch (ParseException pe) {
122
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN, Locale.getDefault());
123
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
124
			dateOfTransformationString = oaf.getEntity().getDateoftransformation();
125
			if (dateOfTransformationString == null || dateOfTransformationString.isEmpty()
126
					|| dateOfTransformationString.equals(" ")) {
127
				context.getCounter(ENTITIES_COUNTER.NO_DATE).increment(1);
128
				return true;
129
			}
112 130

  
113
            SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
114
            simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
115
            Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
131
			Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
132
			SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
133
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
134
			Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
116 135

  
117
            if (lastExecDate.before(dateOfTransformation)) {
118
                return true;
119
            }
120
        } catch (Exception e) {
121
            log.error("invalid date " + dateOfTransformationString, e);
122
            throw new IOException(e);
123
        }
136
			if (lastExecDate.before(dateOfTransformation)) {
137
				context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
138
				return true;
139
			}
140
		} catch (Exception e) {
141
			log.error("invalid date " + dateOfTransformationString, e);
142
			throw new IOException(e);
143
		}
144
		context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
145
		return false;
146
	}
124 147

  
125
        return false;
126
    }
148
	private void emitProtos(Context context, Result result, Oaf oaf) throws IOException, ParseException {
149
		Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
150
		Type type = oaf.getEntity().getType();
151
		oafBuilder.setEntity(oaf.getEntity());
152
		// emit relation first so we can cache them to entity protos
153
		emitRelation(context, result, oaf, type, oafBuilder);
127 154

  
128
    private void emitProtos(Context context, Result result, Oaf oaf) throws IOException {
129
        Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
130
        Type type = oaf.getEntity().getType();
131
        oafBuilder.setEntity(oaf.getEntity());
132
        // emit relation first so we can cache them to entity protos
133
        emitRelation(context, result, oaf, type, oafBuilder);
155
		if (isUpdated(oaf, context)) {
156
			emitEntity(context, oaf, oafBuilder);
157
		}
158
	}
134 159

  
135
        if (isUpdated(oaf)) {
136
            emitEntity(context, oaf, oafBuilder);
137
        }
138
    }
160
	private void emitEntity(Context context, Oaf oaf, Oaf.Builder oafBuilder) {
161
		String serialized = Serializer.serialize(oafBuilder.build(), DELIM);
139 162

  
140
    private void emitEntity(Context context, Oaf oaf, Oaf.Builder oafBuilder) {
141
        String serialized = Serializer.serialize(oafBuilder.build(), DELIM);
163
		if (serialized != null && !oaf.getEntity().getId().contains("dedup")) {
142 164

  
143
        if (serialized != null && !oaf.getEntity().getId().contains("dedup")) {
165
			try {
166
				context.getCounter(ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase()))
167
						.increment(1);
168
				// output entity types so we can have seperate files for each type
169
				Text TextKeyOut = new Text(oaf.getEntity().getType().toString());
170
				context.write((TextKeyOut), new Text(serialized));
144 171

  
145
            try {
146
                context.getCounter(ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase())).increment(1);
147
                //output entity types so we can have seperate files for each type
148
                Text TextKeyOut = new Text(oaf.getEntity().getType().toString());
149
                context.write((TextKeyOut), new Text(serialized));
150
                if (!oaf.getEntity().getDateoftransformation().isEmpty()) {
151
                    context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
152
                } else {
153
                    context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
154
                }
172
				if (oaf.getEntity().getDateoftransformation().isEmpty()) {
173
					context.getCounter(
174
							ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase() + "_NO_DATE"))
175
							.increment(1);
176
//                    context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
177
				}
178
//                else {
179
//                    context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
180
//                }
155 181

  
156
            } catch (Exception e) {
157
                log.error("Error writing entity to M/R output", e);
158
            }
159
        }
182
			} catch (Exception e) {
183
				log.error("Error writing entity to M/R output", e);
184
			}
185
		}
160 186

  
161
    }
187
	}
162 188

  
163
    // may have multiple relations per each field
164
    private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
165
        // derived relations; ResultLanguages, resultConcepts etc are
166
        // created here
189
	// may have multiple relations per each field
190
	private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
191
		// derived relations; ResultLanguages, resultConcepts etc are
192
		// created here
167 193

  
168
        if (!oaf.getEntity().getId().contains("dedup")) {
169
            // Existing Hbase relations are generated here
170
            if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
171
                for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
172
                    try {
194
		if (!oaf.getEntity().getId().contains("dedup")) {
195
			// Existing Hbase relations are generated here
196
			if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
197
				for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
198
					try {
173 199

  
174
                        final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
200
						final Map<byte[], byte[]> columnMap = result
201
								.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
175 202

  
176
                        List<OafRel> relOaf = decodeRelation(oaf, columnMap, ld);
203
						List<OafRel> relOaf = decodeRelation(oaf, columnMap, ld);
177 204

  
205
						for (OafRel rel : relOaf) {
206
							builder.getEntityBuilder().addCachedRel(rel);
207
							try {
208
								// keep all relations to one file
209
								Text TextKeyOut = new Text("relations");
178 210

  
179
                        for (OafRel rel : relOaf) {
180
                            builder.getEntityBuilder().addCachedRel(rel);
181
                            try {
182
                                //keep all relations to one file
183
                                Text TextKeyOut = new Text("relations");
211
								String buff = Serializer.serialize(rel, DELIM);
184 212

  
185
                                String buff = Serializer.serialize(rel, DELIM);
213
								if (!buff.isEmpty() && !rel.getTarget().contains("dedup")) {
214
									context.write((TextKeyOut), new Text(buff));
215
									context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
216
								}
186 217

  
187
                                if (!buff.isEmpty() && !rel.getTarget().contains("dedup")) {
188
                                    context.write((TextKeyOut), new Text(buff));
189
                                    context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
190
                                }
218
							} catch (Exception e) {
219
								log.error("Error while writing Relation Proto to M/R output", e);
220
							}
191 221

  
192
                            } catch (Exception e) {
193
                                log.error("Error while writing Relation Proto to M/R output", e);
194
                            }
222
						}
195 223

  
196
                        }
224
						relOaf.clear();
225
					} catch (Exception e) {
226
						log.error("Error while decoding Relation Proto from HBase Body", e);
197 227

  
198
                        relOaf.clear();
199
                    } catch (Exception e) {
200
                        log.error("Error while decoding Relation Proto from HBase Body", e);
228
					}
201 229

  
202
                    }
230
				}
231
			}
232
		}
203 233

  
204
                }
205
            }
206
        }
234
		Set<String> relationsList = new HashSet<String>();
207 235

  
236
		Serializer.extractRelations(oaf, DELIM, relationsList);
208 237

  
209
        Set<String> relationsList = new HashSet<String>();
238
		for (String rel : relationsList) {
239
			try {
210 240

  
211
        Serializer.extractRelations(oaf, DELIM, relationsList);
241
				Text TextKeyOut = new Text("relations");
212 242

  
213
        for (String rel : relationsList) {
214
            try {
243
				if (!oaf.getEntity().getId().contains("dedup")) {
244
					if (!rel.contains("dedup")) {
245
						context.write((TextKeyOut), new Text(rel));
246
						context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
215 247

  
216
                Text TextKeyOut = new Text("relations");
248
					}
249
				} else {
250
					// for dedup entities write only dedup relationships: all the permutations
251
					// of children
252
					if (rel.contains("dedup")) {
253
						context.write((TextKeyOut), new Text(rel));
254
						context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
255
					}
217 256

  
218
                if (!oaf.getEntity().getId().contains("dedup")) {
219
                    if (!rel.contains("dedup")) {
220
                        context.write((TextKeyOut), new Text(rel));
221
                        context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
257
				}
222 258

  
223
                    }
224
                } else {
225
                    //for dedup entities write only dedup relationships: all the permutations
226
                    // of children
227
                    if (rel.contains("dedup")) {
228
                        context.write((TextKeyOut), new Text(rel));
229
                        context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
230
                    }
259
			} catch (Exception e) {
260
				log.error("Error writing relations to output : " + rel);
261
			}
262
		}
263
		relationsList.clear();
264
	}
231 265

  
232
                }
266
	private ArrayList<OafRel> decodeRelation(final Oaf body, Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
267
			throws IOException, InterruptedException {
233 268

  
234
            } catch (Exception e) {
235
                log.error("Error writing relations to output : " + rel);
236
            }
237
        }
238
        relationsList.clear();
239
    }
269
		ArrayList<OafRel> rels = new ArrayList<OafRel>();
240 270

  
271
		if (hasData(columnMap)) {
241 272

  
242
    private ArrayList<OafRel> decodeRelation(final Oaf body, Map<byte[], byte[]> columnMap,
243
                                             final LinkDescriptor ld) throws IOException, InterruptedException {
273
			for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
244 274

  
245
        ArrayList<OafRel> rels = new ArrayList<OafRel>();
275
				final Oaf decodedOaf = decodeProto(e.getValue());
276
				if (isValid(decodedOaf)) {
277
					OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
246 278

  
247
        if (hasData(columnMap)) {
279
					// skip dedups
248 280

  
249
            for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
281
					if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString())
282
							&& isDedupSelf(relBuilder)) {
283
						// log.error("invalid protto", e);
250 284

  
251
                final Oaf decodedOaf = decodeProto(e.getValue());
252
                if (isValid(decodedOaf)) {
253
                    OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
285
					} else {
286
						OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
287
						relBuilder.setCachedTarget(body.getEntity());
288
						rels.add(oafRel);
289
					}
290
				}
254 291

  
255
                    // skip dedups
292
			}
293
		}
256 294

  
257
                    if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
258
                        //     log.error("invalid protto", e);
295
		return rels;
296
	}
259 297

  
260
                    } else {
261
                        OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
262
                        relBuilder.setCachedTarget(body.getEntity());
263
                        rels.add(oafRel);
264
                    }
265
                }
298
	private Oaf decodeProto(final byte[] body) {
299
		try {
300
			return Oaf.parseFrom(body);
301
		} catch (InvalidProtocolBufferException e) {
302
			log.error("Invalid Protos", e);
303
		}
304
		return null;
305
	}
266 306

  
267
            }
268
        }
307
	private void loadEntityConfig(Context context) throws InterruptedException {
308
		String indexConf = context.getConfiguration().get("index.conf");
269 309

  
270
        return rels;
271
    }
310
		if (indexConf == null || indexConf.isEmpty()) {
311
			log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
312
			throw new InterruptedException("Null enitity configuration in mappper");
272 313

  
273
    private Oaf decodeProto(final byte[] body) {
274
        try {
275
            return Oaf.parseFrom(body);
276
        } catch (InvalidProtocolBufferException e) {
277
            log.error("Invalid Protos", e);
278
        }
279
        return null;
280
    }
314
		}
281 315

  
316
		entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
282 317

  
283
    private void loadEntityConfig(Context context) throws InterruptedException {
284
        String indexConf = context.getConfiguration().get("index.conf");
318
	}
285 319

  
286
        if (indexConf == null || indexConf.isEmpty()) {
287
            log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
288
            throw new InterruptedException("Null enitity configuration in mappper");
320
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
321
		return rel.getSource().contains(rel.getTarget());
322
	}
289 323

  
290
        }
324
	private boolean hasData(final Map<byte[], byte[]> columnMap) {
325
		return columnMap != null && !columnMap.isEmpty();
326
	}
291 327

  
292
        entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
328
	private boolean deletedByInference(final Oaf oaf) {
329
		return oaf.getDataInfo().getDeletedbyinference();
330
	}
293 331

  
294
    }
332
	@Override
333
	protected void cleanup(Context context) throws IOException, InterruptedException {
334
		super.cleanup(context);
335
	}
295 336

  
296
    private boolean isDedupSelf(final OafRelOrBuilder rel) {
297
        return rel.getSource().contains(rel.getTarget());
298
    }
337
	public EntityConfigTable getEntityConfigTable() {
338
		return entityConfigTable;
339
	}
299 340

  
300
    private boolean hasData(final Map<byte[], byte[]> columnMap) {
301
        return columnMap != null && !columnMap.isEmpty();
302
    }
341
	public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
342
		this.entityConfigTable = entityConfigTable;
343
	}
303 344

  
304
    private boolean deletedByInference(final Oaf oaf) {
305
        return oaf.getDataInfo().getDeletedbyinference();
306
    }
307

  
308
    @Override
309
    protected void cleanup(Context context) throws IOException, InterruptedException {
310
        super.cleanup(context);
311
    }
312

  
313

  
314
    public EntityConfigTable getEntityConfigTable() {
315
        return entityConfigTable;
316
    }
317

  
318
    public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
319
        this.entityConfigTable = entityConfigTable;
320
    }
321

  
322

  
323
}
345
}

Also available in: Unified diff