Revision 49417
Added by Claudio Atzori over 6 years ago
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
2 | 2 |
|
3 |
import java.util.List;
|
|
3 |
import java.util.concurrent.atomic.AtomicInteger;
|
|
4 | 4 |
|
5 | 5 |
import com.googlecode.sarasvati.Arc; |
6 | 6 |
import com.googlecode.sarasvati.NodeToken; |
... | ... | |
38 | 38 |
//TODO: use claim.claim_date from the claim db |
39 | 39 |
long timestamp = System.currentTimeMillis(); |
40 | 40 |
setTotal(getClaimDatabaseUtils().count(getCountQuery())); |
41 |
List<Claim> claimRels = getClaimDatabaseUtils().query(getSql()); |
|
42 |
int totalWrites = 0; |
|
43 |
int valid = 0; |
|
44 |
int discardedClaims = 0; |
|
45 | 41 |
|
46 |
HadoopService hadoopService = getServiceLocator().getService(HadoopService.class); |
|
42 |
final AtomicInteger totalWrites = new AtomicInteger(0); |
|
43 |
final AtomicInteger discardedClaims = new AtomicInteger(0); |
|
47 | 44 |
|
48 |
for (Claim claim : claimRels) { |
|
45 |
final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class); |
|
46 |
|
|
47 |
getClaimDatabaseUtils().query(getSql()).forEach(claim -> { |
|
49 | 48 |
log.debug(claim); |
50 | 49 |
try { |
51 | 50 |
String sourceId = getFullId(getOpenAIREType(claim.getSourceType()), claim.getSource()); |
... | ... | |
53 | 52 |
String value = getValue(sourceId, claim.getSemantics(), targetId, timestamp); |
54 | 53 |
// adding relationship |
55 | 54 |
hadoopService.addHBaseColumn(getClusterName(), getTableName(), sourceId, claim.getSemantics(), targetId, value); |
56 |
totalWrites++;
|
|
55 |
totalWrites.incrementAndGet();
|
|
57 | 56 |
|
58 | 57 |
String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics()); |
59 | 58 |
String inverseValue = getValue(targetId, inverseSemantics, sourceId, timestamp); |
60 | 59 |
//adding inverse relationship |
61 | 60 |
hadoopService.addHBaseColumn(getClusterName(), getTableName(), targetId, inverseSemantics, sourceId, inverseValue); |
62 |
totalWrites++;
|
|
61 |
totalWrites.incrementAndGet();
|
|
63 | 62 |
incrementProcessed(); |
64 |
} catch (IllegalArgumentException e) {
|
|
63 |
} catch (Exception e) { |
|
65 | 64 |
log.error("Discarding claim " + claim + ". Cause: " + e.getMessage()); |
66 |
discardedClaims++;
|
|
65 |
discardedClaims.incrementAndGet();
|
|
67 | 66 |
} |
68 |
} |
|
69 |
|
|
67 |
}); |
|
70 | 68 |
log.info("totalClaimRels: " + getTotal()); |
71 | 69 |
token.getEnv().setAttribute("claimRelsSize", getTotal()); |
72 |
log.info("claim rels writeOps: " + totalWrites); |
|
73 |
token.getEnv().setAttribute("claimRelsWriteOps", totalWrites); |
|
74 |
log.info("validClaimRels: " + valid); |
|
75 |
token.getEnv().setAttribute("validClaimRels", valid); |
|
76 |
log.info("discardedClaimRels: " + discardedClaims); |
|
77 |
token.getEnv().setAttribute("discardedClaimRels", discardedClaims); |
|
70 |
log.info("claim rels writeOps: " + totalWrites.intValue()); |
|
71 |
token.getEnv().setAttribute("claimRelsWriteOps", totalWrites.intValue()); |
|
72 |
log.info("discardedClaimRels: " + discardedClaims.intValue()); |
|
73 |
token.getEnv().setAttribute("discardedClaimRels", discardedClaims.intValue()); |
|
78 | 74 |
|
79 | 75 |
return Arc.DEFAULT_ARC; |
80 | 76 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimUpdatesJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
2 | 2 |
|
3 |
import java.util.List;
|
|
3 |
import java.util.concurrent.atomic.AtomicInteger;
|
|
4 | 4 |
|
5 | 5 |
import com.googlecode.sarasvati.Arc; |
6 | 6 |
import com.googlecode.sarasvati.NodeToken; |
... | ... | |
29 | 29 |
//TODO: use claim.claim_date from the claim db |
30 | 30 |
long timestamp = System.currentTimeMillis(); |
31 | 31 |
setTotal(getClaimDatabaseUtils().count(getCountQuery())); |
32 |
List<Claim> claimUpdates = this.getClaimDatabaseUtils().query(getSql()); |
|
33 |
int discardedClaims = 0; |
|
34 | 32 |
|
35 |
HadoopService hadoopService = getServiceLocator().getService(HadoopService.class); |
|
33 |
final AtomicInteger discardedClaims = new AtomicInteger(0); |
|
34 |
final HadoopService hadoopService = getServiceLocator().getService(HadoopService.class); |
|
36 | 35 |
|
37 |
for (Claim claim : claimUpdates) {
|
|
36 |
getClaimDatabaseUtils().query(getSql()).forEach(claim -> {
|
|
38 | 37 |
try { |
39 | 38 |
log.debug(claim); |
40 | 39 |
String contextId = claim.getSource(); |
... | ... | |
43 | 42 |
String value = getValue(rowKey, contextId, timestamp); |
44 | 43 |
hadoopService.addHBaseColumn(getClusterName(), getTableName(), rowKey, "result", "update_" + System.nanoTime(), value); |
45 | 44 |
incrementProcessed(); |
46 |
} catch (IllegalArgumentException e) {
|
|
45 |
} catch (Exception e) { |
|
47 | 46 |
log.error("Discarding claim " + claim + ". Cause: " + e.getMessage()); |
48 |
discardedClaims++;
|
|
47 |
discardedClaims.incrementAndGet();
|
|
49 | 48 |
} |
50 |
} |
|
51 |
|
|
49 |
}); |
|
52 | 50 |
log.info("Total Claim Updates: " + getTotal()); |
53 | 51 |
token.getEnv().setAttribute("claimUpdatesSize", getTotal()); |
54 | 52 |
log.info("Claim updates: " + getProcessed()); |
55 | 53 |
token.getEnv().setAttribute("claimUpdates", getProcessed()); |
56 |
log.info("Discarded Claim Updates: " + discardedClaims); |
|
57 |
token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims); |
|
54 |
log.info("Discarded Claim Updates: " + discardedClaims.intValue());
|
|
55 |
token.getEnv().setAttribute("discardedClaimUpdates", discardedClaims.intValue());
|
|
58 | 56 |
|
59 | 57 |
return Arc.DEFAULT_ARC; |
60 | 58 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ClaimDatabaseUtils.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
2 | 2 |
|
3 |
import java.util.List; |
|
3 |
import java.util.Iterator; |
|
4 |
import java.util.NoSuchElementException; |
|
5 |
import java.util.Spliterator; |
|
6 |
import java.util.Spliterators; |
|
7 |
import java.util.stream.Stream; |
|
8 |
import java.util.stream.StreamSupport; |
|
9 |
import javax.annotation.PostConstruct; |
|
4 | 10 |
|
5 | 11 |
import eu.dnetlib.enabling.database.DataSourceFactory; |
6 | 12 |
import eu.dnetlib.enabling.database.utils.JdbcTemplateFactory; |
7 | 13 |
import org.springframework.beans.factory.annotation.Autowired; |
8 | 14 |
import org.springframework.beans.factory.annotation.Value; |
15 |
import org.springframework.jdbc.core.JdbcTemplate; |
|
16 |
import org.springframework.jdbc.support.rowset.SqlRowSet; |
|
9 | 17 |
import org.springframework.stereotype.Component; |
10 | 18 |
|
11 | 19 |
/** |
... | ... | |
21 | 29 |
@Autowired |
22 | 30 |
private DataSourceFactory claimsDataSourceFactory; |
23 | 31 |
|
32 |
private JdbcTemplate jdbcTemplate; |
|
33 |
|
|
24 | 34 |
@Value("${dnet.openaire.claims.db.name}") |
25 | 35 |
private String dbName; |
26 | 36 |
|
27 |
public List<Claim> query(final String sqlQuery) { |
|
28 |
this.claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory); |
|
29 |
return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).query(sqlQuery, |
|
30 |
(rs, rowNum) -> new Claim().setSemantics(rs.getString("semantics")) |
|
31 |
.setSource(rs.getString("source_id")).setSourceType(rs.getString("source_type")) |
|
32 |
.setTarget(rs.getString("target_id")).setTargetType(rs.getString("target_type"))); |
|
37 |
@PostConstruct |
|
38 |
public void init() { |
|
39 |
claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory); |
|
40 |
this.jdbcTemplate = claimsJdbcTemplateFactory.createJdbcTemplate(dbName); |
|
33 | 41 |
} |
34 | 42 |
|
35 | 43 |
public int count(final String sqlCountQuery) { |
36 |
this.claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory); |
|
37 |
return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).queryForObject(sqlCountQuery, Integer.class); |
|
44 |
return jdbcTemplate.queryForObject(sqlCountQuery, Integer.class); |
|
38 | 45 |
} |
39 | 46 |
|
47 |
public Stream<Claim> query(final String sqlQuery) { |
|
48 |
final SqlRowSet rowSet = jdbcTemplate.queryForRowSet(sqlQuery); |
|
49 |
return StreamSupport.stream( |
|
50 |
Spliterators.spliteratorUnknownSize( |
|
51 |
new Iterator<Claim>() { |
|
52 |
|
|
53 |
@Override |
|
54 |
public boolean hasNext() { |
|
55 |
return !rowSet.isLast(); |
|
56 |
} |
|
57 |
|
|
58 |
@Override |
|
59 |
public Claim next() { |
|
60 |
if (!rowSet.next()) { |
|
61 |
throw new NoSuchElementException(); |
|
62 |
} |
|
63 |
return asClaim(rowSet); |
|
64 |
} |
|
65 |
}, |
|
66 |
Spliterator.IMMUTABLE), false); |
|
67 |
} |
|
68 |
|
|
69 |
private Claim asClaim(SqlRowSet rs) { |
|
70 |
return new Claim() |
|
71 |
.setSemantics(rs.getString("semantics")) |
|
72 |
.setSource(rs.getString("source_id")).setSourceType(rs.getString("source_type")) |
|
73 |
.setTarget(rs.getString("target_id")).setTargetType(rs.getString("target_type")); |
|
74 |
} |
|
75 |
|
|
40 | 76 |
} |
Also available in: Unified diff
stream claims from the jdbcTemplate instead of loading them into memory