package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.Map;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;

/**
 * Created by claudio on 20/02/2017.
 */
public abstract class AbstractEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {

	protected DedupConfig dedupConf;

	protected static final int LIMIT = 1000;

	// Number of decimal digits kept when rounding similarity scores.
	protected static final int SCORE_DECIMALS = 2;

	// Maps datasource id to datasource typology, populated in setup().
	protected Map<String, String> dsTypeMap = Maps.newHashMap();

	// Datasource id whitelist, loaded from the job configuration.
	protected Set<String> dsWhitelist = Sets.newHashSet();

	// Datasource id blacklist, loaded from the job configuration.
	protected Set<String> dsBlacklist = Sets.newHashSet();

	// Datasources (e.g. EuropePMC) that expose OA abstracts: their records must not be treated as evidence of genuinely OA publications.
	protected Set<String> untrustedOaDsList = Sets.newHashSet();

	// Whitelist for datasource typologies.
	protected Set<String> dsTypeWhitelist = Sets.newHashSet();

	/**
	 * Lower bound for trust scaling.
	 */
	protected double scaleLB;
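
	/**
	 * Returns the name of the counter group under which the concrete enrichment
	 * reducer reports its Hadoop counters.
	 */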
	protected abstract String counterGroup();
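
	/**
	 * Loads the datasource id/type whitelists and blacklist, the untrusted OA
	 * datasource list, the datasource typology map and the dedup configuration
	 * from the job context.
	 */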
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		super.setup(context);

		System.out.println("LIMIT: " + LIMIT);

		dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
		dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
		untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));

		dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);

		System.out.println("datasource whitelist: " + dsWhitelist);
		System.out.println("datasource blacklist: " + dsBlacklist);
		System.out.println("datasource OA list: " + untrustedOaDsList);

		System.out.println("datasource type whitelist: " + dsTypeWhitelist);

		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);

		System.out.println("got dedup conf: " + dedupConfJson);

		dedupConf = DedupConfig.load(dedupConfJson);

		System.out.println("parsed dedup conf: " + dedupConf.toString());

		// Place the scaling lower bound just below the dedup similarity threshold.
		scaleLB = dedupConf.getWf().getThreshold() - 0.01;
	}
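
	/**
	 * Scans the datasource rows of the input HBase table and returns a map from
	 * datasource id to typology, keeping only the whitelisted typologies.
	 */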
	protected Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
		System.out.println("loading datasource typology mapping");

		final Map<String, String> dsTypeMap = Maps.newHashMap();

		// Datasource rows are keyed with the "10" type prefix: scan only those rows, limited to the "datasource" column family.
		final Scan scan = new Scan();
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
		fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
		scan.setFilter(fl);
		scan.addFamily(Bytes.toBytes("datasource"));

		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");

		System.out.println(String.format("table name: '%s'", tableName));

		try (final HTable table = new HTable(context.getConfiguration(), tableName); final ResultScanner res = table.getScanner(scan)) {

			for (final Result r : res) {
				final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
				if (b != null) {
					final Oaf oaf = Oaf.parseFrom(b);
					final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
					final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();

					if (dsTypeWhitelist.contains(dsType)) {
						//System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
						dsTypeMap.put(dsId, dsType);
					}
				}
			}
		}
		System.out.println("datasource type map size: " + dsTypeMap.size());
		return dsTypeMap;
	}
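
	/**
	 * Computes the PACE distance between the two entities, rescales it from
	 * [scaleLB, 1] to [0, 1] and rounds it to SCORE_DECIMALS decimal digits.
	 */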
	protected float similarity(final Oaf oa, final Oaf ob) {

		final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
		final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());

		final ScoreResult sr = new PaceDocumentDistance().between(a, b, dedupConf);
		final float score = (float) Algorithms.scale(sr.getScore(), scaleLB, 1, 0, 1);

		return MathUtils.round(score, SCORE_DECIMALS, BigDecimal.ROUND_HALF_DOWN);
	}
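
	/**
	 * Parses an Oaf protobuf message from the writable's bytes, rethrowing parse
	 * failures as an unchecked IllegalArgumentException.
	 */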
	protected static Oaf toOaf(final ImmutableBytesWritable i) {
		try {
			return Oaf.parseFrom(i.copyBytes());
		} catch (final InvalidProtocolBufferException e) {
			throw new IllegalArgumentException(e);
		}
	}
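
	/**
	 * Adapts an Iterable into a sequential, ordered Stream.
	 */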
	protected static <T> Stream<T> stream(final Iterable<T> iterable) {
		return StreamSupport.stream(
				Spliterators.spliteratorUnknownSize(
						iterable.iterator(),
						Spliterator.ORDERED
				),
				false
		);
	}
}