package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import java.io.IOException;
import java.math.BigDecimal;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.broker.model.Event;
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import eu.dnetlib.pace.tree.support.TreeProcessor;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.FilterList.Operator;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;

/**
 * Base reducer for the broker enrichment jobs: loads the datasource
 * white/black lists and the dedup configuration, and provides shared helpers
 * to emit events, compute pairwise similarity and scale trust scores.
 *
 * Created by claudio on 20/02/2017.
 */
public abstract class AbstractEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {

	protected DedupConfig dedupConf;

	// upper bound shared with the concrete reducers; in this class it is only logged during setup
	protected static final int LIMIT = 1000;

	// number of decimal digits kept when rounding the scaled trust score
	protected static final int SCORE_DECIMALS = 2;

	// datasource id -> typology classid, limited to the whitelisted typologies
	protected Map<String, String> dsTypeMap = Maps.newHashMap();

	protected Set<String> dsWhitelist = Sets.newHashSet();

	protected Set<String> dsBlacklist = Sets.newHashSet();

	// Datasources whose OA claims are not trusted: e.g. EuropePMC exposes OA abstracts,
	// but we want to identify genuinely OA publications.
	protected Set<String> untrustedOaDsList = Sets.newHashSet();

	// White list for datasource typologies.
	protected Set<String> dsTypeWhitelist = Sets.newHashSet();

	// constant empty key: events are emitted as values only
	protected Text tKey = new Text("");

	/**
	 * lower bound for trust scaling
	 */
	protected double scaleLB;

	// counter group under which event counters are registered, provided by the concrete reducers
	protected abstract String counterGroup();

	// result type (publication, dataset, software, other) -> base URL format string
	protected Map<String, String> baseUrlMap = Maps.newHashMap();

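	/**
	 * Loads the datasource id/type lists, the dedup configuration and the
	 * per-type base URLs from the job configuration.
	 */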
	@Override
	protected void setup(final Context context) throws IOException, InterruptedException {
		super.setup(context);

		System.out.println("LIMIT: " + LIMIT);

		dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
		dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
		untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));

		dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);

		System.out.println("datasource whitelist: " + dsWhitelist);
		System.out.println("datasource blacklist: " + dsBlacklist);
		System.out.println("untrusted OA datasource list: " + untrustedOaDsList);

		System.out.println("datasource type whitelist: " + dsTypeWhitelist);

		final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);

		System.out.println("got dedup conf: " + dedupConfJson);

		dedupConf = DedupConfig.load(dedupConfJson);

		System.out.println("parsed dedup conf: " + dedupConf.toString());

		// start the scaling interval just below the dedup threshold, so a match at the threshold gets a small positive trust
		scaleLB = dedupConf.getWf().getThreshold() - 0.01;

		// per-type base URL format strings, read from the job configuration (identity format "%s" by default)
		baseUrlMap.put("publication", context.getConfiguration().get("broker.baseurl.publication", "%s"));
		baseUrlMap.put("dataset", context.getConfiguration().get("broker.baseurl.dataset", "%s"));
		baseUrlMap.put("software", context.getConfiguration().get("broker.baseurl.software", "%s"));
		baseUrlMap.put("other", context.getConfiguration().get("broker.baseurl.other", "%s"));
	}

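	/**
	 * Scans the datasource rows of the input HBase table ("10"-prefixed keys)
	 * and maps each datasource id to its typology, keeping only the
	 * whitelisted typologies.
	 */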
	protected Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
		System.out.println("loading datasource typology mapping");

		final Map<String, String> dsTypeMap = Maps.newHashMap();

		final Scan scan = new Scan();
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
		fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
		scan.setFilter(fl);
		scan.addFamily(Bytes.toBytes("datasource"));

		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");

		System.out.println(String.format("table name: '%s'", tableName));

		try (final HTable table = new HTable(context.getConfiguration(), tableName); final ResultScanner res = table.getScanner(scan)) {

			for (final Result r : res) {
				final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
				if (b != null) {
					final Oaf oaf = Oaf.parseFrom(b);
					final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
					final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();

					if (dsTypeWhitelist.contains(dsType)) {
						//System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
						dsTypeMap.put(dsId, dsType);
					}
				}
			}
		}
		System.out.println("datasource type map size: " + dsTypeMap.size());
		return dsTypeMap;
	}

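	/**
	 * Serialises each non-null event wrapper to JSON, writes it out and
	 * increments the corresponding counter in {@link #counterGroup()}.
	 */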
	protected void emit(final List<EventWrapper> events, final Context context) {
		events.stream().filter(Objects::nonNull).forEach(eventWrapper -> {
			try {
				final Event event = eventWrapper.asBrokerEvent();
				final String json = event.toJson();
				final Text valueout = new Text(json);
				context.write(tKey, valueout);
				context.getCounter(counterGroup(), eventWrapper.getCounterName()).increment(1);
			} catch (final Exception e) {
				throw new RuntimeException(e);
			}
		});
	}

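	/**
	 * Computes the similarity score between two Oaf entities, using the
	 * decision tree defined in the dedup configuration.
	 */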
	protected double similarity(final Oaf oa, final Oaf ob) {

		final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
		final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());

		final TreeProcessor tree = new TreeProcessor(dedupConf);
		return tree.computeScore(a, b);
	}

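	/**
	 * Maps a similarity score from [scaleLB, 1] to a trust score in [0, 1],
	 * rounded to SCORE_DECIMALS digits. E.g. with a dedup threshold of 0.8
	 * (scaleLB = 0.79), a similarity of 0.9 yields (0.9 - 0.79) / (1 - 0.79) ≈ 0.52,
	 * assuming Algorithms.scale performs a linear rescaling.
	 */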
	protected float scale(final double d) {
		final float score = (float) Algorithms.scale(d, scaleLB, 1, 0, 1);

		return MathUtils.round(score, SCORE_DECIMALS, BigDecimal.ROUND_HALF_DOWN);
	}

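	/**
	 * Returns a function deserialising HBase cell values into Oaf protobuf
	 * messages; invalid payloads are rethrown as IllegalArgumentException.
	 */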
	public static Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
		return p -> {
			try {
				return Oaf.parseFrom(p.copyBytes());
			} catch (final InvalidProtocolBufferException e) {
				throw new IllegalArgumentException(e);
			}
		};
	}

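	/**
	 * Deserialises a single HBase cell value into an Oaf protobuf message.
	 */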
	public static Oaf toOaf(final ImmutableBytesWritable p) {
		try {
			return Oaf.parseFrom(p.copyBytes());
		} catch (final InvalidProtocolBufferException e) {
			throw new IllegalArgumentException(e);
		}
	}

}