Revision 49417
Added by Claudio Atzori over 6 years ago
ApplyClaimUpdatesJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
2 | 2 |
|
3 |
import java.util.List;
|
|
3 |
import java.util.concurrent.atomic.AtomicInteger;
|
|
4 | 4 |
|
5 | 5 |
import com.googlecode.sarasvati.Arc; |
6 | 6 |
import com.googlecode.sarasvati.NodeToken; |
... | ... | |
29 | 29 |
//TODO: use claim.claim_date from the claim db |
30 | 30 |
long timestamp = System.currentTimeMillis(); |
31 | 31 |
setTotal(getClaimDatabaseUtils().count(getCountQuery())); |
32 |
List<Claim> claimUpdates = this.getClaimDatabaseUtils().query(getSql()); |
|
33 |
int discardedClaims = 0; |
|
34 | 32 |
|
35 |
HadoopService hadoopService = getServiceLocator().getService(HadoopService.class); |
|
33 |
final AtomicInteger discardedClaims = new AtomicInteger(0); |
|
34 |
final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class); |
|
36 | 35 |
|
37 |
for (Claim claim : claimUpdates) {
|
|
36 |
getClaimDatabaseUtils().query(getSql()).forEach(claim -> {
|
|
38 | 37 |
try { |
39 | 38 |
log.debug(claim); |
40 | 39 |
String contextId = claim.getSource(); |
... | ... | |
43 | 42 |
String value = getValue(rowKey, contextId, timestamp); |
44 | 43 |
hadoopService.addHBaseColumn(getClusterName(), getTableName(), rowKey, "result", "update_" + System.nanoTime(), value); |
45 | 44 |
incrementProcessed(); |
46 |
} catch (IllegalArgumentException e) {
|
|
45 |
} catch (Exception e) { |
|
47 | 46 |
log.error("Discarding claim " + claim + ". Cause: " + e.getMessage()); |
48 |
discardedClaims++;
|
|
47 |
discardedClaims.incrementAndGet();
|
|
49 | 48 |
} |
50 |
} |
|
51 |
|
|
49 |
}); |
|
52 | 50 |
log.info("Total Claim Updates: " + getTotal()); |
53 | 51 |
token.getEnv().setAttribute("claimUpdatesSize", getTotal()); |
54 | 52 |
log.info("Claim updates: " + getProcessed()); |
55 | 53 |
token.getEnv().setAttribute("claimUpdates", getProcessed()); |
56 |
log.info("Discarded Claim Updates: " + discardedClaims); |
|
57 |
token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims); |
|
54 |
log.info("Discarded Claim Updates: " + discardedClaims.intValue());
|
|
55 |
token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims.intValue());
|
|
58 | 56 |
|
59 | 57 |
return Arc.DEFAULT_ARC; |
60 | 58 |
} |
Also available in: Unified diff
Stream claims from the jdbcTemplate instead of loading them into memory