Project

General

Profile

1
package eu.dnetlib.data.hadoop.utils;
2

    
3
import java.io.ByteArrayOutputStream;
4
import java.io.DataOutputStream;
5
import java.io.IOException;
6
import java.util.Map;
7

    
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.hbase.client.Scan;
12
import org.apache.hadoop.hbase.filter.PrefixFilter;
13
import org.apache.hadoop.hbase.util.Base64;
14
import org.dom4j.Document;
15
import org.dom4j.Node;
16

    
17
public class ScanFactory {
18

    
19
	private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
20

    
21
	public static String getScan(final ScanProperties scanProperties) throws IOException {
22
		Scan scan = new Scan();
23

    
24
		scan.setCaching(scanProperties.getCaching());
25
		scan.setCacheBlocks(false); // don't set to true for MR jobs
26

    
27
		scan.setFilter(scanProperties.getFilterList());
28
		for (String family : scanProperties.getFamilies()) {
29
			scan.addFamily(family.getBytes());
30
		}
31

    
32
		log.debug("serializing scan");
33
		return convertScanToString(scan);
34
	}
35

    
36
	public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
37
		log.debug("setting job scanner");
38

    
39
		ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));
40

    
41
		String caching = doc.valueOf("//SCAN/@caching");
42
		if (!StringUtils.isBlank(caching)) {
43
			log.info("overriding default scan caching with: " + caching);
44
			scanProperties.setCaching(Integer.valueOf(caching));
45
		}
46

    
47
		for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
48
			Node node = (Node) o;
49
			String value = node.valueOf("./@value");
50
			if ((value == null) || value.isEmpty()) {
51
				value = bbParams.get(node.valueOf("./@param"));
52
			}
53
			log.debug("scanner family value: " + value);
54
			scanProperties.getFamilies().add(value);
55
		}
56
		for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
57
			Node node = (Node) o;
58
			String filterType = node.valueOf("./@type");
59

    
60
			String value = node.valueOf("./@value");
61
			if ((value == null) || value.isEmpty()) {
62
				value = bbParams.get(node.valueOf("./@param"));
63
			}
64

    
65
			if (filterType.equals("prefix")) {
66
				log.debug("scanner prefix filter, value: " + value);
67
				scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
68
			} // TODO add more filterType cases here
69
		}
70
		return scanProperties;
71
	}
72

    
73
	/**
74
	 * Writes the given scan into a Base64 encoded string.
75
	 *
76
	 * @param scan
77
	 *            The scan to write out.
78
	 * @return The scan saved in a Base64 encoded string.
79
	 * @throws IOException
80
	 *             When writing the scan fails.
81
	 */
82
	private static String convertScanToString(final Scan scan) throws IOException {
83
		ByteArrayOutputStream out = new ByteArrayOutputStream();
84
		DataOutputStream dos = new DataOutputStream(out);
85
		scan.write(dos);
86
		return Base64.encodeBytes(out.toByteArray());
87
	}
88

    
89
}
(3-3/4)