Project

General

Profile

« Previous | Next » 

Revision 49417

Stream claims from the jdbcTemplate instead of loading them into memory

View differences:

modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2 2

  
3
import java.util.List;
3
import java.util.concurrent.atomic.AtomicInteger;
4 4

  
5 5
import com.googlecode.sarasvati.Arc;
6 6
import com.googlecode.sarasvati.NodeToken;
......
38 38
		//TODO: use claim.claim_date from the claim db
39 39
		long timestamp = System.currentTimeMillis();
40 40
		setTotal(getClaimDatabaseUtils().count(getCountQuery()));
41
		List<Claim> claimRels = getClaimDatabaseUtils().query(getSql());
42
		int totalWrites = 0;
43
		int valid = 0;
44
		int discardedClaims = 0;
45 41

  
46
		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
42
		final AtomicInteger totalWrites = new AtomicInteger(0);
43
		final AtomicInteger discardedClaims = new AtomicInteger(0);
47 44

  
48
		for (Claim claim : claimRels) {
45
		final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
46

  
47
		getClaimDatabaseUtils().query(getSql()).forEach(claim -> {
49 48
			log.debug(claim);
50 49
			try {
51 50
				String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource());
......
53 52
				String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp);
54 53
				// adding relationship
55 54
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value);
56
				totalWrites++;
55
				totalWrites.incrementAndGet();
57 56

  
58 57
				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
59 58
				String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp);
60 59
				//adding inverse relationship
61 60
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue);
62
				totalWrites++;
61
				totalWrites.incrementAndGet();
63 62
				incrementProcessed();
64
			} catch (IllegalArgumentException e) {
63
			} catch (Exception e) {
65 64
				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
66
				discardedClaims++;
65
				discardedClaims.incrementAndGet();
67 66
			}
68
		}
69

  
67
		});
70 68
		log.info("totalClaimRels: " + getTotal());
71 69
		token.getEnv().setAttribute("claimRelsSize", getTotal());
72
		log.info("claim rels writeOps: " + totalWrites);
73
		token.getEnv().setAttribute("claimRelsWriteOps", totalWrites);
74
		log.info("validClaimRels: " + valid);
75
		token.getEnv().setAttribute("validClaimRels", valid);
76
		log.info("discardedClaimRels: " + discardedClaims);
77
		token.getEnv().setAttribute("discardedClaimRels", discardedClaims);
70
		log.info("claim rels writeOps: " + totalWrites.intValue());
71
		token.getEnv().setAttribute("claimRelsWriteOps", totalWrites.intValue());
72
		log.info("discardedClaimRels: " + discardedClaims.intValue());
73
		token.getEnv().setAttribute("discardedClaimRels", discardedClaims.intValue());
78 74

  
79 75
		return Arc.DEFAULT_ARC;
80 76
	}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimUpdatesJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2 2

  
3
import java.util.List;
3
import java.util.concurrent.atomic.AtomicInteger;
4 4

  
5 5
import com.googlecode.sarasvati.Arc;
6 6
import com.googlecode.sarasvati.NodeToken;
......
29 29
		//TODO: use claim.claim_date from the claim db
30 30
		long timestamp = System.currentTimeMillis();
31 31
		setTotal(getClaimDatabaseUtils().count(getCountQuery()));
32
		List<Claim> claimUpdates = this.getClaimDatabaseUtils().query(getSql());
33
		int discardedClaims = 0;
34 32

  
35
		HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
33
		final AtomicInteger discardedClaims = new AtomicInteger(0);
34
		final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class);
36 35

  
37
		for (Claim claim : claimUpdates) {
36
		getClaimDatabaseUtils().query(getSql()).forEach(claim -> {
38 37
			try {
39 38
				log.debug(claim);
40 39
				String contextId = claim.getSource();
......
43 42
				String value = getValue(rowKey, contextId, timestamp);
44 43
				hadoopService.addHBaseColumn(getClusterName(), getTableName(), rowKey, "result", "update_" + System.nanoTime(), value);
45 44
				incrementProcessed();
46
			} catch (IllegalArgumentException e) {
45
			} catch (Exception e) {
47 46
				log.error("Discarding claim " + claim + ". Cause: " + e.getMessage());
48
				discardedClaims++;
47
				discardedClaims.incrementAndGet();
49 48
			}
50
		}
51

  
49
		});
52 50
		log.info("Total Claim Updates: " + getTotal());
53 51
		token.getEnv().setAttribute("claimUpdatesSize", getTotal());
54 52
		log.info("Claim updates: " + getProcessed());
55 53
		token.getEnv().setAttribute("claimUpdates", getProcessed());
56
		log.info("Discarded Claim Updates: " + discardedClaims);
57
		token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims);
54
		log.info("Discarded Claim Updates: " + discardedClaims.intValue());
55
		token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims.intValue());
58 56

  
59 57
		return Arc.DEFAULT_ARC;
60 58
	}
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ClaimDatabaseUtils.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2 2

  
3
import java.util.List;
3
import java.util.Iterator;
4
import java.util.NoSuchElementException;
5
import java.util.Spliterator;
6
import java.util.Spliterators;
7
import java.util.stream.Stream;
8
import java.util.stream.StreamSupport;
9
import javax.annotation.PostConstruct;
4 10

  
5 11
import eu.dnetlib.enabling.database.DataSourceFactory;
6 12
import eu.dnetlib.enabling.database.utils.JdbcTemplateFactory;
7 13
import org.springframework.beans.factory.annotation.Autowired;
8 14
import org.springframework.beans.factory.annotation.Value;
15
import org.springframework.jdbc.core.JdbcTemplate;
16
import org.springframework.jdbc.support.rowset.SqlRowSet;
9 17
import org.springframework.stereotype.Component;
10 18

  
11 19
/**
......
21 29
	@Autowired
22 30
	private DataSourceFactory claimsDataSourceFactory;
23 31

  
32
	private JdbcTemplate jdbcTemplate;
33

  
24 34
	@Value("${dnet.openaire.claims.db.name}")
25 35
	private String dbName;
26 36

  
27
	public List<Claim> query(final String sqlQuery) {
28
		this.claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory);
29
		return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).query(sqlQuery,
30
				(rs, rowNum) -> new Claim().setSemantics(rs.getString("semantics"))
31
						.setSource(rs.getString("source_id")).setSourceType(rs.getString("source_type"))
32
						.setTarget(rs.getString("target_id")).setTargetType(rs.getString("target_type")));
37
	@PostConstruct
38
	public void init() {
39
		claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory);
40
		this.jdbcTemplate = claimsJdbcTemplateFactory.createJdbcTemplate(dbName);
33 41
	}
34 42

  
35 43
	public int count(final String sqlCountQuery) {
36
		this.claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory);
37
		return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).queryForObject(sqlCountQuery, Integer.class);
44
		return jdbcTemplate.queryForObject(sqlCountQuery, Integer.class);
38 45
	}
39 46

  
47
	public Stream<Claim> query(final String sqlQuery) {
48
		final SqlRowSet rowSet = jdbcTemplate.queryForRowSet(sqlQuery);
49
		return StreamSupport.stream(
50
				Spliterators.spliteratorUnknownSize(
51
					new Iterator<Claim>() {
52

  
53
						@Override
54
						public boolean hasNext() {
55
							return !rowSet.isLast();
56
						}
57

  
58
						@Override
59
						public Claim next() {
60
							if (!rowSet.next()) {
61
								throw new NoSuchElementException();
62
							}
63
							return asClaim(rowSet);
64
						}
65
					},
66
					Spliterator.IMMUTABLE), false);
67
	}
68

  
69
	private Claim asClaim(SqlRowSet rs) {
70
		return new Claim()
71
				.setSemantics(rs.getString("semantics"))
72
				.setSource(rs.getString("source_id")).setSourceType(rs.getString("source_type"))
73
				.setTarget(rs.getString("target_id")).setTargetType(rs.getString("target_type"));
74
	}
75

  
40 76
}

Also available in: Unified diff