package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.broker.model.Event;
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;

/**
 * Base class for the broker enrichment reducers: holds the datasource whitelists/blacklists,
 * the dedup configuration used to compute pairwise similarity, and the common emit/scaling helpers.
 *
 * Created by claudio on 20/02/2017.
 */
public abstract class AbstractEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {

	protected DedupConfig dedupConf;

	protected static final int LIMIT = 1000;

	protected static final int SCORE_DECIMALS = 2;

	protected Map<String, String> dsTypeMap = Maps.newHashMap();

	protected Set<String> dsWhitelist = Sets.newHashSet();

	protected Set<String> dsBlacklist = Sets.newHashSet();

	// Datasources (e.g. EuropePMC) that expose OA abstracts: they must not be trusted
	// when trying to identify really Open Access publications.
	protected Set<String> untrustedOaDsList = Sets.newHashSet();

	// Whitelist for datasource typologies.
	protected Set<String> dsTypeWhitelist = Sets.newHashSet();

	protected Text tKey = new Text("");

	/**
	 * Lower bound for trust scaling.
	 */
	protected double scaleLB;

	protected abstract String counterGroup();

	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		super.setup(context);

		System.out.println("LIMIT: " + LIMIT);

		dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
		dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
		untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));

		dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);

		System.out.println("datasource whitelist: " + dsWhitelist);
		System.out.println("datasource blacklist: " + dsBlacklist);
		System.out.println("datasource untrusted OA list: " + untrustedOaDsList);
		System.out.println("datasource type whitelist: " + dsTypeWhitelist);

		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);

		System.out.println("got dedup conf: " + dedupConfJson);

		dedupConf = DedupConfig.load(dedupConfJson);

		System.out.println("parsed dedup conf: " + dedupConf.toString());

		// Lower bound for trust scaling: slightly below the dedup similarity threshold.
		scaleLB = dedupConf.getWf().getThreshold() - 0.01;
	}
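
	/*
	 * A minimal sketch of the job configuration consumed by setup(). The property names
	 * are the ones read above; the values are hypothetical placeholders:
	 *
	 *   conf.set("broker.datasource.id.whitelist", "<datasource id>,<datasource id>");
	 *   conf.set("broker.datasource.id.blacklist", "<datasource id>");
	 *   conf.set("broker.datasource.type.whitelist", "<datasource type classid>");
	 *   conf.set("broker.datasource.untrusted.oa.list", "<datasource id>");
	 *   conf.set(JobParams.DEDUP_CONF, "<dedup configuration json>");
	 */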

	protected Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
		System.out.println("loading datasource typology mapping");

		final Map<String, String> dsTypeMap = Maps.newHashMap();

		// Scan only the datasource rows (key prefix "10"), reading the "datasource" family.
		final Scan scan = new Scan();
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
		fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
		scan.setFilter(fl);
		scan.addFamily(Bytes.toBytes("datasource"));

		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");

		System.out.println(String.format("table name: '%s'", tableName));

		try (final HTable table = new HTable(context.getConfiguration(), tableName); final ResultScanner res = table.getScanner(scan)) {

			for (final Result r : res) {
				final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
				if (b != null) {
					final Oaf oaf = Oaf.parseFrom(b);
					// entity ids carry the type prefix, so keep only the part after the pipe
					final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
					final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();

					if (dsTypeWhitelist.contains(dsType)) {
						dsTypeMap.put(dsId, dsType);
					}
				}
			}
		}
		System.out.println("datasource type map size: " + dsTypeMap.size());
		return dsTypeMap;
	}
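
	/*
	 * Note: the scan above relies on the row key convention where datasource entities are
	 * prefixed with "10", and entity ids carry that prefix too, e.g. (hypothetical id)
	 * "10|<datasource id>"; hence the substringAfter(id, "|") when building the map.
	 */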

	protected void emit(final List<EventWrapper> events, final Context context) {
		events.stream().filter(Objects::nonNull).forEach(eventWrapper -> {
			try {
				final Event event = eventWrapper.asBrokerEvent();
				final String json = event.toJson();
				final Text valueout = new Text(json);
				context.write(tKey, valueout);
				context.getCounter(counterGroup(), eventWrapper.getCounterName()).increment(1);
			} catch (final Exception e) {
				throw new RuntimeException(e);
			}
		});
	}
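
	/*
	 * Each event is written as one JSON string under a constant empty key, and a counter
	 * named after the event type is incremented in the subclass-provided group.
	 * Usage sketch from a subclass reduce(), "events" being a hypothetical list built there:
	 *
	 *   final List<EventWrapper> events = ...;
	 *   emit(events, context);
	 */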

	protected double similarity(final Oaf oa, final Oaf ob) {
		final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
		final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());

		final ScoreResult sr = new PaceDocumentDistance().between(a, b, dedupConf);
		return sr.getScore();
	}
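
	/*
	 * Usage sketch, with oafA/oafB as hypothetical deserialized records:
	 *
	 *   final double d = similarity(oafA, oafB);
	 *   final float trust = scale(d); // see below: maps [scaleLB, 1] onto [0, 1]
	 */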

	protected float scale(final double d) {
		final float score = (float) Algorithms.scale(d, scaleLB, 1, 0, 1);
		return MathUtils.round(score, SCORE_DECIMALS, BigDecimal.ROUND_HALF_DOWN);
	}
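
	/*
	 * Assuming Algorithms.scale(d, scaleLB, 1, 0, 1) performs a linear rescale from
	 * [scaleLB, 1] onto [0, 1], i.e. (d - scaleLB) / (1 - scaleLB): with a dedup
	 * threshold of 0.99 we get scaleLB = 0.98, so d = 0.995 maps to 0.015 / 0.02 = 0.75,
	 * which is then rounded half-down to SCORE_DECIMALS = 2 decimal places.
	 */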

	/** Function form of {@link #toOaf(ImmutableBytesWritable)}, convenient for mapping streams of serialized records. */
	public static Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
		return p -> {
			try {
				return Oaf.parseFrom(p.copyBytes());
			} catch (final InvalidProtocolBufferException e) {
				throw new IllegalArgumentException(e);
			}
		};
	}

	public static Oaf toOaf(final ImmutableBytesWritable p) {
		try {
			return Oaf.parseFrom(p.copyBytes());
		} catch (final InvalidProtocolBufferException e) {
			throw new IllegalArgumentException(e);
		}
	}
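
	/*
	 * Usage sketch inside a reduce() call, where "values" is the reducer's
	 * Iterable<ImmutableBytesWritable> (java.util.stream imports assumed):
	 *
	 *   final List<Oaf> oafs = StreamSupport.stream(values.spliterator(), false)
	 *       .map(AbstractEnrichmentReducer::toOaf)
	 *       .collect(Collectors.toList());
	 */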
}