Project

General

Profile

« Previous | Next » 

Revision 44130

align with latest dnet-actionmanager-common

View differences:

modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupSimilarityToActionsMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

  
3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6

  
7
import org.apache.commons.collections.MapUtils;
8
import org.apache.commons.lang.StringUtils;
9
import org.apache.commons.logging.Log;
10
import org.apache.commons.logging.LogFactory;
11
import org.apache.hadoop.hbase.client.Put;
12
import org.apache.hadoop.hbase.client.Result;
13
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
14
import org.apache.hadoop.hbase.mapreduce.TableMapper;
15
import org.apache.hadoop.hbase.util.Bytes;
16

  
17
import eu.dnetlib.actionmanager.actions.ActionFactory;
18
import eu.dnetlib.actionmanager.actions.AtomicAction;
19
import eu.dnetlib.actionmanager.common.Agent;
20
import eu.dnetlib.data.mapreduce.JobParams;
21
import eu.dnetlib.pace.config.DedupConfig;
22

  
23
public class DedupSimilarityToActionsMapper extends TableMapper<ImmutableBytesWritable, Put> {
24

  
25
	private static final Log log = LogFactory.getLog(DedupSimilarityToActionsMapper.class); // NOPMD by marko on 11/24/08 5:02 PM
26

  
27
	private static final String RAW_SET = "rawSetId";
28
	private static final String SIMILARITY_CF = "similarityCF";
29

  
30
	private String rawSet = null;
31

  
32
	private String similarityCF = null;
33

  
34
	private DedupConfig dedupConf = null;
35

  
36
	private ActionFactory actionFactory = null;
37

  
38
	@Override
39
	protected void setup(final Context context) throws IOException, InterruptedException {
40
		rawSet = context.getConfiguration().get(RAW_SET);
41
		if (StringUtils.isBlank(rawSet)) throw new IOException("Input parameter (" + RAW_SET + ") is missing or empty: '" + rawSet + "'");
42
		log.info("raw set: '" + rawSet + "'");
43

  
44
		similarityCF = context.getConfiguration().get(SIMILARITY_CF);
45
		if (StringUtils.isBlank(similarityCF)) throw new IOException("Input parameter (" + SIMILARITY_CF + ") is missing or empty: '" + similarityCF + "'");
46
		log.info("similarityCF: '" + similarityCF + "'");
47

  
48
		dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
49

  
50
		log.info("wf conf: " + dedupConf.toString());
51

  
52
		actionFactory = new ActionFactory();
53
	}
54

  
55
	@Override
56
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
57

  
58
		final Map<byte[], byte[]> sMap = value.getFamilyMap(Bytes.toBytes(similarityCF));
59
		if (MapUtils.isEmpty(sMap)) return;
60

  
61
		final Agent agent = new Agent("dedup", "Deduplication", Agent.AGENT_TYPE.algo);
62

  
63
		for (final Entry<byte[], byte[]> similarity : sMap.entrySet()) {
64

  
65
			final String targetKey = new String(key.copyBytes());
66
			final String qualifier = new String(similarity.getKey());
67
			final AtomicAction aa = actionFactory.createAtomicAction(rawSet, agent, targetKey, similarityCF, qualifier, null);
68

  
69
			for (final Put put : aa.asPutOperations(null, null, null, null)) {
70

  
71
				context.write(new ImmutableBytesWritable(Bytes.toBytes(aa.getRowKey())), put);
72
				context.getCounter(dedupConf.getWf().getEntityType(), "similarity2actions").increment(1);
73
			}
74
		}
75

  
76
	}
77
}
modules/dnet-mapreduce-jobs/trunk/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupSimilarityToHdfsActionsMapper.java
76 76
			final AtomicAction aa = actionFactory.createAtomicAction(rawSet, agent, targetKey, similarityCF, qualifier, null);
77 77

  
78 78
			keyOut.set(aa.getTargetRowKey()+"@"+aa.getTargetColumnFamily()+"@"+aa.getTargetColumn());
79
			valueOut.set(aa.toString());
79
			valueOut.set(aa.toJSON());
80 80

  
81 81
			context.write(keyOut, valueOut);
82 82
			context.getCounter(dedupConf.getWf().getEntityType(), "similarity2actions").increment(1);

Also available in: Unified diff