Revision 59405
Added by Alessia Bardi over 3 years ago
modules/cnr-resultset-client/trunk/src/main/java/eu/dnetlib/enabling/resultset/client/ResultSetClient.java | ||
---|---|---|
10 | 10 |
*/ |
11 | 11 |
public interface ResultSetClient { |
12 | 12 |
|
13 |
String SENTINEL_PROC_CANCELED = "CANCELED"; |
|
13 | 14 |
/** |
14 | 15 |
* |
15 | 16 |
* @param epr |
modules/dnet-msro-service/trunk/src/main/java/eu/dnetlib/msro/workflows/resultset/ProcessCountingResultSetListener.java | ||
---|---|---|
2 | 2 |
|
3 | 3 |
import java.util.List; |
4 | 4 |
|
5 |
import eu.dnetlib.enabling.resultset.client.ResultSetClient; |
|
5 | 6 |
import org.apache.commons.lang.StringUtils; |
6 | 7 |
import org.apache.commons.logging.Log; |
7 | 8 |
import org.apache.commons.logging.LogFactory; |
... | ... | |
45 | 46 |
public List<String> getResult(final int from, final int to) { |
46 | 47 |
if (process.isCanceled()) { |
47 | 48 |
this.outputResulset.close(); |
48 |
return Lists.newArrayList(); |
|
49 |
return Lists.newArrayList(ResultSetClient.SENTINEL_PROC_CANCELED);
|
|
49 | 50 |
} |
50 | 51 |
try { |
51 | 52 |
this.count = to; |
modules/dnet-msro-service/trunk/pom.xml | ||
---|---|---|
3 | 3 |
<parent> |
4 | 4 |
<groupId>eu.dnetlib</groupId> |
5 | 5 |
<artifactId>dnet45-parent</artifactId> |
6 |
<version>1.0.0</version> |
|
6 |
<version>1.0.0-SNAPSHOT</version>
|
|
7 | 7 |
<relativePath /> |
8 | 8 |
</parent> |
9 | 9 |
<modelVersion>4.0.0</modelVersion> |
modules/cnr-mongo-mdstore/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/mongodb/MongoMDStore.java | ||
---|---|---|
23 | 23 |
import eu.dnetlib.data.mdstore.modular.RecordParser; |
24 | 24 |
import eu.dnetlib.data.mdstore.modular.connector.MDStore; |
25 | 25 |
import eu.dnetlib.enabling.resultset.ResultSetListener; |
26 |
import eu.dnetlib.enabling.resultset.client.ResultSetClient; |
|
26 | 27 |
import org.apache.commons.lang3.StringUtils; |
27 | 28 |
import org.apache.commons.logging.Log; |
28 | 29 |
import org.apache.commons.logging.LogFactory; |
... | ... | |
81 | 82 |
while (true) { |
82 | 83 |
try { |
83 | 84 |
final Object record = queue.take(); |
85 |
if(record == ResultSetClient.SENTINEL_PROC_CANCELED){ |
|
86 |
//it means the wf was cancelled, so we must give this information back to the caller to avoid commit |
|
87 |
log.warn(String.format("Process cancelled after %d records, returning -1", count)); |
|
88 |
return -1; |
|
89 |
} |
|
84 | 90 |
if (record == sentinel) { |
85 | 91 |
bulkWritesManager.flushBulks(); |
86 | 92 |
break; |
modules/cnr-mongo-mdstore/trunk/pom.xml | ||
---|---|---|
3 | 3 |
<parent> |
4 | 4 |
<groupId>eu.dnetlib</groupId> |
5 | 5 |
<artifactId>dnet45-parent</artifactId> |
6 |
<version>1.0.0</version> |
|
6 |
<version>1.0.0-SNAPSHOT</version>
|
|
7 | 7 |
</parent> |
8 | 8 |
<modelVersion>4.0.0</modelVersion> |
9 | 9 |
<groupId>eu.dnetlib</groupId> |
... | ... | |
41 | 41 |
<dependency> |
42 | 42 |
<groupId>eu.dnetlib</groupId> |
43 | 43 |
<artifactId>cnr-modular-mdstore-service</artifactId> |
44 |
<version>[6.0.0,7.0.0)</version>
|
|
44 |
<version>[6.0.5-SNAPSHOT,7.0.0)</version>
|
|
45 | 45 |
</dependency> |
46 | 46 |
<dependency> |
47 | 47 |
<groupId>eu.dnetlib</groupId> |
modules/cnr-modular-mdstore-service/trunk/src/main/java/eu/dnetlib/data/mdstore/modular/MDStoreFeeder.java | ||
---|---|---|
60 | 60 |
writeOps = mdstore.feed(records, refresh, mdformats); |
61 | 61 |
} |
62 | 62 |
|
63 |
dao.commit(mdstore.getId(), mdId); |
|
64 |
|
|
65 |
int size = dao.refreshSize(mdId); |
|
66 |
|
|
67 |
touch(mdId, size); |
|
68 |
|
|
69 |
log.info("Finished feeding mdstore " + mdId + " - new size: " + size); |
|
70 |
|
|
71 |
doneCallback.call(buildParams(size, writeOps)); |
|
63 |
if(writeOps == -1){ |
|
64 |
//means the process was cancelled, so we must not commit |
|
65 |
log.warn(String.format("The feeding process on mdstore %s has been cancelled. Records will not be committed. Transaction %s set as invalid.", mdId, transactionId)); |
|
66 |
dao.invalidTransaction(transactionId, mdId); |
|
67 |
failCallback.call(new MDStoreServiceException(String.format("Feeding cancelled for mdstore %s on transaction %s", mdId, transactionId))); |
|
68 |
} |
|
69 |
else { |
|
70 |
dao.commit(mdstore.getId(), mdId); |
|
71 |
int size = dao.refreshSize(mdId); |
|
72 |
touch(mdId, size); |
|
73 |
log.info("Finished feeding mdstore " + mdId + " - new size: " + size); |
|
74 |
doneCallback.call(buildParams(size, writeOps)); |
|
75 |
} |
|
72 | 76 |
} catch (Throwable e) { |
73 | 77 |
if (transactionId != null) { |
74 | 78 |
dao.invalidTransaction(transactionId, mdId); |
modules/cnr-modular-mdstore-service/trunk/pom.xml | ||
---|---|---|
3 | 3 |
<parent> |
4 | 4 |
<groupId>eu.dnetlib</groupId> |
5 | 5 |
<artifactId>dnet45-parent</artifactId> |
6 |
<version>1.0.0</version> |
|
6 |
<version>1.0.0-SNAPSHOT</version>
|
|
7 | 7 |
<relativePath /> |
8 | 8 |
</parent> |
9 | 9 |
<modelVersion>4.0.0</modelVersion> |
Also available in: Unified diff
Trying to avoid mdstore to store transaction of cancelled feeding