Project

General

Profile

1
package eu.dnetlib.msro.workflows.hadoop.hbase;
2

    
3
import java.util.Map.Entry;
4
import java.util.Set;
5

    
6
import javax.annotation.Resource;
7

    
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11

    
12
import com.google.common.base.Joiner;
13
import com.google.common.base.Splitter;
14
import com.google.common.collect.Sets;
15
import com.googlecode.sarasvati.NodeToken;
16

    
17
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
18
import eu.dnetlib.msro.rmi.MSROException;
19
import eu.dnetlib.msro.workflows.nodes.SimpleJobNode;
20

    
21
public abstract class AbstractHBaseAdminJobNode extends SimpleJobNode {
22

    
23
	/** The Constant log. */
24
	private static final Log log = LogFactory.getLog(AbstractHBaseAdminJobNode.class); // NOPMD by marko on 11/24/08 5:02 PM
25

    
26
	private String tableColumnsParamName = "columns";
27
	private String hbaseTableProperty;
28
	private String cluster;
29

    
30
	@Resource
31
	private UniqueServiceLocator serviceLocator;
32

    
33
	@Override
34
	protected void beforeStart(final NodeToken token) {
35
		for (Entry<String, String> e : parseJsonParameters(token).entrySet()) {
36
			token.getEnv().setAttribute(e.getKey(), e.getValue());
37
		}
38
	}
39

    
40
	protected String tableName(final NodeToken token) {
41
		if (token.getEnv().hasAttribute("hbaseTable")) {
42
			String table = token.getEnv().getAttribute("hbaseTable");
43
			log.debug("found override value in wfEnv for 'hbaseTable' param: " + table);
44
			return table;
45
		}
46
		return getPropertyFetcher().getProperty(getHbaseTableProperty());
47
	}
48

    
49
	protected String cluster(final NodeToken token) {
50
		if (token.getEnv().hasAttribute("cluster")) {
51
			String cluster = token.getEnv().getAttribute("cluster");
52
			log.debug("found override value in wfEnv for 'cluster' param: " + cluster);
53
			return cluster;
54
		}
55
		return getCluster();
56
	}
57

    
58
	protected Set<String> getColumns(final NodeToken token) throws MSROException {
59
		String envCols = token.getEnv().getAttribute(getTableColumnsParamName());
60
		if (StringUtils.isBlank(envCols)) { throw new MSROException("cannot find table description"); }
61
		log.debug("using columns from env: " + envCols);
62
		return Sets.newHashSet(Splitter.on(",").omitEmptyStrings().split(envCols));
63
	}
64

    
65
	protected String asCSV(final Iterable<String> columns) {
66
		return Joiner.on(",").skipNulls().join(columns);
67
	}
68

    
69
	public String getCluster() {
70
		return cluster;
71
	}
72

    
73
	public void setCluster(final String cluster) {
74
		this.cluster = cluster;
75
	}
76

    
77
	public String getHbaseTableProperty() {
78
		return hbaseTableProperty;
79
	}
80

    
81
	public void setHbaseTableProperty(final String hbaseTableProperty) {
82
		this.hbaseTableProperty = hbaseTableProperty;
83
	}
84

    
85
	public String getTableColumnsParamName() {
86
		return tableColumnsParamName;
87
	}
88

    
89
	public void setTableColumnsParamName(final String tableColumnsParamName) {
90
		this.tableColumnsParamName = tableColumnsParamName;
91
	}
92

    
93
	public UniqueServiceLocator getServiceLocator() {
94
		return serviceLocator;
95
	}
96

    
97
}
(1-1/8)