Revision 30928
Added by Sandro La Bruzzo over 9 years ago
PostgresMDStore.java | ||
---|---|---|
24 | 24 |
*/ |
25 | 25 |
public class PostgresMDStore implements MDStore { |
26 | 26 |
|
27 |
private static final int MAX_NUM_RECORD_BEFORE_COMMIT = 10000;
|
|
27 |
private static final int MAX_NUM_RECORD_BEFORE_COMMIT = 100; |
|
28 | 28 |
|
29 | 29 |
/** The id. */ |
30 | 30 |
private String id; |
... | ... | |
125 | 125 |
*/ |
126 | 126 |
@Override |
127 | 127 |
public void truncate() { |
128 |
Connection conn = null; |
|
128 | 129 |
try { |
129 |
Connection conn = datasource.getConnection();
|
|
130 |
conn = datasource.getConnection(); |
|
130 | 131 |
// DELETE ALL ITEM IN FIELDS |
131 | 132 |
conn.setAutoCommit(false); |
132 | 133 |
String sql = String.format("delete from %s_fields", this.id); |
... | ... | |
152 | 153 |
} catch (SQLException e) { |
153 | 154 |
log.error("Erron on dropping table " + this.id); |
154 | 155 |
|
156 |
} finally { |
|
157 |
if (conn != null) { |
|
158 |
try { |
|
159 |
conn.close(); |
|
160 |
} catch (SQLException e) { |
|
161 |
log.error("Error on get mdstore", e); |
|
162 |
} |
|
163 |
} |
|
155 | 164 |
} |
156 | 165 |
|
157 | 166 |
} |
... | ... | |
170 | 179 |
final BlockingQueue<Object> queue = new ArrayBlockingQueue<Object>(80); |
171 | 180 |
final Object sentinel = new Object(); |
172 | 181 |
Thread background = null; |
182 |
Connection connection = null; |
|
173 | 183 |
try { |
174 |
final Connection conn = this.datasource.getConnection(); |
|
184 |
connection = this.datasource.getConnection(); |
|
185 |
final Connection conn = connection; |
|
175 | 186 |
conn.setAutoCommit(false); |
176 | 187 |
// DROP DISCARDED COLLECTION AT NEW FEEDING |
177 | 188 |
String dropDiscardedCollectionQuery = String.format("delete from %s_discarded", this.id); |
... | ... | |
183 | 194 |
@Override |
184 | 195 |
public void run() { |
185 | 196 |
int commitNumber = 0; |
186 |
|
|
187 | 197 |
while (true) { |
188 | 198 |
try { |
189 | 199 |
Object record = queue.take(); |
190 | 200 |
if (record == sentinel) { |
191 | 201 |
conn.commit(); |
192 | 202 |
updateSizeOnMetadataTable(conn); |
203 |
if (conn != null) { |
|
204 |
try { |
|
205 |
conn.close(); |
|
206 |
} catch (SQLException e) { |
|
207 |
log.error("Error on close mdstore", e); |
|
208 |
} |
|
209 |
} |
|
193 | 210 |
break; |
194 | 211 |
} |
195 | 212 |
safeFeedRecord((String) record, incremental, conn); |
... | ... | |
199 | 216 |
} |
200 | 217 |
} catch (InterruptedException e) { |
201 | 218 |
log.fatal("got exception in background thread", e); |
219 |
if (conn != null) { |
|
220 |
try { |
|
221 |
conn.close(); |
|
222 |
} catch (SQLException e1) { |
|
223 |
log.error("Error on close mdstore", e); |
|
224 |
} |
|
225 |
} |
|
202 | 226 |
throw new IllegalStateException(e); |
227 |
|
|
203 | 228 |
} catch (SQLException e) { |
204 | 229 |
log.fatal("got exception in background thread", e); |
230 |
if (conn != null) { |
|
231 |
try { |
|
232 |
conn.close(); |
|
233 |
} catch (SQLException e1) { |
|
234 |
log.error("Error on close mdstore", e); |
|
235 |
} |
|
236 |
} |
|
205 | 237 |
throw new IllegalStateException(e); |
206 | 238 |
} |
207 | 239 |
} |
... | ... | |
210 | 242 |
}); |
211 | 243 |
background.start(); |
212 | 244 |
} catch (SQLException e) { |
245 |
|
|
246 |
if (connection != null) { |
|
247 |
try { |
|
248 |
connection.close(); |
|
249 |
} catch (SQLException e1) { |
|
250 |
log.error("Error on close mdstore", e); |
|
251 |
} |
|
252 |
} |
|
213 | 253 |
throw new IllegalStateException(e); |
214 | 254 |
} |
215 | 255 |
try { |
... | ... | |
269 | 309 |
|
270 | 310 |
private void feedRecord(final String record, final boolean incremental, final Connection connection) { |
271 | 311 |
final Map<String, String> recordProperties = getRecordParser().parseRecord(record); |
312 |
|
|
272 | 313 |
log.debug("found props: " + recordProperties); |
273 | 314 |
if (recordProperties.containsKey("id")) { |
274 | 315 |
String p = "WITH upsert AS (UPDATE %s SET body=?, timestampdate=? WHERE recordid=? RETURNING *) " |
... | ... | |
287 | 328 |
ps.executeUpdate(); |
288 | 329 |
|
289 | 330 |
} catch (SQLException e) { |
290 |
log.error("Error on insert Element"); |
|
331 |
log.error("Error on insert Element", e);
|
|
291 | 332 |
} |
292 | 333 |
} else { |
293 | 334 |
if (discardRecords) { |
... | ... | |
365 | 406 |
*/ |
366 | 407 |
@Override |
367 | 408 |
public void deleteRecord(final String recordId) { |
409 |
Connection connection = null; |
|
368 | 410 |
try { |
369 |
Connection connection = this.datasource.getConnection();
|
|
411 |
connection = this.datasource.getConnection(); |
|
370 | 412 |
String sql = String.format("delete from %s where recordid = ?", this.id); |
371 | 413 |
PreparedStatement ps = connection.prepareStatement(sql); |
372 | 414 |
ps.setString(1, recordId); |
373 | 415 |
ps.executeUpdate(); |
374 | 416 |
} catch (SQLException e) { |
375 | 417 |
log.error("ERROR ON DELETING record ", e); |
418 |
} finally { |
|
419 |
if (connection != null) { |
|
420 |
try { |
|
421 |
connection.close(); |
|
422 |
} catch (SQLException e) { |
|
423 |
log.error("Error on get mdstore", e); |
|
424 |
} |
|
425 |
} |
|
376 | 426 |
} |
377 | 427 |
} |
378 | 428 |
|
... | ... | |
387 | 437 |
*/ |
388 | 438 |
@Override |
389 | 439 |
public String getRecord(final String recordId) throws DocumentNotFoundException { |
440 |
Connection connection = null; |
|
390 | 441 |
try { |
391 |
Connection connection = this.datasource.getConnection();
|
|
442 |
connection = this.datasource.getConnection(); |
|
392 | 443 |
String sql = String.format("select body from %s where recordid = ?", this.id); |
393 | 444 |
PreparedStatement ps = connection.prepareStatement(sql); |
394 | 445 |
ps.setString(1, recordId); |
... | ... | |
400 | 451 |
} catch (SQLException e) { |
401 | 452 |
log.error("ERROR ON DELETING record ", e); |
402 | 453 |
return null; |
454 |
} finally { |
|
455 |
if (connection != null) { |
|
456 |
try { |
|
457 |
connection.close(); |
|
458 |
} catch (SQLException e) { |
|
459 |
log.error("Error on get mdstore", e); |
|
460 |
} |
|
461 |
} |
|
403 | 462 |
} |
404 | 463 |
} |
405 | 464 |
|
Also available in: Unified diff
implemented postgres mdstore, added some exception