Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.List;
6
import java.util.Map;
7

    
8
import com.google.common.collect.Maps;
9
import eu.dnetlib.data.mapreduce.JobParams;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.proto.OafProtos.OafEntity;
13
import eu.dnetlib.data.proto.TypeProtos.Type;
14
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
15
import eu.dnetlib.pace.config.DedupConfig;
16
import eu.dnetlib.pace.model.MapDocument;
17
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
18
import org.apache.commons.logging.Log;
19
import org.apache.commons.logging.LogFactory;
20
import org.apache.hadoop.hbase.client.Result;
21
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
22
import org.apache.hadoop.hbase.mapreduce.TableMapper;
23
import org.apache.hadoop.hbase.util.Bytes;
24
import org.apache.hadoop.io.Text;
25

    
26
public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {
27

    
28
	private static final Log log = LogFactory.getLog(DedupMapper.class);
29

    
30
	private DedupConfig dedupConf;
31

    
32
	private Map<String, List<String>> blackListMap = Maps.newHashMap();
33

    
34
	private Text outKey;
35

    
36
	private ImmutableBytesWritable ibw;
37

    
38
	@Override
39
	protected void setup(final Context context) throws IOException, InterruptedException {
40

    
41
		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);
42

    
43
		log.info("pace conf strings");
44
		log.info("pace conf: " + dedupConfJson);
45

    
46
		dedupConf = DedupConfig.load(dedupConfJson);
47

    
48
		blackListMap = dedupConf.getPace().getBlacklists();
49

    
50
		outKey = new Text();
51
		ibw = new ImmutableBytesWritable();
52

    
53
		log.info("pace conf");
54
		log.info("entity type: " + dedupConf.getWf().getEntityType());
55
		log.info("clustering: " + dedupConf.getPace().getClustering());
56
		log.info("conditions: " + dedupConf.getPace().getConditions());
57
		log.info("fields: " + dedupConf.getPace().getModel());
58
		log.info("blacklists: " + blackListMap);
59
		log.info("wf conf: " + dedupConf.toString());
60
	}
61

    
62
	@Override
63
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
64
		// log.info("got key: " + new String(keyIn.copyBytes()));
65

    
66
		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
67

    
68
		if (body != null) {
69

    
70
			final OafDecoder decoder = OafDecoder.decode(body);
71
			if (decoder.getOaf().getDataInfo().getDeletedbyinference()) {
72
				context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1);
73
				return;
74
			}
75

    
76
			final OafEntity entity = decoder.getEntity();
77

    
78
			context.getCounter(entity.getType().toString(), "decoded").increment(1);
79

    
80
			if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) {
81

    
82
				// TODO: remove this hack - here because we want to dedup only publications and organizazions
83
				if (shouldDedup(entity)) {
84
					final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
85
					emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
86
				}
87
			}
88
		} else {
89
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
90
		}
91
	}
92

    
93
	private boolean shouldDedup(final OafEntity entity) {
94
		return (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("publication")) |
95
				entity.getType().equals(Type.organization);
96
	}
97

    
98
	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
99
		for (final String ngram : ngrams) {
100
			outKey.set(ngram);
101
			ibw.set(doc.toByteArray());
102
			context.write(outKey, ibw);
103
		}
104
	}
105

    
106
}
(7-7/16)