Project

General

Profile

1
package eu.dnetlib.msro.openaireplus.workflows.nodes;
2

    
3
import com.googlecode.sarasvati.Arc;
4
import com.googlecode.sarasvati.NodeToken;
5
import eu.dnetlib.msro.rmi.MSROException;
6
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
7
import eu.dnetlib.msro.workflows.util.WorkflowsConstants;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10

    
11
public class CheckHDFSCountJobNode extends SimpleJobNode {
12

    
13
	private static final Log log = LogFactory.getLog(CheckHDFSCountJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
14

    
15
	/**
16
	 * Name of the env param where to find the number to check.
17
	 **/
18
	private String numberToVerifyParamName;
19
	/**
20
	 * Name of the enc param where to find the number of records stored to hdfs.
21
	 **/
22
	private String hdfsCounterParamName = "mainlog:storeHdfsRecords:count";
23

    
24
	/**
25
	 * {@inheritDoc}
26
	 *
27
	 * @throws MSROException
28
	 * @see com.googlecode.sarasvati.mem.MemNode#execute(com.googlecode.sarasvati.Engine, com.googlecode.sarasvati.NodeToken)
29
	 */
30
	@Override
31
	public String execute(final NodeToken token) throws MSROException {
32
		int hdfsCount = 0;
33
		int hbaseCount = 0;
34
		String numberToVerifyInEnvParam = WorkflowsConstants.BLACKBOARD_PARAM_PREFIX + numberToVerifyParamName;
35
		hbaseCount = getValueFromEnv(token, numberToVerifyInEnvParam);
36
		hdfsCount = getValueFromEnv(token, hdfsCounterParamName);
37

    
38
		if (hdfsCount != hbaseCount) {
39
			log.warn("Numbers are not the same. Number to verify (hbase count): " + hbaseCount + ". From hdfs: " + hdfsCount);
40
			log.warn((hdfsCount - hbaseCount) + " records come without a 'metadata' field");
41
			token.getEnv().setAttribute("noMetadataRecords", hdfsCount - hbaseCount);
42
		}
43
		return Arc.DEFAULT_ARC;
44

    
45
	}
46

    
47
	private int getValueFromEnv(final NodeToken token, final String paramName) throws MSROException {
48
		String valueString = token.getEnv().getAttribute(paramName);
49
		int number = 0;
50
		try {
51
			number = Integer.parseInt(valueString);
52
		} catch (NumberFormatException e) {
53
			log.error("Env param name: " + paramName + " is " + valueString + " and cannot be parsed as integer");
54
			throw new MSROException(e);
55
		}
56
		return number;
57
	}
58

    
59
	public String getNumberToVerifyParamName() {
60
		return numberToVerifyParamName;
61
	}
62

    
63
	public void setNumberToVerifyParamName(final String numberToVerifyParamName) {
64
		this.numberToVerifyParamName = numberToVerifyParamName;
65
	}
66

    
67
	public String getHdfsCounterParamName() {
68
		return hdfsCounterParamName;
69
	}
70

    
71
	public void setHdfsCounterParamName(final String hdfsCounterParamName) {
72
		this.hdfsCounterParamName = hdfsCounterParamName;
73
	}
74

    
75
}
(1-1/24)