Project

General

Profile

« Previous | Next » 

Revision 45705

updated dedup workflows according to the new hbase data encoding

View differences:

DedupSimilarityToHdfsActionsMapper.java
1 1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2 2

  
3 3
import java.io.IOException;
4
import java.util.Map;
5
import java.util.Map.Entry;
6 4

  
7 5
import eu.dnetlib.data.actionmanager.actions.AtomicActionSerialiser;
8 6
import eu.dnetlib.data.mapreduce.JobParams;
7
import eu.dnetlib.data.mapreduce.util.dao.HBaseTableDAO;
9 8
import eu.dnetlib.pace.config.DedupConfig;
10 9
import eu.dnetlib.rmi.data.hadoop.actionmanager.Agent;
11 10
import eu.dnetlib.rmi.data.hadoop.actionmanager.actions.ActionFactory;
12 11
import eu.dnetlib.rmi.data.hadoop.actionmanager.actions.AtomicAction;
13
import org.apache.commons.collections.MapUtils;
14 12
import org.apache.commons.lang3.StringUtils;
15 13
import org.apache.commons.logging.Log;
16 14
import org.apache.commons.logging.LogFactory;
17 15
import org.apache.hadoop.hbase.client.Result;
18 16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
19 17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
20
import org.apache.hadoop.hbase.util.Bytes;
21 18
import org.apache.hadoop.io.Text;
22 19

  
23 20
/**
......
64 61
	@Override
65 62
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
66 63

  
67
		final Map<byte[], byte[]> sMap = value.getFamilyMap(Bytes.toBytes(similarityCF));
68
		if (MapUtils.isEmpty(sMap)) return;
69

  
70 64
		final Agent agent = new Agent("dedup", "Deduplication", Agent.AGENT_TYPE.algo);
71 65

  
72
		for (final Entry<byte[], byte[]> similarity : sMap.entrySet()) {
66
		HBaseTableDAO.getTargetIds(value, similarityCF).forEach(qualifier -> {
67
					final String targetKey = new String(key.copyBytes());
73 68

  
74
			final String targetKey = new String(key.copyBytes());
75
			final String qualifier = new String(similarity.getKey());
76
			final AtomicAction aa = actionFactory.createAtomicAction(rawSet, agent, targetKey, similarityCF, qualifier, null);
69
					final AtomicAction aa = actionFactory.createAtomicAction(rawSet, agent, targetKey, similarityCF, qualifier, null);
77 70

  
78
			keyOut.set(aa.getTargetRowKey()+"@"+aa.getTargetColumnFamily()+"@"+aa.getTargetColumn());
79
			valueOut.set(AtomicActionSerialiser.toJSON(aa));
71
					keyOut.set(aa.getTargetRowKey() + "@" + aa.getTargetColumnFamily() + "@" + aa.getTargetColumn());
72
					valueOut.set(AtomicActionSerialiser.toJSON(aa));
80 73

  
81
			context.write(keyOut, valueOut);
82
			context.getCounter(dedupConf.getWf().getEntityType(), "similarity2actions").increment(1);
83
		}
74
					try {
75
						context.write(keyOut, valueOut);
76
						context.getCounter(dedupConf.getWf().getEntityType(), "similarity2actions").increment(1);
77
					} catch (Exception e) {
78
						throw new RuntimeException(e);
79
					}
80
				}
81
		);
82
	}
84 83

  
85
	}
86 84
}

Also available in: Unified diff