
Revision 52878

introduced subType in pace wf configuration
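This revision replaces the mapper's hard-coded entity filter (the removed shouldDedup() hack visible in the diff below) with sub-type filtering driven by the wf section of the pace configuration: when a sub-type is configured, only entities whose sub-type field matches the configured value are deduplicated. As a minimal sketch of how such a configuration might be loaded; the JSON layout and the DedupConfig.load(...) factory are assumptions inferred from the getters visible in the diff, and the field names such as "resulttype" are only illustrative:

import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.config.WfConfig;

public class SubTypeConfigSketch {

	public static void main(final String[] args) {
		// Hypothetical pace configuration: only the "wf" keys that correspond to
		// getters used in this revision (entityType, subEntityType, subEntityValue)
		// are shown; the surrounding structure is an assumption, not the actual schema.
		final String json = "{"
				+ "\"wf\": {"
				+ "  \"entityType\": \"result\","
				+ "  \"subEntityType\": \"resulttype\","
				+ "  \"subEntityValue\": \"publication\""
				+ "},"
				+ "\"pace\": { \"clustering\": [], \"conditions\": [], \"model\": [] }"
				+ "}";

		final DedupConfig dedupConf = DedupConfig.load(json); // assumed factory method
		final WfConfig wf = dedupConf.getWf();

		// With both sub-type keys set, hasSubType() should enable the new branch
		// in DedupMapper.map() that compares the document's sub-type field.
		System.out.println(wf.hasSubType() + " / " + wf.getSubEntityValue());
	}
}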

View differences:

modules/dnet-mapreduce-jobs/branches/beta/src/main/java/eu/dnetlib/data/mapreduce/hbase/dedup/DedupMapper.java
@@ -13,6 +13,8 @@
 import eu.dnetlib.data.proto.TypeProtos.Type;
 import eu.dnetlib.pace.clustering.BlacklistAwareClusteringCombiner;
 import eu.dnetlib.pace.config.DedupConfig;
+import eu.dnetlib.pace.config.WfConfig;
+import eu.dnetlib.pace.model.Field;
 import eu.dnetlib.pace.model.MapDocument;
 import eu.dnetlib.pace.model.ProtoDocumentBuilder;
 import org.apache.commons.logging.Log;
@@ -22,6 +24,7 @@
 import org.apache.hadoop.hbase.mapreduce.TableMapper;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
 
 public class DedupMapper extends TableMapper<Text, ImmutableBytesWritable> {
 
@@ -50,12 +53,12 @@
 		outKey = new Text();
 		ibw = new ImmutableBytesWritable();
 
-		log.info("pace conf");
-		log.info("entity type: " + dedupConf.getWf().getEntityType());
-		log.info("clustering: " + dedupConf.getPace().getClustering());
-		log.info("conditions: " + dedupConf.getPace().getConditions());
-		log.info("fields: " + dedupConf.getPace().getModel());
-		log.info("blacklists: " + blackListMap);
+		//log.info("pace conf");
+		//log.info("entity type: " + dedupConf.getWf().getEntityType());
+		//log.info("clustering: " + dedupConf.getPace().getClustering());
+		//log.info("conditions: " + dedupConf.getPace().getConditions());
+		//log.info("fields: " + dedupConf.getPace().getModel());
+		//log.info("blacklists: " + blackListMap);
 		log.info("wf conf: " + dedupConf.toString());
 	}
 
@@ -63,13 +66,14 @@
 	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException, InterruptedException {
 		// log.info("got key: " + new String(keyIn.copyBytes()));
 
-		final byte[] body = result.getValue(dedupConf.getWf().getEntityType().getBytes(), DedupUtils.BODY_B);
+		final WfConfig wf = dedupConf.getWf();
+		final byte[] body = result.getValue(wf.getEntityType().getBytes(), DedupUtils.BODY_B);
 
 		if (body != null) {
 
 			final OafDecoder decoder = OafDecoder.decode(body);
 			if (decoder.getOaf().getDataInfo().getDeletedbyinference()) {
-				context.getCounter(dedupConf.getWf().getEntityType(), "deleted by inference").increment(1);
+				context.getCounter(wf.getEntityType(), "deleted by inference").increment(1);
 				return;
 			}
 
@@ -77,24 +81,35 @@
 
 			context.getCounter(entity.getType().toString(), "decoded").increment(1);
 
-			if (entity.getType().equals(Type.valueOf(dedupConf.getWf().getEntityType()))) {
+			if (entity.getType().equals(Type.valueOf(wf.getEntityType()))) {
 
-				// TODO: remove this hack - here because we want to dedup only publications and organizazions
-				if (shouldDedup(entity)) {
-					final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
+				final MapDocument doc = ProtoDocumentBuilder.newInstance(Bytes.toString(keyIn.copyBytes()), entity, dedupConf.getPace().getModel());
+				context.getCounter(entity.getType().toString(), "converted as MapDocument").increment(1);
+
+				if (wf.hasSubType()) {
+
+					final Map<String, Field> fields = doc.getFieldMap();
+
+					if (!fields.containsKey(wf.getSubEntityType())) {
+						throw new IllegalStateException(String.format("model map does not contain field %s", wf.getSubEntityType()));
+					}
+
+					final String subType = fields.get(wf.getSubEntityType()).stringValue();
+					if (wf.getSubEntityValue().equalsIgnoreCase(subType)) {
+						context.getCounter(subType, "converted as MapDocument").increment(1);
+						emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
+					} else {
+						context.getCounter(subType, "ignored").increment(1);
+					}
+				} else {
 					emitNGrams(context, doc, BlacklistAwareClusteringCombiner.filterAndCombine(doc, dedupConf, blackListMap));
 				}
 			}
 		} else {
-			context.getCounter(dedupConf.getWf().getEntityType(), "missing body").increment(1);
+			context.getCounter(wf.getEntityType(), "missing body").increment(1);
 		}
 	}
 
-	private boolean shouldDedup(final OafEntity entity) {
-		return (entity.getType().equals(Type.result) && entity.getResult().getMetadata().getResulttype().getClassid().equals("publication")) |
-				entity.getType().equals(Type.organization);
-	}
-
 	private void emitNGrams(final Context context, final MapDocument doc, final Collection<String> ngrams) throws IOException, InterruptedException {
 		for (final String ngram : ngrams) {
 			outKey.set(ngram);
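Note on the change: the removed shouldDedup() helper hard-coded the dedup scope to publications and organizations (and, incidentally, combined the two checks with the non-short-circuit | operator rather than ||); with this revision the scope is expressed in the wf configuration instead of in mapper code. The new rule can be restated in isolation as a minimal sketch; the helper name matchesSubType is hypothetical and not part of the revision:

import java.util.Map;

import eu.dnetlib.pace.config.WfConfig;
import eu.dnetlib.pace.model.Field;

public final class SubTypeFilterSketch {

	private SubTypeFilterSketch() {}

	// Restates the decision added to DedupMapper.map(): deduplicate everything of
	// the configured entity type unless a sub-type is configured, in which case
	// only documents whose sub-type field matches subEntityValue pass.
	public static boolean matchesSubType(final WfConfig wf, final Map<String, Field> fields) {
		if (!wf.hasSubType()) {
			return true; // no sub-type configured: every entity of the main type is deduplicated
		}
		final Field field = fields.get(wf.getSubEntityType());
		if (field == null) {
			// mirrors the diff's failure mode when the model lacks the sub-type field
			throw new IllegalStateException(
					String.format("model map does not contain field %s", wf.getSubEntityType()));
		}
		return wf.getSubEntityValue().equalsIgnoreCase(field.stringValue());
	}
}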
