Project

General

Profile

1 36164 claudio.at
package eu.dnetlib.data.mapreduce.hbase.dedup;
2
3
import java.io.IOException;
4
import java.nio.charset.Charset;
5
import java.util.Map;
6
7
import org.apache.commons.collections.MapUtils;
8
import org.apache.commons.logging.Log;
9
import org.apache.commons.logging.LogFactory;
10
import org.apache.hadoop.hbase.client.Result;
11
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
12
import org.apache.hadoop.hbase.mapreduce.TableMapper;
13
import org.apache.hadoop.hbase.util.Bytes;
14
15 36796 claudio.at
import com.google.common.collect.Iterables;
16 36164 claudio.at
17
import eu.dnetlib.data.mapreduce.JobParams;
18
import eu.dnetlib.data.mapreduce.util.DedupUtils;
19
import eu.dnetlib.data.mapreduce.util.OafDecoder;
20 36670 claudio.at
import eu.dnetlib.pace.config.DedupConfig;
21 36164 claudio.at
22
public class DedupRootsToCsvMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
23
24
	/**
25
	 * logger.
26
	 */
27
	private static final Log log = LogFactory.getLog(DedupRootsToCsvMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
28
29
	private DedupConfig dedupConf;
30
31
	private ImmutableBytesWritable key;
32
33
	private ImmutableBytesWritable value;
34
35
	@Override
36
	protected void setup(final Context context) {
37 36670 claudio.at
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
38 36164 claudio.at
		System.out.println("dedup buildRoots mapper\nwf conf: " + dedupConf.toString());
39
		key = new ImmutableBytesWritable();
40
		value = new ImmutableBytesWritable();
41
	}
42
43
	@Override
44
	protected void map(final ImmutableBytesWritable rowkey, final Result result, final Context context) throws IOException, InterruptedException {
45
46 36796 claudio.at
		if (DedupUtils.isRoot(rowkey)) {
47
			context.getCounter(dedupConf.getWf().getEntityType(), "root row skipped").increment(1);
48
			return;
49
		}
50 36164 claudio.at
51 36670 claudio.at
		final Map<byte[], byte[]> entityCf = result.getFamilyMap(Bytes.toBytes(dedupConf.getWf().getEntityType()));
52 36164 claudio.at
		if (MapUtils.isEmpty(entityCf) && (entityCf.get(DedupUtils.BODY_B) == null)) {
53 36670 claudio.at
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
54 36164 claudio.at
			return;
55
		}
56
57 36796 claudio.at
		final Map<byte[], byte[]> mergedIn = result.getFamilyMap(DedupUtils.getDedupCF_mergedInBytes(dedupConf.getWf().getEntityType()));
58
		if (MapUtils.isEmpty(mergedIn)) {
59
			context.getCounter(dedupConf.getWf().getEntityType(), "missing mergedIn relationship").increment(1);
60
			return;
61
		}
62
		final String rootId = new String(Iterables.getOnlyElement(mergedIn.keySet()), Charset.forName("UTF-8"));
63 36164 claudio.at
		final byte[] body = entityCf.get(DedupUtils.BODY_B);
64
65 36796 claudio.at
		key.set(Bytes.toBytes(rootId));
66
		value.set(Bytes.toBytes(Iterables.getOnlyElement(OafDecoder.decode(body).getEntity().getOriginalIdList())));
67 36164 claudio.at
		context.write(key, value);
68 36670 claudio.at
		context.getCounter(dedupConf.getWf().getEntityType(), "root entity").increment(1);
69 36796 claudio.at
70 36164 claudio.at
	}
71
72
}