1
|
package eu.dnetlib.data.mapreduce.hbase.broker;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.util.List;
|
5
|
import java.util.Map;
|
6
|
import java.util.Set;
|
7
|
|
8
|
import com.google.common.base.Function;
|
9
|
import com.google.common.base.Joiner;
|
10
|
import com.google.common.base.Predicate;
|
11
|
import com.google.common.base.Splitter;
|
12
|
import com.google.common.collect.Iterables;
|
13
|
import com.google.common.collect.Lists;
|
14
|
import com.google.common.collect.Maps;
|
15
|
import com.google.common.collect.Sets;
|
16
|
import com.google.protobuf.InvalidProtocolBufferException;
|
17
|
import com.googlecode.protobuf.format.JsonFormat;
|
18
|
import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
|
19
|
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
|
20
|
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
|
21
|
import eu.dnetlib.data.proto.OafProtos.Oaf;
|
22
|
import eu.dnetlib.data.proto.OafProtos.OafEntity;
|
23
|
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
|
24
|
import eu.dnetlib.data.proto.ResultProtos.Result.Metadata;
|
25
|
import org.apache.commons.lang.StringUtils;
|
26
|
import org.apache.hadoop.hbase.client.HTable;
|
27
|
import org.apache.hadoop.hbase.client.Result;
|
28
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
29
|
import org.apache.hadoop.hbase.client.Scan;
|
30
|
import org.apache.hadoop.hbase.filter.FilterList;
|
31
|
import org.apache.hadoop.hbase.filter.FilterList.Operator;
|
32
|
import org.apache.hadoop.hbase.filter.PrefixFilter;
|
33
|
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
|
34
|
import org.apache.hadoop.hbase.util.Bytes;
|
35
|
import org.apache.hadoop.io.Text;
|
36
|
import org.apache.hadoop.mapreduce.Reducer;
|
37
|
|
38
|
/**
|
39
|
* Created by claudio on 08/07/16.
|
40
|
*/
|
41
|
public class EnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
|
42
|
|
43
|
private static final int LIMIT = 1000;
|
44
|
private Set<String> pidType;
|
45
|
|
46
|
private final static String PRODUCER_ID = "OpenAIRE";
|
47
|
|
48
|
public enum Topic {
|
49
|
|
50
|
PID("/ENRICH/PID"),
|
51
|
OA_STATUS("/ENRICH/OA_STATUS"),
|
52
|
ABSTRACT("/ENRICH/ABSTRACT"),
|
53
|
PUBLICATION_DATE("/ENRICH/PUBLICATION_DATE");
|
54
|
|
55
|
Topic(final String value){
|
56
|
this.value = value;
|
57
|
}
|
58
|
protected String value;
|
59
|
public String getValue() {
|
60
|
return this.value;
|
61
|
}
|
62
|
}
|
63
|
|
64
|
private Text tKey;
|
65
|
private Text tValue;
|
66
|
|
67
|
private Map<String, String> dsTypeMap = Maps.newHashMap();
|
68
|
|
69
|
private Set<String> dsWhitelist = Sets.newHashSet();
|
70
|
|
71
|
private Set<String> dsBlacklist = Sets.newHashSet();
|
72
|
|
73
|
// This is for EuropePMC. They expose OA abstracts, but we want to identify real OA publications. WTF.
|
74
|
private Set<String> oaDsList = Sets.newHashSet();
|
75
|
|
76
|
private Set<String> dsTypeWhitelist = Sets.newHashSet();
|
77
|
|
78
|
|
79
|
@Override
|
80
|
protected void setup(final Context context) throws IOException, InterruptedException {
|
81
|
super.setup(context);
|
82
|
|
83
|
System.out.println("LIMIT: " + LIMIT);
|
84
|
|
85
|
tKey = new Text("");
|
86
|
tValue = new Text();
|
87
|
|
88
|
pidType = Sets.newHashSet("doi", "pmc", "pmid", "urn", "arxiv");
|
89
|
|
90
|
dsWhitelist.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.id.whitelist", ""))));
|
91
|
dsBlacklist.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.id.blacklist", ""))));
|
92
|
dsTypeWhitelist.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.type.whitelist", ""))));
|
93
|
oaDsList.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.oa.list", ""))));
|
94
|
|
95
|
dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);
|
96
|
|
97
|
System.out.println("datasource whitelist: " + dsWhitelist);
|
98
|
System.out.println("datasource blacklist: " + dsBlacklist);
|
99
|
System.out.println("datasource OA list: " + oaDsList);
|
100
|
|
101
|
System.out.println("datasource type whitelist: " + dsTypeWhitelist);
|
102
|
}
|
103
|
|
104
|
private Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
|
105
|
System.out.println("loading datasource typology mapping");
|
106
|
|
107
|
final Map<String, String> dsTypeMap = Maps.newHashMap();
|
108
|
|
109
|
final Scan scan = new Scan();
|
110
|
final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
|
111
|
fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
|
112
|
scan.setFilter(fl);
|
113
|
scan.addFamily(Bytes.toBytes("datasource"));
|
114
|
|
115
|
final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
|
116
|
|
117
|
System.out.println(String.format("table name: '%s'", tableName));
|
118
|
|
119
|
final HTable table = new HTable(context.getConfiguration(), tableName);
|
120
|
|
121
|
final ResultScanner res = table.getScanner(scan);
|
122
|
|
123
|
for(Result r : res) {
|
124
|
final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
|
125
|
if (b != null) {
|
126
|
final Oaf oaf = Oaf.parseFrom(b);
|
127
|
final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
|
128
|
final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
|
129
|
|
130
|
if (dsTypeWhitelist.contains(dsType)) {
|
131
|
System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
|
132
|
dsTypeMap.put(dsId, dsType);
|
133
|
}
|
134
|
}
|
135
|
}
|
136
|
|
137
|
res.close();
|
138
|
|
139
|
System.out.println("datasource type map size: " + dsTypeMap.size());
|
140
|
return dsTypeMap;
|
141
|
}
|
142
|
|
143
|
@Override
|
144
|
protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
|
145
|
InterruptedException {
|
146
|
|
147
|
final List<Oaf> oafList = Lists.newArrayList(Iterables.transform(Iterables.limit(values, LIMIT), oafDeserialiser()));
|
148
|
|
149
|
generateEvents(oafList, context);
|
150
|
}
|
151
|
|
152
|
private void generateEvents(final List<Oaf> oafList, final Context context) throws IOException, InterruptedException {
|
153
|
|
154
|
for(Oaf current : oafList) {
|
155
|
|
156
|
final String currentId = current.getEntity().getId();
|
157
|
|
158
|
final String currentDsId = StringUtils.substringAfter(getKey(current.getEntity().getCollectedfromList()), "|");
|
159
|
final String currentDsType = dsTypeMap.get(currentDsId);
|
160
|
|
161
|
//System.out.println(String.format("'%s' -> '%s'", currentDsId, currentDsType));
|
162
|
|
163
|
if (StringUtils.isBlank(currentDsType)) {
|
164
|
context.getCounter("target datasource", "events skipped").increment(1);
|
165
|
} else {
|
166
|
|
167
|
//if (dsWhitelist.contains(currentDsId) | !dsBlacklist.contains(currentDsId)) {
|
168
|
|
169
|
for (Oaf other : oafList) {
|
170
|
|
171
|
final String otherId = other.getEntity().getId();
|
172
|
if (!currentId.equals(otherId)) {
|
173
|
|
174
|
//PIDS
|
175
|
for (final String type : pidType) {
|
176
|
if (!hasPid(current, type) && hasPid(other, type)) {
|
177
|
final Oaf.Builder prototype = Oaf.newBuilder(current);
|
178
|
final Iterable<StructuredProperty> pids = Iterables.filter(other.getEntity().getPidList(), new Predicate<StructuredProperty>() {
|
179
|
@Override
|
180
|
public boolean apply(final StructuredProperty pid) {
|
181
|
return pid.getQualifier().getClassid().equalsIgnoreCase(type);
|
182
|
}
|
183
|
});
|
184
|
prototype.getEntityBuilder().addAllPid(pids);
|
185
|
emit(asEvent(prototype.build(), Topic.PID.getValue(), other), context);
|
186
|
context.getCounter("event", Topic.PID.getValue()).increment(1);
|
187
|
}
|
188
|
}
|
189
|
|
190
|
final String otherDsId = StringUtils.substringAfter(getKey(other.getEntity().getCollectedfromList()), "|");
|
191
|
|
192
|
//OPEN ACCESS STATUS
|
193
|
if ((oaDsList.contains(currentDsId) || !hasAccess(current, "OPEN", false)) && (hasAccess(other, "OPEN", false) && !oaDsList.contains(otherDsId))) {
|
194
|
final Oaf.Builder prototype = Oaf.newBuilder(current);
|
195
|
final Iterable<Instance> i = Iterables.filter(other.getEntity().getResult().getInstanceList(), new Predicate<Instance>() {
|
196
|
@Override
|
197
|
public boolean apply(final Instance i) {
|
198
|
return "OPEN".equalsIgnoreCase(i.getLicence().getClassid());
|
199
|
}
|
200
|
});
|
201
|
prototype.getEntityBuilder().getResultBuilder().addAllInstance(i);
|
202
|
emit(asEvent(prototype.build(), Topic.OA_STATUS.getValue(), other), context);
|
203
|
context.getCounter("event", Topic.OA_STATUS.getValue()).increment(1);
|
204
|
}
|
205
|
|
206
|
//ABSTRACT
|
207
|
if (!hasAbstract(current) && hasAbstract(other)) {
|
208
|
final Oaf.Builder prototype = Oaf.newBuilder(current);
|
209
|
final List<StringField> descriptionList = other.getEntity().getResult().getMetadata().getDescriptionList();
|
210
|
prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addAllDescription(descriptionList);
|
211
|
emit(asEvent(prototype.build(), Topic.ABSTRACT.getValue(), other), context);
|
212
|
context.getCounter("event", Topic.ABSTRACT.getValue()).increment(1);
|
213
|
}
|
214
|
|
215
|
//PUBLICATION DATE
|
216
|
if (!hasPubDate(current) && hasPubDate(other)) {
|
217
|
final Oaf.Builder prototype = Oaf.newBuilder(current);
|
218
|
|
219
|
final StringField date = other.getEntity().getResult().getMetadata().getDateofacceptance();
|
220
|
prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().setDateofacceptance(date);
|
221
|
emit(asEvent(prototype.build(), Topic.PUBLICATION_DATE.getValue(), other), context);
|
222
|
context.getCounter("event", Topic.PUBLICATION_DATE.getValue()).increment(1);
|
223
|
}
|
224
|
}
|
225
|
}
|
226
|
}
|
227
|
/*} else {
|
228
|
context.getCounter("target datasource", "events skipped").increment(1);
|
229
|
}*/
|
230
|
}
|
231
|
}
|
232
|
|
233
|
private void emit(final EventMessage e, final Context context) throws IOException, InterruptedException {
|
234
|
//tKey.set(e.getMap().get("id"));
|
235
|
tValue.set(e.toString());
|
236
|
context.write(tKey, tValue);
|
237
|
}
|
238
|
|
239
|
private EventMessage asEvent(final Oaf oaf, final String topic, final Oaf source) {
|
240
|
final Map<String, String> map = Maps.newHashMap();
|
241
|
|
242
|
//TARGET INFO
|
243
|
final String targetDatasourceName = getValue(oaf.getEntity().getCollectedfromList());
|
244
|
if (StringUtils.isNotBlank(targetDatasourceName)) {
|
245
|
map.put("target.datasource.name", targetDatasourceName);
|
246
|
}
|
247
|
|
248
|
final String title = getValue(oaf.getEntity().getResult().getMetadata().getTitleList());
|
249
|
if (StringUtils.isNotBlank(title)) {
|
250
|
map.put("target.publication.title", title);
|
251
|
}
|
252
|
|
253
|
map.put("target.publication.id", getValue(oaf.getEntity().getOriginalIdList()));
|
254
|
map.put("trust", "1");
|
255
|
|
256
|
//PROVENANCE INFO
|
257
|
final String sourceDatasourceName = getValue(source.getEntity().getCollectedfromList());
|
258
|
if (StringUtils.isNotBlank(sourceDatasourceName)) {
|
259
|
map.put("provenance.datasource.name", sourceDatasourceName);
|
260
|
}
|
261
|
|
262
|
final String sourceOriginalIds = Joiner.on(",").join(getValues(source.getEntity().getOriginalIdList()));
|
263
|
if (StringUtils.isNotBlank(sourceOriginalIds)) {
|
264
|
map.put("provenance.publication.id.csv", sourceOriginalIds);
|
265
|
}
|
266
|
|
267
|
//final OpenAireEventPayload p = new OpenAireEventPayload(oaf.getEntity().getId());
|
268
|
|
269
|
return new EventMessage(PRODUCER_ID, topic, JsonFormat.printToString(oaf), EventMessage.TTH_INFINITE, map);
|
270
|
}
|
271
|
|
272
|
private boolean hasPubDate(final Oaf current) {
|
273
|
final Metadata m = current.getEntity().getResult().getMetadata();
|
274
|
return StringUtils.isNotBlank(m.getDateofacceptance().getValue());
|
275
|
}
|
276
|
|
277
|
private boolean hasAbstract(final Oaf oaf) {
|
278
|
return Iterables.all(oaf.getEntity().getResult().getMetadata().getDescriptionList(), new Predicate<StringField>() {
|
279
|
@Override
|
280
|
public boolean apply(final StringField s) {
|
281
|
return StringUtils.isBlank(s.getValue());
|
282
|
}
|
283
|
});
|
284
|
}
|
285
|
|
286
|
private boolean hasAccess(final Oaf oaf, final String access, final boolean strict) {
|
287
|
return Iterables.all(oaf.getEntity().getChildrenList(), new Predicate<OafEntity>() {
|
288
|
@Override
|
289
|
public boolean apply(final OafEntity entity) {
|
290
|
final Predicate<Instance> p = new Predicate<Instance>() {
|
291
|
@Override
|
292
|
public boolean apply(final Instance i) {
|
293
|
return access.equalsIgnoreCase(i.getLicence().getClassid());
|
294
|
}
|
295
|
};
|
296
|
return strict ? Iterables.all(entity.getResult().getInstanceList(), p) : Iterables.any(entity.getResult().getInstanceList(), p);
|
297
|
}
|
298
|
});
|
299
|
}
|
300
|
|
301
|
private boolean hasPid(final Oaf oaf, final String type) {
|
302
|
return Iterables.any(oaf.getEntity().getPidList(), new Predicate<StructuredProperty>() {
|
303
|
@Override
|
304
|
public boolean apply(final StructuredProperty pid) {
|
305
|
return pid.getQualifier().getClassid().equalsIgnoreCase(type);
|
306
|
}
|
307
|
});
|
308
|
}
|
309
|
|
310
|
private Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
|
311
|
return new Function<ImmutableBytesWritable, Oaf>() {
|
312
|
@Override
|
313
|
public Oaf apply(final ImmutableBytesWritable input) {
|
314
|
try {
|
315
|
return Oaf.parseFrom(input.copyBytes());
|
316
|
} catch (InvalidProtocolBufferException e) {
|
317
|
throw new IllegalArgumentException(e);
|
318
|
}
|
319
|
}
|
320
|
};
|
321
|
}
|
322
|
|
323
|
private <T> String getValue(Iterable<T> ts) {
|
324
|
return Iterables.getFirst(getValues(ts), "");
|
325
|
}
|
326
|
|
327
|
private <T> List<String> getValues(Iterable<T> ts) {
|
328
|
return Lists.newArrayList(Iterables.transform(ts, new Function<T, String>() {
|
329
|
@Override
|
330
|
public String apply(final T t) {
|
331
|
if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue();
|
332
|
if (t instanceof KeyValue) return ((KeyValue) t).getValue();
|
333
|
if (t instanceof String) return (String) t;
|
334
|
|
335
|
return "";
|
336
|
}
|
337
|
}));
|
338
|
}
|
339
|
|
340
|
private <T> String getKey(Iterable<T> ts) {
|
341
|
return Iterables.getFirst(getKeys(ts), "");
|
342
|
}
|
343
|
|
344
|
private <T> List<String> getKeys(Iterable<T> ts) {
|
345
|
return Lists.newArrayList(Iterables.transform(ts, new Function<T, String>() {
|
346
|
@Override
|
347
|
public String apply(final T t) {
|
348
|
if (t instanceof KeyValue) return ((KeyValue) t).getKey();
|
349
|
if (t instanceof String) return (String) t;
|
350
|
|
351
|
return "";
|
352
|
}
|
353
|
}));
|
354
|
}
|
355
|
|
356
|
|
357
|
}
|