1 |
26600
|
sandro.lab
|
package eu.dnetlib.data.hadoop.utils;
|
2 |
|
|
|
3 |
|
|
import java.io.ByteArrayOutputStream;
|
4 |
|
|
import java.io.DataOutputStream;
|
5 |
|
|
import java.io.IOException;
|
6 |
|
|
import java.util.Map;
|
7 |
|
|
|
8 |
30959
|
claudio.at
|
import org.apache.commons.lang.StringUtils;
|
9 |
26600
|
sandro.lab
|
import org.apache.commons.logging.Log;
|
10 |
|
|
import org.apache.commons.logging.LogFactory;
|
11 |
|
|
import org.apache.hadoop.hbase.client.Scan;
|
12 |
|
|
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
13 |
|
|
import org.apache.hadoop.hbase.util.Base64;
|
14 |
|
|
import org.dom4j.Document;
|
15 |
|
|
import org.dom4j.Node;
|
16 |
|
|
|
17 |
|
|
public class ScanFactory {
|
18 |
|
|
|
19 |
|
|
private static final Log log = LogFactory.getLog(ScanFactory.class); // NOPMD by marko on 11/24/08 5:02 PM
|
20 |
|
|
|
21 |
30959
|
claudio.at
|
public static String getScan(final ScanProperties scanProperties) throws IOException {
|
22 |
26600
|
sandro.lab
|
Scan scan = new Scan();
|
23 |
|
|
|
24 |
30959
|
claudio.at
|
scan.setCaching(scanProperties.getCaching());
|
25 |
26600
|
sandro.lab
|
scan.setCacheBlocks(false); // don't set to true for MR jobs
|
26 |
|
|
|
27 |
|
|
scan.setFilter(scanProperties.getFilterList());
|
28 |
|
|
for (String family : scanProperties.getFamilies()) {
|
29 |
|
|
scan.addFamily(family.getBytes());
|
30 |
|
|
}
|
31 |
|
|
|
32 |
|
|
log.debug("serializing scan");
|
33 |
|
|
return convertScanToString(scan);
|
34 |
|
|
}
|
35 |
|
|
|
36 |
30959
|
claudio.at
|
public static ScanProperties parseScanProperties(final Document doc, final Map<String, String> bbParams) {
|
37 |
26600
|
sandro.lab
|
log.debug("setting job scanner");
|
38 |
|
|
|
39 |
|
|
ScanProperties scanProperties = new ScanProperties(doc.valueOf("//FILTERS/@operator"));
|
40 |
30959
|
claudio.at
|
|
41 |
|
|
String caching = doc.valueOf("//SCAN/@caching");
|
42 |
|
|
if (!StringUtils.isBlank(caching)) {
|
43 |
|
|
log.info("overriding default scan caching with: " + caching);
|
44 |
|
|
scanProperties.setCaching(Integer.valueOf(caching));
|
45 |
|
|
}
|
46 |
|
|
|
47 |
26600
|
sandro.lab
|
for (Object o : doc.selectNodes("//SCAN/FAMILIES/FAMILY")) {
|
48 |
|
|
Node node = (Node) o;
|
49 |
|
|
String value = node.valueOf("./@value");
|
50 |
30959
|
claudio.at
|
if ((value == null) || value.isEmpty()) {
|
51 |
26600
|
sandro.lab
|
value = bbParams.get(node.valueOf("./@param"));
|
52 |
|
|
}
|
53 |
|
|
log.debug("scanner family value: " + value);
|
54 |
|
|
scanProperties.getFamilies().add(value);
|
55 |
|
|
}
|
56 |
|
|
for (Object o : doc.selectNodes("//SCAN/FILTERS/FILTER")) {
|
57 |
|
|
Node node = (Node) o;
|
58 |
|
|
String filterType = node.valueOf("./@type");
|
59 |
|
|
|
60 |
|
|
String value = node.valueOf("./@value");
|
61 |
30959
|
claudio.at
|
if ((value == null) || value.isEmpty()) {
|
62 |
26600
|
sandro.lab
|
value = bbParams.get(node.valueOf("./@param"));
|
63 |
|
|
}
|
64 |
|
|
|
65 |
|
|
if (filterType.equals("prefix")) {
|
66 |
|
|
log.debug("scanner prefix filter, value: " + value);
|
67 |
|
|
scanProperties.getFilterList().addFilter(new PrefixFilter(value.getBytes()));
|
68 |
|
|
} // TODO add more filterType cases here
|
69 |
|
|
}
|
70 |
|
|
return scanProperties;
|
71 |
|
|
}
|
72 |
|
|
|
73 |
|
|
/**
|
74 |
|
|
* Writes the given scan into a Base64 encoded string.
|
75 |
30959
|
claudio.at
|
*
|
76 |
26600
|
sandro.lab
|
* @param scan
|
77 |
|
|
* The scan to write out.
|
78 |
|
|
* @return The scan saved in a Base64 encoded string.
|
79 |
|
|
* @throws IOException
|
80 |
|
|
* When writing the scan fails.
|
81 |
|
|
*/
|
82 |
32295
|
claudio.at
|
private static String convertScanToString(final Scan scan) throws IOException {
|
83 |
26600
|
sandro.lab
|
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
84 |
|
|
DataOutputStream dos = new DataOutputStream(out);
|
85 |
|
|
scan.write(dos);
|
86 |
|
|
return Base64.encodeBytes(out.toByteArray());
|
87 |
|
|
}
|
88 |
|
|
|
89 |
|
|
}
|