Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dataimport;
2

    
3
import java.io.IOException;
4
import java.util.Set;
5

    
6
import org.apache.hadoop.hbase.KeyValue;
7
import org.apache.hadoop.hbase.client.Delete;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12

    
13
import com.google.common.base.Splitter;
14
import com.google.common.collect.Sets;
15

    
16
import eu.dnetlib.data.mapreduce.JobParams;
17

    
18
public class DeleteRecordsMapper extends TableMapper<ImmutableBytesWritable, Delete> {
19

    
20
	private Set<String> delPrefix;
21

    
22
	@Override
23
	protected void setup(Context context) throws IOException, InterruptedException {
24
		super.setup(context);
25
		delPrefix = extractPrefixes(context.getConfiguration().get("nsPrefix"));
26

    
27
		String table = context.getConfiguration().get(JobParams.HBASE_SOURCE_TABLE).trim();
28

    
29
		System.out.println("I start to delete records (table=" + table + ", prefixes=" + delPrefix);
30
	}
31

    
32
	@Override
33
	protected void map(ImmutableBytesWritable key, Result value, Context context) throws IOException, InterruptedException {
34
		byte[] bkey = key.get();
35

    
36
		System.out.println("EVALUATING " + Bytes.toString(bkey));
37

    
38
		if (testValue(bkey)) { // If key contains repoId I delete the row
39
			context.write(key, new Delete(bkey));
40
			System.out.println("   --- DELETED");
41
			context.getCounter("MDStore Cleaner", "Deleted records").increment(1);
42
		} else { // otherwise I try to delete the columns containing repoId 
43
			for (KeyValue kv : value.list()) {
44
				byte[] col = kv.getQualifier();
45
				if (testValue(col)) {
46
					Delete d = new Delete(bkey);
47
					d.deleteColumns(kv.getFamily(), col);
48
					context.write(key, d);
49
					System.out.println("   --- DELETED COLUMN: " + Bytes.toString(col));
50
					context.getCounter("MDStore Cleaner", "Deleted columns").increment(1);
51
				}
52
			}
53
		}
54
	}
55

    
56
	private Set<String> extractPrefixes(String s) {
57
		return Sets.newHashSet(Splitter.on(",").trimResults().omitEmptyStrings().split(s));
58
	}
59

    
60
	private boolean testValue(byte[] bkey) {
61
		String s = Bytes.toString(bkey);
62

    
63
		if (s.contains("|") && s.contains("::")) {
64
			return delPrefix.contains(s.substring(s.indexOf("|") + 1, s.indexOf("::")));
65
		} else if (s.contains("::")) {
66
			return delPrefix.contains(s.substring(0, s.indexOf("::")));
67
		} else {
68
			return false;
69
		}
70

    
71
	}
72
}
(4-4/15)