Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Set;
5

    
6
import org.apache.hadoop.hbase.client.Result;
7
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
8
import org.apache.hadoop.hbase.mapreduce.TableMapper;
9
import org.apache.hadoop.hbase.util.Bytes;
10
import org.apache.hadoop.io.Text;
11

    
12
import com.google.common.base.Function;
13
import com.google.common.collect.Iterables;
14
import com.google.common.collect.Sets;
15

    
16
import eu.dnetlib.data.mapreduce.util.DedupUtils;
17
import eu.dnetlib.data.mapreduce.util.OafDecoder;
18
import eu.dnetlib.data.proto.TypeProtos.Type;
19
import eu.dnetlib.pace.model.PersonComparatorUtils;
20

    
21
public class FindDedupCandidatePersonsMapper extends TableMapper<Text, Text> {
22

    
23
	private static final byte[] PERSON_CF = Type.person.toString().getBytes();
24
	private static final byte[] PERSONRESULT_CF = "personResult".getBytes();
25

    
26
	@Override
27
	protected void setup(final Context context) {
28

    
29
	}
30

    
31
	@Override
32
	protected void map(final ImmutableBytesWritable rowkey, final Result row, final Context context) throws IOException, InterruptedException {
33
		String id = Bytes.toString(rowkey.get());
34
		String fullname = extractFullname(row);
35
		Set<String> resultIds = extractResultIds(row);
36

    
37
		if (fullname != null) {
38
			Text text = (new DedupPersonBean(id, fullname, resultIds)).toText();
39
			for (String k : PersonComparatorUtils.getNgramsForPerson(fullname)) {
40
				context.write(new Text(k), text);
41
			}
42
		}
43
	}
44

    
45
	private Set<String> extractResultIds(final Result row) {
46
		return Sets.newHashSet(Iterables.transform(row.getFamilyMap(PERSONRESULT_CF).keySet(), new Function<byte[], String>() {
47

    
48
			@Override
49
			public String apply(final byte[] b) {
50
				return Bytes.toString(b);
51
			}
52
		}));
53
	}
54

    
55
	private String extractFullname(final Result row) {
56
		byte[] body = row.getValue(PERSON_CF, DedupUtils.BODY_B);
57
		if (body == null) return null;
58
		return OafDecoder.decode(body).getEntity().getPerson().getMetadata().getFullname().getValue();
59
	}
60
}
(18-18/23)