1
|
package eu.dnetlib.data.hadoop.utils;
|
2
|
|
3
|
import java.io.ByteArrayOutputStream;
|
4
|
import java.io.DataOutputStream;
|
5
|
import java.io.IOException;
|
6
|
import java.util.Map;
|
7
|
|
8
|
import org.apache.commons.lang3.StringUtils;
|
9
|
import org.apache.commons.logging.Log;
|
10
|
import org.apache.commons.logging.LogFactory;
|
11
|
import org.apache.hadoop.hbase.client.Scan;
|
12
|
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
13
|
import org.apache.hadoop.hbase.util.Base64;
|
14
|
import org.dom4j.Document;
|
15
|
import org.dom4j.Node;
|
16
|
|
17
|
public class ScanFactory {
|
18
|
|
19
|
private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
|
20
|
|
21
|
public static String getScan(final ScanProperties scanProperties) throws IOException {
|
22
|
Scan scan = new Scan();
|
23
|
|
24
|
scan.setCaching(scanProperties.getCaching());
|
25
|
scan.setCacheBlocks(false); // don't set to true for MR jobs
|
26
|
|
27
|
scan.setFilter(scanProperties.getFilterList());
|
28
|
for (String family : scanProperties.getFamilies()) {
|
29
|
scan.addFamily(family.getBytes());
|
30
|
}
|
31
|
|
32
|
log.debug("serializing scan");
|
33
|
return convertScanToString(scan);
|
34
|
}
|
35
|
|
36
|
public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
|
37
|
log.debug("setting job scanner");
|
38
|
|
39
|
ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));
|
40
|
|
41
|
String caching = doc.valueOf("//SCAN/@caching");
|
42
|
if (!StringUtils.isBlank(caching)) {
|
43
|
log.info("overriding default scan caching with: " + caching);
|
44
|
scanProperties.setCaching(Integer.valueOf(caching));
|
45
|
}
|
46
|
|
47
|
for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
|
48
|
Node node = (Node) o;
|
49
|
String value = node.valueOf("./@value");
|
50
|
if ((value == null) || value.isEmpty()) {
|
51
|
value = bbParams.get(node.valueOf("./@param"));
|
52
|
}
|
53
|
log.debug("scanner family value: " + value);
|
54
|
scanProperties.getFamilies().add(value);
|
55
|
}
|
56
|
for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
|
57
|
Node node = (Node) o;
|
58
|
String filterType = node.valueOf("./@type");
|
59
|
|
60
|
String value = node.valueOf("./@value");
|
61
|
if ((value == null) || value.isEmpty()) {
|
62
|
value = bbParams.get(node.valueOf("./@param"));
|
63
|
}
|
64
|
|
65
|
if (filterType.equals("prefix")) {
|
66
|
log.debug("scanner prefix filter, value: " + value);
|
67
|
scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
|
68
|
} // TODO add more filterType cases here
|
69
|
}
|
70
|
return scanProperties;
|
71
|
}
|
72
|
|
73
|
/**
|
74
|
* Writes the given scan into a Base64 encoded string.
|
75
|
*
|
76
|
* @param scan
|
77
|
* The scan to write out.
|
78
|
* @return The scan saved in a Base64 encoded string.
|
79
|
* @throws IOException
|
80
|
* When writing the scan fails.
|
81
|
*/
|
82
|
private static String convertScanToString(final Scan scan) throws IOException {
|
83
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
84
|
DataOutputStream dos = new DataOutputStream(out);
|
85
|
scan.write(dos);
|
86
|
return Base64.encodeBytes(out.toByteArray());
|
87
|
}
|
88
|
|
89
|
}
|