Project

General

Profile

« Previous | Next » 

Revision 52877

Introduced subType in the pace workflow (wf) configuration

View differences:

DedupMapper.java
13 13
import eu.dnetlib.data.proto.TypeProtos.Type;
14 14
import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
15 15
import eu.dnetlib.pace.config.DedupConfig;
16
import eu.dnetlib.pace.config.WfConfig;
17
import eu.dnetlib.pace.model.Field;
16 18
import eu.dnetlib.pace.model.MapDocument;
17 19
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
18 20
import org.apache.commons.logging.Log;
......
22 24
import org.apache.hadoop.hbase.mapreduce.TableMapper;
23 25
import org.apache.hadoop.hbase.util.Bytes;
24 26
import org.apache.hadoop.io.Text;
27
import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
25 28

  
26 29
public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {
27 30

  
......
63 66
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
64 67
		// log.info("got key: " + new String(keyIn.copyBytes()));
65 68

  
66
		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
69
		final WfConfig wf = dedupConf.getWf();
70
		final byte[] body = result.getValue(wf.getEntityType().getBytes(), DedupUtils.BODY_B);
67 71

  
68 72
		if (body != null) {
69 73

  
70 74
			final OafDecoder decoder = OafDecoder.decode(body);
71 75
			if (decoder.getOaf().getDataInfo().getDeletedbyinference()) {
72
				context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1);
76
				context.getCounter(wf.getEntityType(), "deleted by inference").increment(1);
73 77
				return;
74 78
			}
75 79

  
......
77 81

  
78 82
			context.getCounter(entity.getType().toString(), "decoded").increment(1);
79 83

  
80
			if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) {
84
			if (entity.getType().equals(Type.valueOf(wf.getEntityType()))) {
81 85

  
82
				// TODO: remove this hack - here because we want to dedup only publications and organizations
83
				if (shouldDedup(entity)) {
84
					final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
85
					context.getCounter(entity.getType().toString(), "converted as MapDocument").increment(1);
86
				final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
87
				context.getCounter(entity.getType().toString(), "converted as MapDocument").increment(1);
88

  
89
				if (wf.hasSubType()) {
90

  
91
					final Map<String, Field> fields = doc.getFieldMap();
92

  
93
					if (!fields.containsKey(wf.getSubEntityType())) {
94
						throw new IllegalStateException(String.format("model map does not contain field %s", wf.getSubEntityType()));
95
					}
96

  
97
					final String subType = fields.get(wf.getSubEntityType()).stringValue();
98
					if (wf.getSubEntityValue().equalsIgnoreCase(subType)) {
99
						context.getCounter(subType, "converted as MapDocument").increment(1);
100
						emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
101
					} else {
102
						context.getCounter(subType, "ignored").increment(1);
103
					}
104
				} else {
86 105
					emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
87 106
				}
88 107
			}
89 108
		} else {
90
			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
109
			context.getCounter(wf.getEntityType(), "missing body").increment(1);
91 110
		}
92 111
	}
93 112

  
94
	private boolean shouldDedup(final OafEntity entity) {
95
		return (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("publication")) |
96
				entity.getType().equals(Type.organization);
97
	}
98

  
99 113
	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
100 114
		for (final String ngram : ngrams) {
101 115
			outKey.set(ngram);

Also available in: Unified diff