package eu.dnetlib.data.mapreduce.hbase.dedup;

import java.io.IOException;
import java.util.List;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
import eu.dnetlib.data.proto.DNGFProtos.DNGF;
import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.TypeProtos.Type;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import static eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO.getMetadata;

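/**
 * Table mapper that marks the body of entities having an "isMergedIn" relation
 * (i.e. entities absorbed by a dedup root) as deleted by inference.
 */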
public class DedupMarkDeletedEntityMapper extends TableMapper<ImmutableBytesWritable, Put> {

	private static final Log log = LogFactory.getLog(DedupMarkDeletedEntityMapper.class);
	private static final String DEFAULT_PROVENANCE = "dnet";

	private DedupConfig dedupConf;

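	/**
	 * Loads the dedup configuration passed through the job configuration; executed once per mapper instance.
	 */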
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
		log.info("dedup mark-deleted entity mapper\nwf conf: " + dedupConf.toString());
	}

	@Override
	protected void map(final ImmutableBytesWritable rowkey, final Result value, final Context context) throws IOException, InterruptedException {
		// rows with at least one "isMergedIn" relation have been absorbed by a dedup root:
		// their original body must be marked as deleted by inference

		final Type type = Type.valueOf(dedupConf.getWf().getEntityType());
		final List<String> mergedIn = HBaseTableDAO.getTargetIds(value, "isMergedIn");

		if ((mergedIn != null) && !mergedIn.isEmpty()) {

			final byte[] row = rowkey.copyBytes();

			// marks the original body deleted
			emitBody(context, row, getMetadata(value, type));

		} else {
			context.getCounter(type.toString(), "row not merged").increment(1);
		}
	}

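	/**
	 * Rewrites the entity body with deletedbyinference=true (and the dedup configuration id as inference provenance),
	 * skipping rows whose body is missing or already marked as deleted.
	 */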
	private void emitBody(final Context context, final byte[] row, final DNGF prototype) throws IOException, InterruptedException {
		final String type = dedupConf.getWf().getEntityType();
		if (prototype == null) {
			context.getCounter(type, "missing body").increment(1);
			log.warn("missing body: " + new String(row));
			return;
		}

		if (prototype.getDataInfo().getDeletedbyinference()) {
			context.getCounter(type, "bodies already deleted").increment(1);
		} else {
			final DNGF.Builder oafRoot = DNGF.newBuilder(prototype);
			oafRoot.getDataInfoBuilder()
					.setDeletedbyinference(true)
					.setInferred(true)
					.setInferenceprovenance(dedupConf.getWf().getConfigurationId());

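			// if the inherited dataInfo carries no provenance action, fall back to the default "dnet" qualifier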
			if (!oafRoot.getDataInfoBuilder().hasProvenanceaction()) {
				context.getCounter(type, "missing provenance, setting " + DEFAULT_PROVENANCE).increment(1);
				oafRoot.getDataInfoBuilder()
						.setProvenanceaction(FieldTypeProtos.Qualifier.newBuilder()
								.setClassname(DEFAULT_PROVENANCE).setClassid(DEFAULT_PROVENANCE).setSchemename(DEFAULT_PROVENANCE).setSchemeid(DEFAULT_PROVENANCE));
			}

			context.write(new ImmutableBytesWritable(row), HBaseTableDAO.asPut(oafRoot.build()));
			context.getCounter(type, "bodies marked deleted").increment(1);
		}
	}

}