Project

General

Profile

« Previous | Next » 

Revision 30928

implemented postgres mdstore, added some exception

View differences:

PostgresMDStore.java
24 24
 */
25 25
public class PostgresMDStore implements MDStore {
26 26

  
27
	private static final int MAX_NUM_RECORD_BEFORE_COMMIT = 10000;
27
	private static final int MAX_NUM_RECORD_BEFORE_COMMIT = 100;
28 28

  
29 29
	/** The id. */
30 30
	private String id;
......
125 125
	 */
126 126
	@Override
127 127
	public void truncate() {
128
		Connection conn = null;
128 129
		try {
129
			Connection conn = datasource.getConnection();
130
			conn = datasource.getConnection();
130 131
			// DELETE ALL ITEM IN FIELDS
131 132
			conn.setAutoCommit(false);
132 133
			String sql = String.format("delete from  %s_fields", this.id);
......
152 153
		} catch (SQLException e) {
153 154
			log.error("Erron on dropping table " + this.id);
154 155

  
156
		} finally {
157
			if (conn != null) {
158
				try {
159
					conn.close();
160
				} catch (SQLException e) {
161
					log.error("Error on get mdstore", e);
162
				}
163
			}
155 164
		}
156 165

  
157 166
	}
......
170 179
		final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(80);
171 180
		final Object sentinel = new Object();
172 181
		Thread background = null;
182
		Connection connection = null;
173 183
		try {
174
			final Connection conn = this.datasource.getConnection();
184
			connection = this.datasource.getConnection();
185
			final Connection conn = connection;
175 186
			conn.setAutoCommit(false);
176 187
			// DROP DISCARDED COLLECTION AT NEW FEEDING
177 188
			String dropDiscardedCollectionQuery = String.format("delete from %s_discarded", this.id);
......
183 194
				@Override
184 195
				public void run() {
185 196
					int commitNumber = 0;
186

  
187 197
					while (true) {
188 198
						try {
189 199
							Object record = queue.take();
190 200
							if (record == sentinel) {
191 201
								conn.commit();
192 202
								updateSizeOnMetadataTable(conn);
203
								if (conn != null) {
204
									try {
205
										conn.close();
206
									} catch (SQLException e) {
207
										log.error("Error on close mdstore", e);
208
									}
209
								}
193 210
								break;
194 211
							}
195 212
							safeFeedRecord((String) record, incremental, conn);
......
199 216
							}
200 217
						} catch (InterruptedException e) {
201 218
							log.fatal("got exception in background thread", e);
219
							if (conn != null) {
220
								try {
221
									conn.close();
222
								} catch (SQLException e1) {
223
									log.error("Error on close mdstore", e);
224
								}
225
							}
202 226
							throw new IllegalStateException(e);
227

  
203 228
						} catch (SQLException e) {
204 229
							log.fatal("got exception in background thread", e);
230
							if (conn != null) {
231
								try {
232
									conn.close();
233
								} catch (SQLException e1) {
234
									log.error("Error on close mdstore", e);
235
								}
236
							}
205 237
							throw new IllegalStateException(e);
206 238
						}
207 239
					}
......
210 242
			});
211 243
			background.start();
212 244
		} catch (SQLException e) {
245

  
246
			if (connection != null) {
247
				try {
248
					connection.close();
249
				} catch (SQLException e1) {
250
					log.error("Error on close mdstore", e);
251
				}
252
			}
213 253
			throw new IllegalStateException(e);
214 254
		}
215 255
		try {
......
269 309

  
270 310
	private void feedRecord(final String record, final boolean incremental, final Connection connection) {
271 311
		final Map<String, String> recordProperties = getRecordParser().parseRecord(record);
312

  
272 313
		log.debug("found props: " + recordProperties);
273 314
		if (recordProperties.containsKey("id")) {
274 315
			String p = "WITH upsert AS (UPDATE %s SET body=?, timestampdate=? WHERE recordid=? RETURNING *) "
......
287 328
				ps.executeUpdate();
288 329

  
289 330
			} catch (SQLException e) {
290
				log.error("Error on insert Element");
331
				log.error("Error on insert Element", e);
291 332
			}
292 333
		} else {
293 334
			if (discardRecords) {
......
365 406
	 */
366 407
	@Override
367 408
	public void deleteRecord(final String recordId) {
409
		Connection connection = null;
368 410
		try {
369
			Connection connection = this.datasource.getConnection();
411
			connection = this.datasource.getConnection();
370 412
			String sql = String.format("delete from %s where recordid = ?", this.id);
371 413
			PreparedStatement ps = connection.prepareStatement(sql);
372 414
			ps.setString(1, recordId);
373 415
			ps.executeUpdate();
374 416
		} catch (SQLException e) {
375 417
			log.error("ERROR ON DELETING record ", e);
418
		} finally {
419
			if (connection != null) {
420
				try {
421
					connection.close();
422
				} catch (SQLException e) {
423
					log.error("Error on get mdstore", e);
424
				}
425
			}
376 426
		}
377 427
	}
378 428

  
......
387 437
	 */
388 438
	@Override
389 439
	public String getRecord(final String recordId) throws DocumentNotFoundException {
440
		Connection connection = null;
390 441
		try {
391
			Connection connection = this.datasource.getConnection();
442
			connection = this.datasource.getConnection();
392 443
			String sql = String.format("select body from %s where recordid = ?", this.id);
393 444
			PreparedStatement ps = connection.prepareStatement(sql);
394 445
			ps.setString(1, recordId);
......
400 451
		} catch (SQLException e) {
401 452
			log.error("ERROR ON DELETING record ", e);
402 453
			return null;
454
		} finally {
455
			if (connection != null) {
456
				try {
457
					connection.close();
458
				} catch (SQLException e) {
459
					log.error("Error on get mdstore", e);
460
				}
461
			}
403 462
		}
404 463
	}
405 464

  

Also available in: Unified diff