Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6

    
7
import eu.dnetlib.actionmanager.actions.ActionFactory;
8
import eu.dnetlib.actionmanager.actions.AtomicAction;
9
import eu.dnetlib.actionmanager.common.Agent;
10
import eu.dnetlib.data.mapreduce.JobParams;
11
import eu.dnetlib.pace.config.DedupConfig;
12
import org.apache.commons.collections.MapUtils;
13
import org.apache.commons.lang.StringUtils;
14
import org.apache.commons.logging.Log;
15
import org.apache.commons.logging.LogFactory;
16
import org.apache.hadoop.hbase.client.Put;
17
import org.apache.hadoop.hbase.client.Result;
18
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19
import org.apache.hadoop.hbase.mapreduce.TableMapper;
20
import org.apache.hadoop.hbase.util.Bytes;
21
import org.apache.hadoop.io.Text;
22

    
23
/**
24
 * Created by claudio on 28/04/16.
25
 */
26
public class DedupSimilarityToHdfsActionsMapper extends TableMapper<Text, Text> {
27

    
28
	private static final Log log = LogFactory.getLog(DedupSimilarityToHdfsActionsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
29

    
30
	private static final String RAW_SET = "rawSetId";
31
	private static final String SIMILARITY_CF = "similarityCF";
32

    
33
	private String rawSet = null;
34

    
35
	private String similarityCF = null;
36

    
37
	private DedupConfig dedupConf = null;
38

    
39
	private ActionFactory actionFactory = null;
40

    
41
	private Text keyOut;
42
	private Text valueOut;
43

    
44
	@Override
45
	protected void setup(final Context context) throws IOException, InterruptedException {
46
		rawSet = context.getConfiguration().get(RAW_SET);
47
		if (StringUtils.isBlank(rawSet)) throw new IOException("Input parameter (" + RAW_SET + ") is missing or empty: '" + rawSet + "'");
48
		log.info("raw set: '" + rawSet + "'");
49

    
50
		similarityCF = context.getConfiguration().get(SIMILARITY_CF);
51
		if (StringUtils.isBlank(similarityCF)) throw new IOException("Input parameter (" + SIMILARITY_CF + ") is missing or empty: '" + similarityCF + "'");
52
		log.info("similarityCF: '" + similarityCF + "'");
53

    
54
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
55

    
56
		log.info("wf conf: " + dedupConf.toString());
57

    
58
		actionFactory = new ActionFactory();
59

    
60
		keyOut = new Text();
61
		valueOut = new Text();
62
	}
63

    
64
	@Override
65
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
66

    
67
		final Map<byte[], byte[]> sMap = value.getFamilyMap(Bytes.toBytes(similarityCF));
68
		if (MapUtils.isEmpty(sMap)) return;
69

    
70
		final Agent agent = new Agent("dedup", "Deduplication", Agent.AGENT_TYPE.algo);
71

    
72
		for (final Entry<byte[], byte[]> similarity : sMap.entrySet()) {
73

    
74
			final String targetKey = new String(key.copyBytes());
75
			final String qualifier = new String(similarity.getKey());
76
			final AtomicAction aa = actionFactory.createAtomicAction(rawSet, agent, targetKey, similarityCF, qualifier, null);
77

    
78
			keyOut.set(aa.getTargetRowKey()+"@"+aa.getTargetColumnFamily()+"@"+aa.getTargetColumn());
79
			valueOut.set(aa.toJSON());
80

    
81
			context.write(keyOut, valueOut);
82
			context.getCounter(dedupConf.getWf().getEntityType(), "similarity2actions").increment(1);
83
		}
84

    
85
	}
86
}
(13-13/16)