Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.actions2;
2

    
3
import java.io.IOException;
4
import java.nio.ByteBuffer;
5
import java.util.Set;
6

    
7
import org.apache.hadoop.hbase.client.Delete;
8
import org.apache.hadoop.hbase.client.Result;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.hbase.mapreduce.TableMapper;
11
import org.apache.hadoop.hbase.util.Bytes;
12

    
13
import com.google.common.base.Splitter;
14
import com.google.common.collect.Iterables;
15
import com.google.common.collect.Sets;
16

    
17
import eu.dnetlib.miscutils.datetime.DateUtils;
18

    
19
public class GarbageActionsMapper extends TableMapper<ImmutableBytesWritable, Delete> {
20

    
21
	private static final String LATEST_RAW_SETS = "latestRawSets";
22

    
23
	private static final String GARBAGE_TIME_MARING = "garbageTimeMargin";
24

    
25
	private final Set<ByteBuffer> latestRawSets = Sets.newHashSet();
26

    
27
	private long dateLimit;
28

    
29
	private final long MAX_DATE_INTERVAL = 4 * 24 * 60 * 60 * 1000; // 4 days default value
30

    
31
	@Override
32
	protected void setup(final Context context) throws IOException, InterruptedException {
33

    
34
		long garbageTimeMargin = MAX_DATE_INTERVAL;
35
		try {
36
			garbageTimeMargin = Long.parseLong(context.getConfiguration().get(GARBAGE_TIME_MARING));
37
		} catch (NumberFormatException e) {}
38
		this.dateLimit = DateUtils.now() - garbageTimeMargin;
39
		System.out.println("dateLimit: " + dateLimit);
40

    
41
		final String s = context.getConfiguration().get(LATEST_RAW_SETS);
42
		if (s != null) {
43
			for (String set : Sets.newHashSet(Splitter.on(",").omitEmptyStrings().trimResults().split(s))) {
44
				latestRawSets.add(ByteBuffer.wrap(Bytes.toBytes(set)));
45
			}
46
		}
47

    
48
		if (latestRawSets.isEmpty()) { throw new IOException("Input parameter (" + LATEST_RAW_SETS + ") is missing or empty: " + s); }
49
	}
50

    
51
	@Override
52
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
53
		if (isInARecentRawSet(value)) { return; }
54

    
55
		context.getCounter("Actions", "N. Deletes").increment(1);
56
		context.write(key, new Delete(key.copyBytes()));
57
	}
58

    
59
	private boolean isInARecentRawSet(final Result value) {
60
		for (byte[] s1 : value.getFamilyMap(Bytes.toBytes("set")).keySet()) {
61
			if (isRecentRawSet(s1)) { return true; }
62
		}
63
		return false;
64
	}
65

    
66
	private boolean isRecentRawSet(final byte[] rawSet) {
67
		final String date = Iterables.getLast(Splitter.on("_").split(Bytes.toString(rawSet)));
68

    
69
		if (Long.parseLong(date) > this.dateLimit || latestRawSets.contains(ByteBuffer.wrap(rawSet))) { return true; }
70

    
71
		return false;
72
	}
73
}
(1-1/3)