Revision 47717
Added by Alessia Bardi almost 7 years ago
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ApplyClaimRelsJobNode.java | ||
---|---|---|
1 | 1 |
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims; |
2 | 2 |
|
3 |
import java.io.IOException; |
|
4 | 3 |
import java.util.List; |
5 | 4 |
|
6 | 5 |
import com.googlecode.sarasvati.Arc; |
... | ... | |
22 | 21 |
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions; |
23 | 22 |
import eu.dnetlib.enabling.locators.UniqueServiceLocator; |
24 | 23 |
import eu.dnetlib.msro.rmi.MSROException; |
24 |
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode; |
|
25 | 25 |
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode; |
26 |
import eu.dnetlib.msro.workflows.util.ProgressProvider; |
|
26 | 27 |
import eu.dnetlib.utils.ontologies.OntologyLoader; |
27 | 28 |
import org.apache.commons.codec.binary.Base64; |
28 |
import org.apache.commons.io.IOUtils; |
|
29 | 29 |
import org.apache.commons.logging.Log; |
30 | 30 |
import org.apache.commons.logging.LogFactory; |
31 | 31 |
import org.apache.hadoop.util.StringUtils; |
... | ... | |
34 | 34 |
/** |
35 | 35 |
* Created by alessia on 23/10/15. |
36 | 36 |
*/ |
37 |
public class ApplyClaimRelsJobNode extends SimpleJobNode { |
|
37 |
public class ApplyClaimRelsJobNode extends SimpleJobNode implements ProgressJobNode {
|
|
38 | 38 |
|
39 | 39 |
private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class); |
40 | 40 |
|
... | ... | |
47 | 47 |
private ClaimDatabaseUtils claimDatabaseUtils; |
48 | 48 |
|
49 | 49 |
private String sql; |
50 |
private String countQuery; |
|
51 |
private int total = 0; |
|
52 |
private int processed = 0; |
|
50 | 53 |
|
51 | 54 |
private String clusterName; |
52 | 55 |
|
53 | 56 |
private String tableName; |
54 | 57 |
|
55 |
private String fetchSqlAsText(final String path) throws IOException { |
|
56 |
return IOUtils.toString(getClass().getResourceAsStream(path)); |
|
57 |
} |
|
58 |
// private String fetchSqlAsText(final String path) throws IOException {
|
|
59 |
// return IOUtils.toString(getClass().getResourceAsStream(path));
|
|
60 |
// }
|
|
58 | 61 |
|
59 | 62 |
@Override |
60 | 63 |
protected String execute(NodeToken token) throws Exception { |
61 | 64 |
//TODO: use claim.claim_date from the claim db |
62 | 65 |
long timestamp = System.currentTimeMillis(); |
63 |
|
|
66 |
total = this.claimDatabaseUtils.count(countQuery); |
|
64 | 67 |
List<Claim> claimRels = this.claimDatabaseUtils.query(sql); |
65 |
int totalClaims = 0; |
|
66 | 68 |
int totalWrites = 0; |
69 |
int valid = 0; |
|
67 | 70 |
int discardedClaims = 0; |
68 | 71 |
|
69 | 72 |
HadoopService hadoopService = serviceLocator.getService(HadoopService.class); |
... | ... | |
84 | 87 |
final String value) |
85 | 88 |
*/ |
86 | 89 |
hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value); |
87 |
totalClaims++;
|
|
90 |
processed++;
|
|
88 | 91 |
totalWrites++; |
89 | 92 |
|
90 | 93 |
String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics()); |
... | ... | |
97 | 100 |
} |
98 | 101 |
} |
99 | 102 |
|
100 |
log.info("totalClaims: " + totalClaims);
|
|
101 |
token.getEnv().setAttribute("claimSize", totalClaims);
|
|
103 |
log.info("totalClaims: " + total); |
|
104 |
token.getEnv().setAttribute("claimSize", total); |
|
102 | 105 |
log.info("writeOps: " + totalWrites); |
103 | 106 |
token.getEnv().setAttribute("writeOps", totalWrites); |
107 |
log.info("validClaims: " + valid); |
|
108 |
token.getEnv().setAttribute("validClaims", valid); |
|
104 | 109 |
log.info("discardedClaims: " + discardedClaims); |
105 | 110 |
token.getEnv().setAttribute("discardedClaims", discardedClaims); |
106 | 111 |
|
... | ... | |
217 | 222 |
public void setSql(final String sql) { |
218 | 223 |
this.sql = sql; |
219 | 224 |
} |
225 |
|
|
226 |
public String getCountQuery() { |
|
227 |
return countQuery; |
|
228 |
} |
|
229 |
|
|
230 |
public void setCountQuery(final String countQuery) { |
|
231 |
this.countQuery = countQuery; |
|
232 |
} |
|
233 |
|
|
234 |
@Override |
|
235 |
public ProgressProvider getProgressProvider() { |
|
236 |
return new ProgressProvider() { |
|
237 |
|
|
238 |
@Override |
|
239 |
public boolean isInaccurate() { |
|
240 |
return false; |
|
241 |
} |
|
242 |
|
|
243 |
@Override |
|
244 |
public int getTotalValue() { |
|
245 |
return total; |
|
246 |
} |
|
247 |
|
|
248 |
@Override |
|
249 |
public int getCurrentValue() { |
|
250 |
return processed; |
|
251 |
} |
|
252 |
}; |
|
253 |
} |
|
220 | 254 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/java/eu/dnetlib/msro/openaireplus/workflows/nodes/claims/ClaimDatabaseUtils.java | ||
---|---|---|
25 | 25 |
private String dbName; |
26 | 26 |
|
27 | 27 |
|
28 |
public List<Claim> query(String sqlQuery){ |
|
28 |
public List<Claim> query(final String sqlQuery){
|
|
29 | 29 |
this.claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory); |
30 | 30 |
return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).query(sqlQuery, |
31 | 31 |
(rs, rowNum) -> new Claim().setSemantics(rs.getString("semantics")) |
... | ... | |
33 | 33 |
.setTarget(rs.getString("target_id")).setTargetType(rs.getString("target_type"))); |
34 | 34 |
} |
35 | 35 |
|
36 |
public int count(final String sqlCountQuery){ |
|
37 |
this.claimsJdbcTemplateFactory.setDataSourceFactory(claimsDataSourceFactory); |
|
38 |
return this.claimsJdbcTemplateFactory.createJdbcTemplate(dbName).queryForObject(sqlCountQuery, Integer.class); |
|
39 |
} |
|
36 | 40 |
|
41 |
|
|
37 | 42 |
} |
modules/dnet-openaireplus-workflows/trunk/src/main/resources/eu/dnetlib/test/profiles/openaireplus/workflows/hbase/tmp_claims2hbase.xml | ||
---|---|---|
24 | 24 |
<PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM> |
25 | 25 |
<PARAM managedBy="user" name="tableName" required="true" type="string"></PARAM> |
26 | 26 |
<PARAM managedBy="user" name="sql" required="true" type="string">SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM> |
27 |
<!-- Named after the node's countQuery property (setCountQuery), as sql/clusterName/tableName are named after theirs. -->
<PARAM managedBy="user" name="countQuery" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type !='context'</PARAM>
|
27 | 28 |
</PARAMETERS> |
28 | 29 |
<ARCS> |
29 | 30 |
<ARC to="applyClaimUpdates"/> |
... | ... | |
35 | 36 |
<PARAM managedBy="system" name="clusterName" required="true" type="string">DM</PARAM> |
36 | 37 |
<PARAM managedBy="user" name="tableName" required="true" type="string"></PARAM> |
37 | 38 |
<PARAM managedBy="user" name="sql" required="true" type="string">SELECT source_type, source_id, target_type, target_id, semantics FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM> |
39 |
<!-- Named after the node's countQuery property (setCountQuery), as sql/clusterName/tableName are named after theirs. -->
<PARAM managedBy="user" name="countQuery" required="true" type="string">SELECT count(*) FROM claim WHERE approved=TRUE AND source_type ='context'</PARAM>
|
38 | 40 |
</PARAMETERS> |
39 | 41 |
<ARCS> |
40 | 42 |
<ARC to="success"/> |
Also available in: Unified diff
ApplyClaims with progress provider