Revision 48145
Added by Claudio Atzori almost 7 years ago

integrated latest changes from dnet40
EnrichmentReducer.java

--- a/EnrichmentReducer.java
+++ b/EnrichmentReducer.java
@@ -1,141 +1,29 @@
 package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
 
-import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey;
-import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;
-
 import java.io.IOException;
 import java.util.List;
-import java.util.Map;
-import java.util.Set;
 
-import eu.dnetlib.data.mapreduce.Algorithms;
-import eu.dnetlib.data.mapreduce.JobParams;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import eu.dnetlib.data.mapreduce.hbase.broker.*;
 import eu.dnetlib.data.mapreduce.util.DedupUtils;
-import eu.dnetlib.pace.config.DedupConfig;
-import eu.dnetlib.pace.distance.PaceDocumentDistance;
-import eu.dnetlib.pace.distance.eval.ScoreResult;
-import eu.dnetlib.pace.model.MapDocument;
-import eu.dnetlib.pace.model.ProtoDocumentBuilder;
+import eu.dnetlib.data.proto.OafProtos.Oaf;
 import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.math.RandomUtils;
-import org.apache.commons.math.util.MathUtils;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.client.ResultScanner;
-import org.apache.hadoop.hbase.client.Scan;
-import org.apache.hadoop.hbase.filter.FilterList;
-import org.apache.hadoop.hbase.filter.FilterList.Operator;
-import org.apache.hadoop.hbase.filter.PrefixFilter;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapreduce.Reducer;
 import org.dom4j.DocumentException;
 
-import com.google.common.base.Function;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-import com.google.protobuf.InvalidProtocolBufferException;
+import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey;
 
-import eu.dnetlib.data.proto.OafProtos.Oaf;
-
 /**
  * Created by claudio on 08/07/16.
  */
-public class EnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
+public class EnrichmentReducer extends AbstractEnrichmentReducer {
 
-    private static final int LIMIT = 1000;
-    private static final int SCORE_DECIMALS = 2;
-
-    private Map<String, String> dsTypeMap = Maps.newHashMap();
-
-    private Set<String> dsWhitelist = Sets.newHashSet();
-
-    private Set<String> dsBlacklist = Sets.newHashSet();
-
-    // This is for EuropePMC. They expose OA abstracts, but we want to identify real OA publications. WTF.
-    private Set<String> untrustedOaDsList = Sets.newHashSet();
-
-    // White list for datasource typologies.
-    private Set<String> dsTypeWhitelist = Sets.newHashSet();
-
-    private DedupConfig dedupConf;
-
-    /**
-     * lower bound for trust scaling
-     */
-    private double scaleLB;
-
     @Override
-    protected void setup(final Context context) throws IOException, InterruptedException {
-        super.setup(context);
-
-        System.out.println("LIMIT: " + LIMIT);
-
-        dsWhitelist.addAll(getPropertyValues(context, "broker.datasource.id.whitelist"));
-        dsBlacklist.addAll(getPropertyValues(context, "broker.datasource.id.blacklist"));
-        dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
-        untrustedOaDsList.addAll(getPropertyValues(context, "broker.datasource.untrusted.oa.list"));
-
-        dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);
-
-        System.out.println("datasource whitelist: " + dsWhitelist);
-        System.out.println("datasource blacklist: " + dsBlacklist);
-        System.out.println("datasource OA list: " + untrustedOaDsList);
-
-        System.out.println("datasource type whitelist: " + dsTypeWhitelist);
-
-        final String dedupConfJson = context.getConfiguration().get(JobParams.DEDUP_CONF);
-
-        System.out.println("got dedup conf: " + dedupConfJson);
-
-        dedupConf = DedupConfig.load(dedupConfJson);
-
-        System.out.println("parsed dedup conf: " + dedupConf.toString());
-
-        scaleLB = dedupConf.getWf().getThreshold() - 0.01;
+    protected String counterGroup() {
+        return "Broker Enrichment";
     }
 
-    private Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
-        System.out.println("loading datasource typology mapping");
-
-        final Map<String, String> dsTypeMap = Maps.newHashMap();
-
-        final Scan scan = new Scan();
-        final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
-        fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
-        scan.setFilter(fl);
-        scan.addFamily(Bytes.toBytes("datasource"));
-
-        final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
-
-        System.out.println(String.format("table name: '%s'", tableName));
-
-        try (final HTable table = new HTable(context.getConfiguration(), tableName);
-                final ResultScanner res = table.getScanner(scan)) {
-
-            for (final Result r : res) {
-                final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
-                if (b != null) {
-                    final Oaf oaf = Oaf.parseFrom(b);
-                    final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
-                    final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
-
-                    if (dsTypeWhitelist.contains(dsType)) {
-                        System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
-                        dsTypeMap.put(dsId, dsType);
-                    }
-                }
-            }
-
-        }
-        System.out.println("datasource type map size: " + dsTypeMap.size());
-        return dsTypeMap;
-    }
-
     @Override
     protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
             InterruptedException {
@@ -190,29 +78,4 @@
         }
     }
 
-    private float similarity(final Oaf oa, final Oaf ob) {
-
-        final MapDocument a = ProtoDocumentBuilder.newInstance(oa.getEntity().getId(), oa.getEntity(), dedupConf.getPace().getModel());
-        final MapDocument b = ProtoDocumentBuilder.newInstance(ob.getEntity().getId(), ob.getEntity(), dedupConf.getPace().getModel());
-
-        final ScoreResult sr = new PaceDocumentDistance().between(a, b, dedupConf);
-        final float score = (float) Algorithms.scale(sr.getScore(), scaleLB, 1, 0, 1);
-
-        return MathUtils.round(score, SCORE_DECIMALS);
-    }
-
-    private Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
-        return new Function<ImmutableBytesWritable, Oaf>() {
-
-            @Override
-            public Oaf apply(final ImmutableBytesWritable input) {
-                try {
-                    return Oaf.parseFrom(input.copyBytes());
-                } catch (final InvalidProtocolBufferException e) {
-                    throw new IllegalArgumentException(e);
-                }
-            }
-        };
-    }
-
 }
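The net effect of the change: everything this reducer previously carried itself (the datasource white/blacklists, the datasource type map, the dedup configuration, and the PACE-based similarity machinery) leaves the class, which now only names its Hadoop counter group. The new parent is not part of this changeset, so the skeleton below is only a sketch of the shape AbstractEnrichmentReducer would need for this diff to compile; its members are inferred from the lines removed above, not taken from the actual class.

// Hypothetical skeleton of AbstractEnrichmentReducer, inferred from the
// members this diff removes and from the single counterGroup() override;
// the real class is not shown in this changeset.
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import java.io.IOException;

import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.pace.config.DedupConfig;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public abstract class AbstractEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {

    protected DedupConfig dedupConf;

    // lower bound for trust scaling, as in the removed scaleLB field
    protected double scaleLB;

    // each concrete reducer reports its counters under this group name
    protected abstract String counterGroup();

    @Override
    protected void setup(final Context context) throws IOException, InterruptedException {
        super.setup(context);
        // presumably loads the datasource white/blacklists, the type map and
        // the dedup configuration, as the setup() removed above did
    }

    protected float similarity(final Oaf oa, final Oaf ob) {
        // presumably hosts the PaceDocumentDistance scoring removed above;
        // placeholder body in this sketch
        return 0f;
    }
}

With counterGroup() as the only abstract hook, a concrete reducer would presumably report its metrics under its own group, e.g. context.getCounter(counterGroup(), "enriched").increment(1), while inheriting the lookup and scoring logic unchanged.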
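A detail worth keeping in mind from the removed similarity() method: the raw PACE score is rescaled before rounding. With scaleLB = dedupConf.getWf().getThreshold() - 0.01, the call Algorithms.scale(sr.getScore(), scaleLB, 1, 0, 1) appears to stretch the interval [scaleLB, 1] onto [0, 1], so a pair that barely clears the dedup threshold gets a trust score near zero while an exact match gets 1. Algorithms.scale itself is not shown in this revision; assuming it performs a plain linear rescale, it would behave like this sketch:

// Assumed behaviour of eu.dnetlib.data.mapreduce.Algorithms.scale: a linear
// mapping of x from [inMin, inMax] onto [outMin, outMax]. Inferred from the
// call site above, not from the library source.
static double scale(final double x, final double inMin, final double inMax, final double outMin, final double outMax) {
    return outMin + (x - inMin) * (outMax - outMin) / (inMax - inMin);
}

Under that assumption, with a dedup threshold of 0.8 (so scaleLB = 0.79) a raw score of 0.9 would map to (0.9 - 0.79) / (1 - 0.79) ≈ 0.52, which MathUtils.round then rounds to SCORE_DECIMALS = 2 decimal places.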