Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.reset;
2

    
3
import java.io.IOException;
4

    
5
import org.apache.hadoop.hbase.KeyValue;
6
import org.apache.hadoop.hbase.client.Delete;
7
import org.apache.hadoop.hbase.client.Mutation;
8
import org.apache.hadoop.hbase.client.Put;
9
import org.apache.hadoop.hbase.client.Result;
10
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
11
import org.apache.hadoop.hbase.mapreduce.TableMapper;
12
import org.apache.hadoop.hbase.util.Bytes;
13

    
14
import com.google.common.collect.Lists;
15
import com.google.protobuf.GeneratedMessage;
16
import com.google.protobuf.InvalidProtocolBufferException;
17

    
18
import eu.dnetlib.data.mapreduce.hbase.HBaseTableUtils.VolatileColumnFamily;
19
import eu.dnetlib.data.mapreduce.util.DedupUtils;
20
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
21
import eu.dnetlib.data.proto.OafProtos.Oaf;
22
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
23

    
24
public class HBaseResetMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
25

    
26
	private static final boolean WRITE_TO_WAL = false;
27

    
28
	@Override
29
	protected void setup(final Context context) throws IOException, InterruptedException {
30
		super.setup(context);
31

    
32
		String table = null;
33
		for (String s : Lists.newArrayList("hbase.mapreduce.inputtable", "hbase.mapred.inputtable")) {
34
			try {
35
				table = context.getConfiguration().get(s).trim();
36
			} catch (NullPointerException e) {}
37
		}
38
		if (table == null) throw new IllegalStateException("unable to find table name");
39
		System.out.println("I start to reset table '" + table + "'");
40
	}
41

    
42
	@Override
43
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
44
		// System.out.println("EVALUATING " + Bytes.toString(key.get()));
45
		byte[] bKey = key.copyBytes();
46

    
47
		if (DedupUtils.isRoot(new String(bKey))) {
48
			deleteRow(key, context);
49
			return;
50
		}
51

    
52
		for (KeyValue kv : value.list()) {
53
			if (Bytes.toString(kv.getQualifier()).equals(DedupUtils.BODY_S) || isColumn(kv.getFamily())) {
54
				Oaf oaf = Oaf.parseFrom(kv.getValue());
55
				if (isColumnToUpdate(oaf)) {
56
					updateColumn(key, kv, resetOaf(oaf), context);
57
				}
58
			} else if (isColumnToDelete(kv)) {
59
				deleteColumn(key, kv, context);
60
			}
61
		}
62
	}
63

    
64
	private boolean isColumn(final byte[] family) {
65
		try {
66
			String s = Bytes.toString(family);
67
			return !s.contains(SubRelType.dedup.toString()) && !s.contains(SubRelType.dedupSimilarity.toString()) && !VolatileColumnFamily.isVolatile(s)
68
					&& (DedupUtils.decodeCF(family) != null);
69
		} catch (IllegalArgumentException e) {
70
			return false;
71
		}
72
	}
73

    
74
	private boolean isColumnToUpdate(final Oaf oaf) {
75
		return oaf.getDataInfo().hasDeletedbyinference() && oaf.getDataInfo().getDeletedbyinference();
76
	}
77

    
78
	private Oaf resetOaf(final Oaf oaf) {
79
		// I reset deletedbyinference
80
		DataInfo.Builder dataInfoBuilder = DataInfo.newBuilder(oaf.getDataInfo());
81
		dataInfoBuilder.setDeletedbyinference(false).setInferenceprovenance("");
82
		Oaf.Builder oafBuilder = Oaf.newBuilder(oaf);
83
		oafBuilder.setDataInfo(dataInfoBuilder);
84
		return oafBuilder.build();
85
	}
86

    
87
	private boolean isColumnToDelete(final KeyValue kv) throws InvalidProtocolBufferException {
88
		// Columns in "dedupRel" and "similarRel" column families and with the attribute inferred=true must be deleted
89
		final String cf = Bytes.toString(kv.getFamily());
90

    
91
		if (cf.contains(SubRelType.dedup.toString()) || cf.contains(SubRelType.dedupSimilarity.toString()) || VolatileColumnFamily.isVolatile(cf)) return true;
92

    
93
		final Oaf oaf = Oaf.parseFrom(kv.getValue());
94
		return oaf.getDataInfo().hasInferred() && oaf.getDataInfo().getInferred();
95
	}
96

    
97
	private void updateColumn(final ImmutableBytesWritable key, final KeyValue col, final GeneratedMessage newValue, final Context context) throws IOException,
98
			InterruptedException {
99
		byte[] bKey = key.copyBytes();
100
		Put put = new Put(bKey);
101
		put.add(col.getFamily(), col.getQualifier(), newValue.toByteArray());
102
		put.setWriteToWAL(WRITE_TO_WAL);
103
		context.write(key, put);
104
		context.getCounter("reset job", "update body").increment(1);
105
		// System.out.println("   --- UPDATED ROW key=" + Bytes.toString(bKey));
106
	}
107

    
108
	private void deleteRow(final ImmutableBytesWritable key, final Context context) throws IOException, InterruptedException {
109
		byte[] bKey = key.copyBytes();
110
		Delete d = new Delete(bKey);
111
		d.setWriteToWAL(WRITE_TO_WAL);
112
		context.write(key, d);
113
		context.getCounter("reset job", "delete row").increment(1);
114
		// System.out.println("   --- DELETED ROW key=" + Bytes.toString(bKey));
115
	}
116

    
117
	private void deleteColumn(final ImmutableBytesWritable key, final KeyValue col, final Context context) throws IOException, InterruptedException {
118
		byte[] bKey = key.copyBytes();
119
		Delete d = new Delete(bKey);
120
		d.deleteColumns(col.getFamily(), col.getQualifier());
121
		d.setWriteToWAL(WRITE_TO_WAL);
122
		context.write(key, d);
123
		context.getCounter("reset job", "delete column").increment(1);
124
		// System.out.println("   --- DELETED COLUMN key=" + Bytes.toString(bKey) + " cf=" + Bytes.toString(col.getFamily()) + ", q="
125
		// + Bytes.toString(col.getQualifier()));
126
	}
127
}
(2-2/2)