Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.Map;
6

    
7
import org.apache.commons.collections.MapUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
14

    
15
import com.google.common.collect.Iterables;
16

    
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.util.DedupUtils;
19
import eu.dnetlib.data.mapreduce.util.OafDecoder;
20
import eu.dnetlib.pace.config.DedupConfig;
21

    
22
public class DedupRootsToCsvMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
23

    
24
	/**
25
	 * logger.
26
	 */
27
	private static final Log log = LogFactory.getLog(DedupRootsToCsvMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
28

    
29
	private DedupConfig dedupConf;
30

    
31
	private ImmutableBytesWritable key;
32

    
33
	private ImmutableBytesWritable value;
34

    
35
	@Override
36
	protected void setup(final Context context) {
37
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
38
		System.out.println("dedup buildRoots mapper\nwf conf: " + dedupConf.toString());
39
		key = new ImmutableBytesWritable();
40
		value = new ImmutableBytesWritable();
41
	}
42

    
43
	@Override
44
	protected void map(final ImmutableBytesWritable rowkey, final Result result, final Context context) throws IOException, InterruptedException {
45

    
46
		if (DedupUtils.isRoot(rowkey)) {
47
			context.getCounter(dedupConf.getWf().getEntityType(), "root row skipped").increment(1);
48
			return;
49
		}
50

    
51
		final Map<byte[], byte[]> entityCf = result.getFamilyMap(Bytes.toBytes(dedupConf.getWf().getEntityType()));
52
		if (MapUtils.isEmpty(entityCf) && (entityCf.get(DedupUtils.BODY_B) == null)) {
53
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
54
			return;
55
		}
56

    
57
		final Map<byte[], byte[]> mergedIn = result.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(dedupConf.getWf().getEntityType()));
58
		if (MapUtils.isEmpty(mergedIn)) {
59
			context.getCounter(dedupConf.getWf().getEntityType(), "missing mergedIn relationship").increment(1);
60
			return;
61
		}
62
		final String rootId = new String(Iterables.getOnlyElement(mergedIn.keySet()), Charset.forName("UTF-8"));
63
		final byte[] body = entityCf.get(DedupUtils.BODY_B);
64

    
65
		key.set(Bytes.toBytes(rootId));
66
		value.set(Bytes.toBytes(Iterables.getOnlyElement(OafDecoder.decode(body).getEntity().getOriginalIdList())));
67
		context.write(key, value);
68
		context.getCounter(dedupConf.getWf().getEntityType(), "root entity").increment(1);
69

    
70
	}
71

    
72
}
(11-11/16)