
Revision 48145

integrated latest changes from dnet40


EnrichmentReducer.java
@@ -1,141 +1,29 @@
 package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
 
-import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey;
-import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;
-
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-import eu.dnetlib.data.mapreduce.Algorithms;
-import eu.dnetlib.data.mapreduce.JobParams;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import eu.dnetlib.data.mapreduce.hbase.broker.*;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.distance.PaceDocumentDistance;
-import eu.dnetlib.pace.distance.eval.ScoreResult;
-import eu.dnetlib.pace.model.MapDocument;
-import eu.dnetlib.pace.model.ProtoDocumentBuilder;
+import eu.dnetlib.data.proto.OafProtos.Oaf;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.math.util.MathUtils;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.dom4j.DocumentException;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
+import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey;
 
-import eu.dnetlib.data.proto.OafProtos.Oaf;
-
 /**
  * Created by claudio on 08/07/16.
  */
-public class EnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
+public class EnrichmentReducer extends AbstractEnrichmentReducer {
 
-	private static final int LIMIT = 1000;
-	private static final int SCORE_DECIMALS = 2;
-
-	private Map<String, String> dsTypeMap = Maps.newHashMap();
-
-	private Set<String> dsWhitelist = Sets.newHashSet();
-
-	private Set<String> dsBlacklist = Sets.newHashSet();
-
-	// This is for EuropePMC. They expose OA abstracts, but we want to identify real OA publications. WTF.
-	private Set<String> untrustedOaDsList = Sets.newHashSet();
-
-	// White list for datasource typologies.
-	private Set<String> dsTypeWhitelist = Sets.newHashSet();
-
-	private DedupConfig dedupConf;
-
-	/**
-	 * lower bound for trust scaling
-	 */
-	private double scaleLB;
-
 	@Override
-	protected void setup(final Context context) throws IOException, InterruptedException {
-		super.setup(context);
-
-		System.out.println("LIMIT: " + LIMIT);
-
-		dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
-		dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
-		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
-		untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));
-
-		dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);
-
-		System.out.println("datasource whitelist: " + dsWhitelist);
-		System.out.println("datasource blacklist: " + dsBlacklist);
-		System.out.println("datasource OA list: " + untrustedOaDsList);
-
-		System.out.println("datasource type whitelist: " + dsTypeWhitelist);
-
-		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);
-
-		System.out.println("got dedup conf: " + dedupConfJson);
-
-		dedupConf = DedupConfig.load(dedupConfJson);
-
-		System.out.println("parsed dedup conf: " + dedupConf.toString());
-
-		scaleLB = dedupConf.getWf().getThreshold() - 0.01;
+	protected String counterGroup() {
+		return "Broker Enrichment";
 	}
 
-	private Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
-		System.out.println("loading datasource typology mapping");
-
-		final Map<String, String> dsTypeMap = Maps.newHashMap();
-
-		final Scan scan = new Scan();
-		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
-		fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
-		scan.setFilter(fl);
-		scan.addFamily(Bytes.toBytes("datasource"));
-
-		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
-
-		System.out.println(String.format("table name: '%s'", tableName));
-
-		try (final HTable table = new HTable(context.getConfiguration(), tableName);
-				final ResultScanner res = table.getScanner(scan)) {
-
-			for (final Result r : res) {
-				final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
-				if (b != null) {
-					final Oaf oaf = Oaf.parseFrom(b);
-					final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
-					final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
-
-					if (dsTypeWhitelist.contains(dsType)) {
-						System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
-						dsTypeMap.put(dsId, dsType);
-					}
-				}
-			}
-
-		}
-		System.out.println("datasource type map size: " + dsTypeMap.size());
-		return dsTypeMap;
-	}
-
 	@Override
 	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
 			InterruptedException {
@@ -190,29 +78,4 @@
 		}
 	}
 
-	private float similarity(final Oaf oa, final Oaf ob) {
-
-		final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
-		final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());
-
-		final ScoreResult sr = new PaceDocumentDistance().between(a, b, dedupConf);
-		final float score = (float) Algorithms.scale(sr.getScore(), scaleLB, 1, 0, 1);
-
-		return MathUtils.round(score, SCORE_DECIMALS);
-	}
-
-	private Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
-		return new Function<ImmutableBytesWritable, Oaf>() {
-
-			@Override
-			public Oaf apply(final ImmutableBytesWritable input) {
-				try {
-					return Oaf.parseFrom(input.copyBytes());
-				} catch (final InvalidProtocolBufferException e) {
-					throw new IllegalArgumentException(e);
-				}
-			}
-		};
-	}
-
 }
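
In summary, this revision reduces EnrichmentReducer to a thin subclass that only names its counter group: the class no longer extends Hadoop's Reducer directly but AbstractEnrichmentReducer, and the configuration loading in setup(), the datasource typology lookup getDsTypeMap(), the pairwise similarity() score and the oafDeserialiser() helper all disappear from this file. The base class itself is not part of this changeset, so the following is only a sketch of the shape AbstractEnrichmentReducer presumably has, assembled from the members removed above; the protected visibility, the abstract counterGroup() hook, and the omission of the HBase-backed getDsTypeMap() scan and the System.out logging are assumptions made here for brevity.

	package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

	import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;

	import java.io.IOException;
	import java.util.Map;
	import java.util.Set;

	import com.google.common.base.Function;
	import com.google.common.collect.Maps;
	import com.google.common.collect.Sets;
	import com.google.protobuf.InvalidProtocolBufferException;
	import eu.dnetlib.data.mapreduce.Algorithms;
	import eu.dnetlib.data.mapreduce.JobParams;
	import eu.dnetlib.data.proto.OafProtos.Oaf;
	import eu.dnetlib.pace.config.DedupConfig;
	import eu.dnetlib.pace.distance.PaceDocumentDistance;
	import eu.dnetlib.pace.distance.eval.ScoreResult;
	import eu.dnetlib.pace.model.MapDocument;
	import eu.dnetlib.pace.model.ProtoDocumentBuilder;
	import org.apache.commons.math.util.MathUtils;
	import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
	import org.apache.hadoop.io.Text;
	import org.apache.hadoop.mapreduce.Reducer;

	public abstract class AbstractEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {

		protected static final int LIMIT = 1000;
		protected static final int SCORE_DECIMALS = 2;

		// datasource id -> typology; populated in the real class by an HBase scan
		// over the datasource rows (getDsTypeMap, omitted from this sketch)
		protected Map<String, String> dsTypeMap = Maps.newHashMap();

		protected Set<String> dsWhitelist = Sets.newHashSet();
		protected Set<String> dsBlacklist = Sets.newHashSet();

		// EuropePMC exposes OA abstracts, but only real OA publications should count
		protected Set<String> untrustedOaDsList = Sets.newHashSet();

		// white list for datasource typologies
		protected Set<String> dsTypeWhitelist = Sets.newHashSet();

		protected DedupConfig dedupConf;

		// lower bound for trust scaling
		protected double scaleLB;

		// hook for subclasses: the Hadoop counter group they report under
		protected abstract String counterGroup();

		@Override
		protected void setup(final Context context) throws IOException, InterruptedException {
			super.setup(context);

			dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
			dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
			dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
			untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));

			dedupConf = DedupConfig.load(context.getConfiguration().get(JobParams.DEDUP_CONF));
			scaleLB = dedupConf.getWf().getThreshold() - 0.01;
		}

		// pairwise similarity of two Oaf entities, rescaled from [scaleLB, 1] to [0, 1]
		protected float similarity(final Oaf oa, final Oaf ob) {
			final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
			final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());

			final ScoreResult sr = new PaceDocumentDistance().between(a, b, dedupConf);
			final float score = (float) Algorithms.scale(sr.getScore(), scaleLB, 1, 0, 1);
			return MathUtils.round(score, SCORE_DECIMALS);
		}

		// deserialises raw HBase cell bytes back into an Oaf protobuf
		protected Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
			return new Function<ImmutableBytesWritable, Oaf>() {

				@Override
				public Oaf apply(final ImmutableBytesWritable input) {
					try {
						return Oaf.parseFrom(input.copyBytes());
					} catch (final InvalidProtocolBufferException e) {
						throw new IllegalArgumentException(e);
					}
				}
			};
		}
	}

Declaring counterGroup() as an abstract hook is a small template-method choice: each concrete broker reducer keeps the shared enrichment plumbing and only has to name the group its Hadoop counters are reported under.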

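The body of reduce() is collapsed in this view, so the diff does not show where the counter group is consumed; a hypothetical call site inside the reducer, using Hadoop's standard context.getCounter(group, name) API, would look like:

	// hypothetical usage: report a counter under the group named by the subclass
	context.getCounter(counterGroup(), "enrichment events").increment(1);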