package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.broker.model.Event;
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.distance.eval.ScoreResult;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;

/**
 * Base class for the broker enrichment reducers: holds the shared datasource
 * white/blacklists, the dedup configuration and the similarity/trust helpers.
 *
 * Created by claudio on 20/02/2017.
 */
public abstract class AbstractEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {

	protected DedupConfig dedupConf;

	protected static final int LIMIT = 1000;

	protected static final int SCORE_DECIMALS = 2;

	protected Map<String, String> dsTypeMap = Maps.newHashMap();

	protected Set<String> dsWhitelist = Sets.newHashSet();

	protected Set<String> dsBlacklist = Sets.newHashSet();

	// Datasources (e.g. EuropePMC) that expose OA abstracts for publications that are not necessarily OA themselves.
	protected Set<String> untrustedOaDsList = Sets.newHashSet();

	// Whitelist for datasource typologies.
	protected Set<String> dsTypeWhitelist = Sets.newHashSet();

	protected Text tKey = new Text("");

	/**
	 * Lower bound for trust scaling.
	 */
	protected double scaleLB;

	protected abstract String counterGroup();

	protected Map<String, String> baseUrlMap = Maps.newHashMap();
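
	/**
	 * Reads the datasource white/blacklists, the typology whitelist, the dedup
	 * configuration and the entity-specific base URLs from the job configuration.
	 */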
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		super.setup(context);

		System.out.println("LIMIT: " + LIMIT);

		dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
		dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
		untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));

		dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);

		System.out.println("datasource whitelist: " + dsWhitelist);
		System.out.println("datasource blacklist: " + dsBlacklist);
		System.out.println("untrusted OA datasource list: " + untrustedOaDsList);
		System.out.println("datasource type whitelist: " + dsTypeWhitelist);

		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);

		System.out.println("got dedup conf: " + dedupConfJson);

		dedupConf = DedupConfig.load(dedupConfJson);

		System.out.println("parsed dedup conf: " + dedupConf.toString());

		// trust scores are scaled starting just below the dedup similarity threshold
		scaleLB = dedupConf.getWf().getThreshold() - 0.01;

		baseUrlMap.put("publication", context.getConfiguration().get("broker.baseurl.publication", "%s"));
		baseUrlMap.put("dataset", context.getConfiguration().get("broker.baseurl.dataset", "%s"));
		baseUrlMap.put("software", context.getConfiguration().get("broker.baseurl.software", "%s"));
		baseUrlMap.put("other", context.getConfiguration().get("broker.baseurl.other", "%s"));
	}
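
	/**
	 * Scans the datasource rows of the input HBase table (row keys prefixed by "10")
	 * and maps each datasource id to its typology, keeping only whitelisted types.
	 */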
	protected Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
		System.out.println("loading datasource typology mapping");

		final Map<String, String> dsTypeMap = Maps.newHashMap();

		// datasource rows are identified by the "10" row key prefix
		final Scan scan = new Scan();
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
		fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
		scan.setFilter(fl);
		scan.addFamily(Bytes.toBytes("datasource"));

		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");

		System.out.println(String.format("table name: '%s'", tableName));

		try (final HTable table = new HTable(context.getConfiguration(), tableName); final ResultScanner res = table.getScanner(scan)) {
			for (final Result r : res) {
				final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
				if (b != null) {
					final Oaf oaf = Oaf.parseFrom(b);
					final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
					final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();

					if (dsTypeWhitelist.contains(dsType)) {
						dsTypeMap.put(dsId, dsType);
					}
				}
			}
		}
		System.out.println("datasource type map size: " + dsTypeMap.size());
		return dsTypeMap;
	}
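
	/**
	 * Writes each non-null event as a JSON string to the reducer output and
	 * increments the counter identified by {@link #counterGroup()} and the
	 * wrapper's counter name.
	 */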
	protected void emit(final List<EventWrapper> events, final Context context) {
		events.stream().filter(Objects::nonNull).forEach(eventWrapper -> {
			try {
				final Event event = eventWrapper.asBrokerEvent();
				final String json = event.toJson();
				final Text valueout = new Text(json);
				context.write(tKey, valueout);
				context.getCounter(counterGroup(), eventWrapper.getCounterName()).increment(1);
			} catch (final Exception e) {
				throw new RuntimeException(e);
			}
		});
	}
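
	/**
	 * Computes the PACE similarity score between two Oaf entities, using the
	 * field model defined in the dedup configuration.
	 */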
	protected double similarity(final Oaf oa, final Oaf ob) {
		final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
		final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());

		final ScoreResult sr = new PaceDocumentDistance().between(a, b, dedupConf);
		return sr.getScore();
	}
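
	/**
	 * Scales a similarity score into a [0, 1] trust value, using {@code scaleLB}
	 * as the lower bound of the input range, and rounds it to
	 * {@code SCORE_DECIMALS} decimal places (half-down).
	 */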
	protected float scale(final double d) {
		final float score = (float) Algorithms.scale(d, scaleLB, 1, 0, 1);

		return MathUtils.round(score, SCORE_DECIMALS, BigDecimal.ROUND_HALF_DOWN);
	}
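
	/**
	 * Returns a function deserialising a writable byte payload into an Oaf
	 * protobuf message.
	 */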
	public static Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
		return AbstractEnrichmentReducer::toOaf;
	}
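
	/**
	 * Parses a writable byte payload into an Oaf protobuf message.
	 *
	 * @throws IllegalArgumentException when the payload is not a valid Oaf serialisation
	 */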
	public static Oaf toOaf(final ImmutableBytesWritable p) {
		try {
			return Oaf.parseFrom(p.copyBytes());
		} catch (final InvalidProtocolBufferException e) {
			throw new IllegalArgumentException(e);
		}
	}

}