Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.reset;
2

    
3
import java.io.IOException;
4

    
5
import eu.dnetlib.data.mapreduce.JobParams;
6
import org.apache.hadoop.hbase.KeyValue;
7
import org.apache.hadoop.hbase.client.Delete;
8
import org.apache.hadoop.hbase.client.Mutation;
9
import org.apache.hadoop.hbase.client.Put;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
14

    
15
import com.google.common.collect.Lists;
16
import com.google.protobuf.GeneratedMessage;
17
import com.google.protobuf.InvalidProtocolBufferException;
18

    
19
import eu.dnetlib.data.mapreduce.hbase.VolatileColumnFamily;
20
import eu.dnetlib.data.mapreduce.util.DedupUtils;
21
import eu.dnetlib.data.proto.FieldTypeProtos.DataInfo;
22
import eu.dnetlib.data.proto.OafProtos.Oaf;
23
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
24

    
25
public class HBaseResetMapper extends TableMapper<ImmutableBytesWritable, Mutation> {
26

    
27
	@Override
28
	protected void setup(final Context context) throws IOException, InterruptedException {
29
		super.setup(context);
30

    
31
		String table = null;
32
		for (final String s : Lists.newArrayList("hbase.mapreduce.inputtable", "hbase.mapred.inputtable")) {
33
			try {
34
				table = context.getConfiguration().get(s).trim();
35
			} catch (final NullPointerException e) {}
36
		}
37
		if (table == null) throw new IllegalStateException("unable to find table name");
38
		System.out.println("I start to reset table '" + table + "'");
39
	}
40

    
41
	@Override
42
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
43
		// System.out.println("EVALUATING " + Bytes.toString(key.get()));
44
		final byte[] bKey = key.copyBytes();
45

    
46
		if (DedupUtils.isRoot(new String(bKey))) {
47
			deleteRow(key, context);
48
			return;
49
		}
50

    
51
		for (final KeyValue kv : value.list()) {
52
			if (Bytes.toString(kv.getQualifier()).equals(DedupUtils.BODY_S) || isColumn(kv.getFamily())) {
53
				final Oaf oaf = Oaf.parseFrom(kv.getValue());
54
				if (isColumnToUpdate(oaf)) {
55
					updateColumn(key, kv, resetOaf(oaf), context);
56
				}
57
			} else if (isColumnToDelete(kv)) {
58
				deleteColumn(key, kv, context);
59
			}
60
		}
61
	}
62

    
63
	private boolean isColumn(final byte[] family) {
64
		try {
65
			final String s = Bytes.toString(family);
66
			return !s.contains(SubRelType.dedup.toString()) && !s.contains(SubRelType.dedupSimilarity.toString()) && !VolatileColumnFamily.isVolatile(s)
67
					&& (DedupUtils.decodeCF(family) != null);
68
		} catch (final IllegalArgumentException e) {
69
			return false;
70
		}
71
	}
72

    
73
	private boolean isColumnToUpdate(final Oaf oaf) {
74
		return oaf.getDataInfo().hasDeletedbyinference() && oaf.getDataInfo().getDeletedbyinference();
75
	}
76

    
77
	private Oaf resetOaf(final Oaf oaf) {
78
		// I reset deletedbyinference
79
		final DataInfo.Builder dataInfoBuilder = DataInfo.newBuilder(oaf.getDataInfo());
80
		dataInfoBuilder.setDeletedbyinference(false).setInferenceprovenance("");
81
		final Oaf.Builder oafBuilder = Oaf.newBuilder(oaf);
82
		oafBuilder.setDataInfo(dataInfoBuilder);
83
		return oafBuilder.build();
84
	}
85

    
86
	private boolean isColumnToDelete(final KeyValue kv) throws InvalidProtocolBufferException {
87
		// Columns in "dedupRel" and "similarRel" column families and with the attribute inferred=true must be deleted
88
		final String cf = Bytes.toString(kv.getFamily());
89

    
90
		if (cf.contains(SubRelType.dedup.toString()) || cf.contains(SubRelType.dedupSimilarity.toString()) || VolatileColumnFamily.isVolatile(cf)) return true;
91

    
92
		final Oaf oaf = Oaf.parseFrom(kv.getValue());
93
		return oaf.getDataInfo().hasInferred() && oaf.getDataInfo().getInferred();
94
	}
95

    
96
	private void updateColumn(final ImmutableBytesWritable key, final KeyValue col, final GeneratedMessage newValue, final Context context) throws IOException,
97
	InterruptedException {
98
		final byte[] bKey = key.copyBytes();
99
		final Put put = new Put(bKey);
100
		put.add(col.getFamily(), col.getQualifier(), newValue.toByteArray());
101
		put.setWriteToWAL(JobParams.WRITE_TO_WAL);
102
		context.write(key, put);
103
		context.getCounter("reset job", "update body").increment(1);
104
		// System.out.println("   --- UPDATED ROW key=" + Bytes.toString(bKey));
105
	}
106

    
107
	private void deleteRow(final ImmutableBytesWritable key, final Context context) throws IOException, InterruptedException {
108
		final byte[] bKey = key.copyBytes();
109
		final Delete d = new Delete(bKey);
110
		d.setWriteToWAL(JobParams.WRITE_TO_WAL);
111
		context.write(key, d);
112
		context.getCounter("reset job", "delete row").increment(1);
113
		// System.out.println("   --- DELETED ROW key=" + Bytes.toString(bKey));
114
	}
115

    
116
	private void deleteColumn(final ImmutableBytesWritable key, final KeyValue col, final Context context) throws IOException, InterruptedException {
117
		final byte[] bKey = key.copyBytes();
118
		final Delete d = new Delete(bKey);
119
		d.deleteColumns(col.getFamily(), col.getQualifier());
120
		d.setWriteToWAL(JobParams.WRITE_TO_WAL);
121
		context.write(key, d);
122
		context.getCounter("reset job", "delete column").increment(1);
123
		// System.out.println("   --- DELETED COLUMN key=" + Bytes.toString(bKey) + " cf=" + Bytes.toString(col.getFamily()) + ", q="
124
		// + Bytes.toString(col.getQualifier()));
125
	}
126
}
    (1-1/1)