Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.io.Text;

import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafDecoder;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.Person;
15

    
16
public class SimpleDedupPersonMapper extends TableMapper<Text, ImmutableBytesWritable> {
17

    
18
	private DedupConfig dedupConf;
19

    
20
	private Text rowKey;
21

    
22
	private ImmutableBytesWritable ibw;
23

    
24
	@Override
25
	protected void setup(final Context context) throws IOException, InterruptedException {
26
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
27
		rowKey = new Text();
28
		ibw = new ImmutableBytesWritable();
29
	}
30

    
31
	@Override
32
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
33
		// System.out.println("got key: " + new String(keyIn.copyBytes()));
34

    
35
		if (DedupUtils.isRoot(new String(keyIn.copyBytes()))) {
36
			context.getCounter(dedupConf.getWf().getEntityType(), "roots skipped").increment(1);
37
			return;
38
		}
39
		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
40

    
41
		if (body != null) {
42
			try {
43
				final OafDecoder decoder = OafDecoder.decode(body);
44

    
45
				final String hash = new Person(decoder.getEntity().getPerson().getMetadata().getFullname().getValue(), false).hash();
46
				// String hash = new Person(getPersonName(decoder), true).hash();
47

    
48
				rowKey.set(hash);
49
				ibw.set(body);
50
				context.write(rowKey, ibw);
51

    
52
			} catch (final Throwable e) {
53
				System.out.println("GOT EX " + e);
54
				e.printStackTrace(System.err);
55
				context.getCounter(dedupConf.getWf().getEntityType(), e.getClass().toString()).increment(1);
56
			}
57
		} else {
58
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
59
		}
60
	}
61

    
62
	// private String getPersonName(OafDecoder decoder) {
63
	// Metadata m = decoder.getEntity().getPerson().getMetadata();
64
	// String secondnames = Joiner.on(" ").join(m.getSecondnamesList());
65
	//
66
	// return isValid(m.getFullname()) ? m.getFullname() : (secondnames + ", " + m.getFirstname());
67
	// }
68

    
69
	// private boolean isValid(String fullname) {
70
	// return fullname != null && !fullname.isEmpty();
71
	// }
72

    
73
}
(21-21/22)