Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.dedup;
2

    
3
import java.io.IOException;
4
import java.util.Collection;
5
import java.util.List;
6
import java.util.Map;
7

    
8
import com.google.common.collect.Maps;
9
import eu.dnetlib.data.mapreduce.JobParams;
10
import eu.dnetlib.data.mapreduce.util.DedupUtils;
11
import eu.dnetlib.data.mapreduce.util.OafDecoder;
12
import eu.dnetlib.data.proto.OafProtos.OafEntity;
13
import eu.dnetlib.data.proto.TypeProtos.Type;
14
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
15
import eu.dnetlib.pace.config.DedupConfig;
16
import eu.dnetlib.pace.config.WfConfig;
17
import eu.dnetlib.pace.model.Field;
18
import eu.dnetlib.pace.model.MapDocument;
19
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
20
import org.apache.commons.logging.Log;
21
import org.apache.commons.logging.LogFactory;
22
import org.apache.hadoop.hbase.client.Result;
23
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
24
import org.apache.hadoop.hbase.mapreduce.TableMapper;
25
import org.apache.hadoop.hbase.util.Bytes;
26
import org.apache.hadoop.io.Text;
27
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
28

    
29
public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {
30

    
31
	private static final Log log = LogFactory.getLog(DedupMapper.class);
32

    
33
	private DedupConfig dedupConf;
34

    
35
	private Map<String, List<String>> blackListMap = Maps.newHashMap();
36

    
37
	private Text outKey;
38

    
39
	private ImmutableBytesWritable ibw;
40

    
41
	@Override
42
	protected void setup(final Context context) throws IOException, InterruptedException {
43

    
44
		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);
45

    
46
		log.info("pace conf strings");
47
		log.info("pace conf: " + dedupConfJson);
48

    
49
		dedupConf = DedupConfig.load(dedupConfJson);
50

    
51
		blackListMap = dedupConf.getPace().getBlacklists();
52

    
53
		outKey = new Text();
54
		ibw = new ImmutableBytesWritable();
55

    
56
		//log.info("pace conf");
57
		//log.info("entity type: " + dedupConf.getWf().getEntityType());
58
		//log.info("clustering: " + dedupConf.getPace().getClustering());
59
		//log.info("conditions: " + dedupConf.getPace().getConditions());
60
		//log.info("fields: " + dedupConf.getPace().getModel());
61
		//log.info("blacklists: " + blackListMap);
62
		log.info("wf conf: " + dedupConf.toString());
63
	}
64

    
65
	@Override
66
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
67
		// log.info("got key: " + new String(keyIn.copyBytes()));
68

    
69
		final WfConfig wf = dedupConf.getWf();
70
		final byte[] body = result.getValue(wf.getEntityType().getBytes(), DedupUtils.BODY_B);
71

    
72
		if (body != null) {
73

    
74
			final OafDecoder decoder = OafDecoder.decode(body);
75
			if (decoder.getOaf().getDataInfo().getDeletedbyinference()) {
76
				context.getCounter(wf.getEntityType(), "deleted by inference").increment(1);
77
				return;
78
			}
79

    
80
			final OafEntity entity = decoder.getEntity();
81

    
82
			context.getCounter(entity.getType().toString(), "decoded").increment(1);
83

    
84
			if (entity.getType().equals(Type.valueOf(wf.getEntityType()))) {
85

    
86
				final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
87
				context.getCounter(entity.getType().toString(), "converted as MapDocument").increment(1);
88

    
89
				if (wf.hasSubType()) {
90

    
91
					final Map<String, Field> fields = doc.getFieldMap();
92

    
93
					if (!fields.containsKey(wf.getSubEntityType())) {
94
						throw new IllegalStateException(String.format("model map does not contain field %s", wf.getSubEntityType()));
95
					}
96

    
97
					final String subType = fields.get(wf.getSubEntityType()).stringValue();
98
					if (wf.getSubEntityValue().equalsIgnoreCase(subType)) {
99
						context.getCounter(subType, "converted as MapDocument").increment(1);
100
						emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
101
					} else {
102
						context.getCounter(subType, "ignored").increment(1);
103
					}
104
				} else {
105
					emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
106
				}
107
			}
108
		} else {
109
			context.getCounter(wf.getEntityType(), "missing body").increment(1);
110
		}
111
	}
112

    
113
	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
114
		for (final String ngram : ngrams) {
115
			outKey.set(ngram);
116
			ibw.set(doc.toByteArray());
117
			context.write(outKey, ibw);
118
		}
119
	}
120

    
121
}
(7-7/16)