Project

General

Profile

« Previous | Next » 

Revision 49417

stream claims from the jdbcTemplate instead of load them in memory

View differences:

ApplyClaimUpdatesJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2 2

  
3
import java.util.List;
3
import java.util.concurrent.atomic.AtomicInteger;
4 4

  
5 5
import com.googlecode.sarasvati.Arc;
6 6
import com.googlecode.sarasvati.NodeToken;
......
29 29
		//TODO: use claim.claim_date from the claim db
30 30
		long timestamp = System.currentTimeMillis();
31 31
		setTotal(getClaimDatabaseUtils().count(getCountQuery()));
32
		List<Claim> claimUpdates = this.getClaimDatabaseUtils().query(getSql());
33
		int discardedClaims = 0;
34 32

  
35
		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
33
		final AtomicInteger discardedClaims = new AtomicInteger(0);
34
		final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
36 35

  
37
		for (Claim claim : claimUpdates) {
36
		getClaimDatabaseUtils().query(getSql()).forEach(claim -> {
38 37
			try {
39 38
				log.debug(claim);
40 39
				String contextId = claim.getSource();
......
43 42
				String value = getValue(rowKey, contextId, timestamp);
44 43
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), rowKey, "result", "update_" + System.nanoTime(), value);
45 44
				incrementProcessed();
46
			} catch (IllegalArgumentException e) {
45
			} catch (Exception e) {
47 46
				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
48
				discardedClaims++;
47
				discardedClaims.incrementAndGet();
49 48
			}
50
		}
51

  
49
		});
52 50
		log.info("Total Claim Updates: " + getTotal());
53 51
		token.getEnv().setAttribute("claimUpdatesSize", getTotal());
54 52
		log.info("Claim updates: " + getProcessed());
55 53
		token.getEnv().setAttribute("claimUpdates", getProcessed());
56
		log.info("Discarded Claim Updates: " + discardedClaims);
57
		token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims);
54
		log.info("Discarded Claim Updates: " + discardedClaims.intValue());
55
		token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims.intValue());
58 56

  
59 57
		return Arc.DEFAULT_ARC;
60 58
	}

Also available in: Unified diff