Project

General

Profile

« Previous | Next » 

Revision 47717

ApplyClaims with progress provider

View differences:

ApplyClaimRelsJobNode.java
1 1
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;
2 2

  
3
import java.io.IOException;
4 3
import java.util.List;
5 4

  
6 5
import com.googlecode.sarasvati.Arc;
......
22 21
import eu.dnetlib.data.transform.xml.AbstractDNetXsltFunctions;
23 22
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
24 23
import eu.dnetlib.msro.rmi.MSROException;
24
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
25 25
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
26
import eu.dnetlib.msro.workflows.util.ProgressProvider;
26 27
import eu.dnetlib.utils.ontologies.OntologyLoader;
27 28
import org.apache.commons.codec.binary.Base64;
28
import org.apache.commons.io.IOUtils;
29 29
import org.apache.commons.logging.Log;
30 30
import org.apache.commons.logging.LogFactory;
31 31
import org.apache.hadoop.util.StringUtils;
......
34 34
/**
35 35
 * Created by alessia on 23/10/15.
36 36
 */
37
public class ApplyClaimRelsJobNode extends SimpleJobNode {
37
public class ApplyClaimRelsJobNode extends SimpleJobNode implements ProgressJobNode {
38 38

  
39 39
	private static final Log log = LogFactory.getLog(ApplyClaimRelsJobNode.class);
40 40

  
......
47 47
	private ClaimDatabaseUtils claimDatabaseUtils;
48 48

  
49 49
	private String sql;
50
	private String countQuery;
51
	private int total = 0;
52
	private int processed = 0;
50 53

  
51 54
	private String clusterName;
52 55

  
53 56
	private String tableName;
54 57

  
55
	private String fetchSqlAsText(final String path) throws IOException {
56
		return IOUtils.toString(getClass().getResourceAsStream(path));
57
	}
58
//	private String fetchSqlAsText(final String path) throws IOException {
59
//		return IOUtils.toString(getClass().getResourceAsStream(path));
60
//	}
58 61

  
59 62
	@Override
60 63
	protected String execute(NodeToken token) throws Exception {
61 64
		//TODO: use claim.claim_date from the claim db
62 65
		long timestamp = System.currentTimeMillis();
63

  
66
		total = this.claimDatabaseUtils.count(countQuery);
64 67
		List<Claim> claimRels = this.claimDatabaseUtils.query(sql);
65
		int totalClaims = 0;
66 68
		int totalWrites = 0;
69
		int valid = 0;
67 70
		int discardedClaims = 0;
68 71

  
69 72
		HadoopService hadoopService = serviceLocator.getService(HadoopService.class);
......
84 87
			final String value)
85 88
			 */
86 89
				hadoopService.addHBaseColumn(clusterName, tableName, sourceId, claim.getSemantics(), targetId, value);
87
				totalClaims++;
90
				processed++;
88 91
				totalWrites++;
89 92

  
90 93
				String inverseSemantics = OntologyLoader.fetchInverse(claim.getSemantics());
......
97 100
			}
98 101
		}
99 102

  
100
		log.info("totalClaims: " + totalClaims);
101
		token.getEnv().setAttribute("claimSize", totalClaims);
103
		log.info("totalClaims: " + total);
104
		token.getEnv().setAttribute("claimSize", total);
102 105
		log.info("writeOps: " + totalWrites);
103 106
		token.getEnv().setAttribute("writeOps", totalWrites);
107
		log.info("validClaims: " + valid);
108
		token.getEnv().setAttribute("validClaims", valid);
104 109
		log.info("discardedClaims: " + discardedClaims);
105 110
		token.getEnv().setAttribute("discardedClaims", discardedClaims);
106 111

  
......
217 222
	public void setSql(final String sql) {
218 223
		this.sql = sql;
219 224
	}
225

  
226
	public String getCountQuery() {
227
		return countQuery;
228
	}
229

  
230
	public void setCountQuery(final String countQuery) {
231
		this.countQuery = countQuery;
232
	}
233

  
234
	@Override
235
	public ProgressProvider getProgressProvider() {
236
		return new ProgressProvider() {
237

  
238
			@Override
239
			public boolean isInaccurate() {
240
				return false;
241
			}
242

  
243
			@Override
244
			public int getTotalValue() {
245
				return total;
246
			}
247

  
248
			@Override
249
			public int getCurrentValue() {
250
				return processed;
251
			}
252
		};
253
	}
220 254
}

Also available in: Unified diff