package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Map;
import java.util.Set;

import com.google.common.base.Function;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;

/**
 * Base class for the broker enrichment reducers: it loads the datasource white/black lists and the dedup configuration,
 * and exposes the PACE-based similarity measure used to compare pairs of OAF entities.
 *
 * Created by claudio on 20/02/2017.
 */
public abstract class AbstractEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {

	protected DedupConfig dedupConf;

	protected static final int LIMIT = 1000;

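	// number of decimal places kept when rounding the similarity score, see similarity(Oaf, Oaf)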
	protected static final int SCORE_DECIMALS = 2;

	protected Map<String, String> dsTypeMap = Maps.newHashMap();

	protected Set<String> dsWhitelist = Sets.newHashSet();

	protected Set<String> dsBlacklist = Sets.newHashSet();

	// Datasources such as EuropePMC expose OA abstracts, so they cannot be trusted as indicators of genuinely Open Access publications.
	protected Set<String> untrustedOaDsList = Sets.newHashSet();

	// Whitelist of datasource typologies.
	protected Set<String> dsTypeWhitelist = Sets.newHashSet();

	/**
	 * Lower bound for trust scaling.
	 */
	protected double scaleLB;

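	/**
	 * @return the name of the counter group used by the concrete enrichment reducer.
	 */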
	protected abstract String counterGroup();

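	/**
	 * Loads the broker configuration from the job context: datasource id white/black lists, datasource type whitelist,
	 * the list of untrusted OA datasources, the datasource id to typology mapping (resolved from HBase) and the dedup
	 * configuration used to compute similarities.
	 */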
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		super.setup(context);

		System.out.println("LIMIT: " + LIMIT);

		dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
		dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
		untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));

		dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);

		System.out.println("datasource whitelist: " + dsWhitelist);
		System.out.println("datasource blacklist: " + dsBlacklist);
		System.out.println("datasource untrusted OA list: " + untrustedOaDsList);

		System.out.println("datasource type whitelist: " + dsTypeWhitelist);

		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);

		System.out.println("got dedup conf: " + dedupConfJson);

		dedupConf = DedupConfig.load(dedupConfJson);

		System.out.println("parsed dedup conf: " + dedupConf.toString());

		scaleLB = dedupConf.getWf().getThreshold() - 0.01;
	}

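	/**
	 * Scans the datasource rows of the input HBase table (row key prefix "10", column family "datasource") and maps each
	 * datasource id to its typology, keeping only the typologies included in the given whitelist.
	 *
	 * @param context the reducer context, providing the HBase configuration and the input table name
	 * @param dsTypeWhitelist the datasource typologies to retain
	 * @return the datasource id to typology map
	 * @throws IOException if the HBase scan fails
	 */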
	protected Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
		System.out.println("loading datasource typology mapping");

		final Map<String, String> dsTypeMap = Maps.newHashMap();

		final Scan scan = new Scan();
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
		fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
		scan.setFilter(fl);
		scan.addFamily(Bytes.toBytes("datasource"));

		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");

		System.out.println(String.format("table name: '%s'", tableName));

		try (final HTable table = new HTable(context.getConfiguration(), tableName); final ResultScanner res = table.getScanner(scan)) {

			for (final Result r : res) {
				final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
				if (b != null) {
					final Oaf oaf = Oaf.parseFrom(b);
					final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
					final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();

					if (dsTypeWhitelist.contains(dsType)) {
						//System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
						dsTypeMap.put(dsId, dsType);
					}
				}
			}
		}
		System.out.println("datasource type map size: " + dsTypeMap.size());
		return dsTypeMap;
	}

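	/**
	 * Computes the PACE similarity between two OAF entities: the raw distance score is rescaled with
	 * {@code Algorithms.scale(score, scaleLB, 1, 0, 1)} and rounded to {@link #SCORE_DECIMALS} decimal places (half-down).
	 *
	 * @param oa the first entity
	 * @param ob the second entity
	 * @return the similarity score
	 */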
	protected float similarity(final Oaf oa, final Oaf ob) {

		final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
		final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());

		final ScoreResult sr = new PaceDocumentDistance().between(a, b, dedupConf);
		final float score = (float) Algorithms.scale(sr.getScore(), scaleLB, 1, 0, 1);

		return MathUtils.round(score, SCORE_DECIMALS, BigDecimal.ROUND_HALF_DOWN);
	}

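	/**
	 * @return a {@link Function} deserialising an {@link ImmutableBytesWritable} into an {@link Oaf} protobuf message;
	 *         invalid payloads are reported as {@link IllegalArgumentException}. Typically used to transform the reducer
	 *         values, e.g. {@code Iterables.transform(values, oafDeserialiser())}.
	 */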
	protected Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
		return new Function<ImmutableBytesWritable, Oaf>() {

			@Override
			public Oaf apply(final ImmutableBytesWritable input) {
				try {
					return Oaf.parseFrom(input.copyBytes());
				} catch (final InvalidProtocolBufferException e) {
					throw new IllegalArgumentException(e);
				}
			}
		};
	}
}