Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Map;
6
import java.util.Set;
7

    
8
import com.google.common.base.Function;
9
import com.google.common.base.Joiner;
10
import com.google.common.base.Predicate;
11
import com.google.common.base.Splitter;
12
import com.google.common.collect.Iterables;
13
import com.google.common.collect.Lists;
14
import com.google.common.collect.Maps;
15
import com.google.common.collect.Sets;
16
import com.google.protobuf.InvalidProtocolBufferException;
17
import com.googlecode.protobuf.format.JsonFormat;
18
import eu.dnetlib.data.proto.FieldTypeProtos.KeyValue;
19
import eu.dnetlib.data.proto.FieldTypeProtos.StringField;
20
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
21
import eu.dnetlib.data.proto.OafProtos.Oaf;
22
import eu.dnetlib.data.proto.OafProtos.OafEntity;
23
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
24
import eu.dnetlib.data.proto.ResultProtos.Result.Metadata;
25
import org.apache.commons.lang.StringUtils;
26
import org.apache.hadoop.hbase.client.HTable;
27
import org.apache.hadoop.hbase.client.Result;
28
import org.apache.hadoop.hbase.client.ResultScanner;
29
import org.apache.hadoop.hbase.client.Scan;
30
import org.apache.hadoop.hbase.filter.FilterList;
31
import org.apache.hadoop.hbase.filter.FilterList.Operator;
32
import org.apache.hadoop.hbase.filter.PrefixFilter;
33
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
34
import org.apache.hadoop.hbase.util.Bytes;
35
import org.apache.hadoop.io.Text;
36
import org.apache.hadoop.mapreduce.Reducer;
37

    
38
/**
39
 * Created by claudio on 08/07/16.
40
 */
41
public class EnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
42

    
43
	private static final int LIMIT = 1000;
44
	private Set<String> pidType;
45

    
46
	private final static String PRODUCER_ID = "OpenAIRE";
47

    
48
	public enum Topic {
49

    
50
		PID("/ENRICH/PID"),
51
		OA_STATUS("/ENRICH/OA_STATUS"),
52
		ABSTRACT("/ENRICH/ABSTRACT"),
53
		PUBLICATION_DATE("/ENRICH/PUBLICATION_DATE");
54

    
55
		Topic(final String value){
56
			this.value = value;
57
		}
58
		protected String value;
59
		public String getValue() {
60
			return this.value;
61
		}
62
	}
63

    
64
	private Text tKey;
65
	private Text tValue;
66

    
67
	private Map<String, String> dsTypeMap = Maps.newHashMap();
68

    
69
	private Set<String> dsWhitelist = Sets.newHashSet();
70

    
71
	private Set<String> dsBlacklist = Sets.newHashSet();
72

    
73
	// This is for EuropePMC. They expose OA abstracts, but we want to identify real OA publications. WTF.
74
	private Set<String> oaDsList = Sets.newHashSet();
75

    
76
	private Set<String> dsTypeWhitelist = Sets.newHashSet();
77

    
78

    
79
	@Override
80
	protected void setup(final Context context) throws IOException, InterruptedException {
81
		super.setup(context);
82

    
83
		System.out.println("LIMIT: " + LIMIT);
84

    
85
		tKey = new Text("");
86
		tValue = new Text();
87

    
88
		pidType = Sets.newHashSet("doi", "pmc", "pmid", "urn", "arxiv");
89

    
90
		dsWhitelist.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.id.whitelist", ""))));
91
		dsBlacklist.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.id.blacklist", ""))));
92
		dsTypeWhitelist.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.type.whitelist", ""))));
93
		oaDsList.addAll(Lists.newArrayList(Splitter.on(",").omitEmptyStrings().split(context.getConfiguration().get("broker.datasource.oa.list", ""))));
94

    
95
		dsTypeMap = getDsTypeMap(context, dsTypeWhitelist);
96

    
97
		System.out.println("datasource whitelist: " + dsWhitelist);
98
		System.out.println("datasource blacklist: " + dsBlacklist);
99
		System.out.println("datasource OA list: " + oaDsList);
100

    
101
		System.out.println("datasource type whitelist: " + dsTypeWhitelist);
102
	}
103

    
104
	private Map<String, String> getDsTypeMap(final Context context, final Set<String> dsTypeWhitelist) throws IOException {
105
		System.out.println("loading datasource typology mapping");
106

    
107
		final Map<String, String> dsTypeMap = Maps.newHashMap();
108

    
109
		final Scan scan = new Scan();
110
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
111
		fl.addFilter(new PrefixFilter(Bytes.toBytes("10")));
112
		scan.setFilter(fl);
113
		scan.addFamily(Bytes.toBytes("datasource"));
114

    
115
		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
116

    
117
		System.out.println(String.format("table name: '%s'", tableName));
118

    
119
		final HTable table = new HTable(context.getConfiguration(), tableName);
120

    
121
		final ResultScanner res = table.getScanner(scan);
122

    
123
		for(Result r : res) {
124
			final byte[] b = r.getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
125
			if (b != null) {
126
				final Oaf oaf = Oaf.parseFrom(b);
127
				final String dsId = StringUtils.substringAfter(oaf.getEntity().getId(), "|");
128
				final String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
129

    
130
				if (dsTypeWhitelist.contains(dsType)) {
131
					System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
132
					dsTypeMap.put(dsId, dsType);
133
				}
134
			}
135
		}
136

    
137
		res.close();
138

    
139
		System.out.println("datasource type map size: " + dsTypeMap.size());
140
		return dsTypeMap;
141
	}
142

    
143
	@Override
144
	protected void reduce(final ImmutableBytesWritable key, final Iterable<ImmutableBytesWritable> values, final Context context) throws IOException,
145
			InterruptedException {
146

    
147
		final List<Oaf> oafList = Lists.newArrayList(Iterables.transform(Iterables.limit(values, LIMIT), oafDeserialiser()));
148

    
149
		generateEvents(oafList, context);
150
	}
151

    
152
	private void generateEvents(final List<Oaf> oafList, final Context context) throws IOException, InterruptedException {
153

    
154
		for(Oaf current : oafList) {
155

    
156
			final String currentId = current.getEntity().getId();
157

    
158
			final String currentDsId = StringUtils.substringAfter(getKey(current.getEntity().getCollectedfromList()), "|");
159
			final String currentDsType = dsTypeMap.get(currentDsId);
160

    
161
			//System.out.println(String.format("'%s' -> '%s'", currentDsId, currentDsType));
162

    
163
			if (StringUtils.isBlank(currentDsType)) {
164
				context.getCounter("target datasource", "events skipped").increment(1);
165
			} else {
166

    
167
				//if (dsWhitelist.contains(currentDsId) | !dsBlacklist.contains(currentDsId)) {
168

    
169
				for (Oaf other : oafList) {
170

    
171
					final String otherId = other.getEntity().getId();
172
					if (!currentId.equals(otherId)) {
173

    
174
						//PIDS
175
						for (final String type : pidType) {
176
							if (!hasPid(current, type) && hasPid(other, type)) {
177
								final Oaf.Builder prototype = Oaf.newBuilder(current);
178
								final Iterable<StructuredProperty> pids = Iterables.filter(other.getEntity().getPidList(), new Predicate<StructuredProperty>() {
179
									@Override
180
									public boolean apply(final StructuredProperty pid) {
181
										return pid.getQualifier().getClassid().equalsIgnoreCase(type);
182
									}
183
								});
184
								prototype.getEntityBuilder().addAllPid(pids);
185
								emit(asEvent(prototype.build(), Topic.PID.getValue(), other), context);
186
								context.getCounter("event", Topic.PID.getValue()).increment(1);
187
							}
188
						}
189

    
190
						final String otherDsId = StringUtils.substringAfter(getKey(other.getEntity().getCollectedfromList()), "|");
191

    
192
						//OPEN ACCESS STATUS
193
						if ((oaDsList.contains(currentDsId) || !hasAccess(current, "OPEN", false)) && (hasAccess(other, "OPEN", false) && !oaDsList.contains(otherDsId))) {
194
							final Oaf.Builder prototype = Oaf.newBuilder(current);
195
							final Iterable<Instance> i = Iterables.filter(other.getEntity().getResult().getInstanceList(), new Predicate<Instance>() {
196
								@Override
197
								public boolean apply(final Instance i) {
198
									return "OPEN".equalsIgnoreCase(i.getLicence().getClassid());
199
								}
200
							});
201
							prototype.getEntityBuilder().getResultBuilder().addAllInstance(i);
202
							emit(asEvent(prototype.build(), Topic.OA_STATUS.getValue(), other), context);
203
							context.getCounter("event", Topic.OA_STATUS.getValue()).increment(1);
204
						}
205

    
206
						//ABSTRACT
207
						if (!hasAbstract(current) && hasAbstract(other)) {
208
							final Oaf.Builder prototype = Oaf.newBuilder(current);
209
							final List<StringField> descriptionList = other.getEntity().getResult().getMetadata().getDescriptionList();
210
							prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addAllDescription(descriptionList);
211
							emit(asEvent(prototype.build(), Topic.ABSTRACT.getValue(), other), context);
212
							context.getCounter("event", Topic.ABSTRACT.getValue()).increment(1);
213
						}
214

    
215
						//PUBLICATION DATE
216
						if (!hasPubDate(current) && hasPubDate(other)) {
217
							final Oaf.Builder prototype = Oaf.newBuilder(current);
218

    
219
							final StringField date = other.getEntity().getResult().getMetadata().getDateofacceptance();
220
							prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().setDateofacceptance(date);
221
							emit(asEvent(prototype.build(), Topic.PUBLICATION_DATE.getValue(), other), context);
222
							context.getCounter("event", Topic.PUBLICATION_DATE.getValue()).increment(1);
223
						}
224
					}
225
				}
226
			}
227
			/*} else {
228
				context.getCounter("target datasource", "events skipped").increment(1);
229
			}*/
230
		}
231
	}
232

    
233
	private void emit(final EventMessage e, final Context context) throws IOException, InterruptedException {
234
		//tKey.set(e.getMap().get("id"));
235
		tValue.set(e.toString());
236
		context.write(tKey, tValue);
237
	}
238

    
239
	private EventMessage asEvent(final Oaf oaf, final String topic, final Oaf source) {
240
		final Map<String, String> map = Maps.newHashMap();
241

    
242
		//TARGET INFO
243
		final String targetDatasourceName = getValue(oaf.getEntity().getCollectedfromList());
244
		if (StringUtils.isNotBlank(targetDatasourceName)) {
245
			map.put("target.datasource.name", targetDatasourceName);
246
		}
247

    
248
		final String title = getValue(oaf.getEntity().getResult().getMetadata().getTitleList());
249
		if (StringUtils.isNotBlank(title)) {
250
			map.put("target.publication.title", title);
251
		}
252

    
253
		map.put("target.publication.id", getValue(oaf.getEntity().getOriginalIdList()));
254
		map.put("trust", "1");
255

    
256
		//PROVENANCE INFO
257
		final String sourceDatasourceName = getValue(source.getEntity().getCollectedfromList());
258
		if (StringUtils.isNotBlank(sourceDatasourceName)) {
259
			map.put("provenance.datasource.name", sourceDatasourceName);
260
		}
261

    
262
		final String sourceOriginalIds = Joiner.on(",").join(getValues(source.getEntity().getOriginalIdList()));
263
		if (StringUtils.isNotBlank(sourceOriginalIds)) {
264
			map.put("provenance.publication.id.csv", sourceOriginalIds);
265
		}
266

    
267
		//final OpenAireEventPayload p = new OpenAireEventPayload(oaf.getEntity().getId());
268

    
269
		return new EventMessage(PRODUCER_ID, topic, JsonFormat.printToString(oaf), EventMessage.TTH_INFINITE, map);
270
	}
271

    
272
	private boolean hasPubDate(final Oaf current) {
273
		final Metadata m = current.getEntity().getResult().getMetadata();
274
		return StringUtils.isNotBlank(m.getDateofacceptance().getValue());
275
	}
276

    
277
	private boolean hasAbstract(final Oaf oaf) {
278
		return Iterables.all(oaf.getEntity().getResult().getMetadata().getDescriptionList(), new Predicate<StringField>() {
279
			@Override
280
			public boolean apply(final StringField s) {
281
				return StringUtils.isBlank(s.getValue());
282
			}
283
		});
284
	}
285

    
286
	private boolean hasAccess(final Oaf oaf, final String access, final boolean strict) {
287
		return Iterables.all(oaf.getEntity().getChildrenList(), new Predicate<OafEntity>() {
288
			@Override
289
			public boolean apply(final OafEntity entity) {
290
				final Predicate<Instance> p = new Predicate<Instance>() {
291
					@Override
292
					public boolean apply(final Instance i) {
293
						return access.equalsIgnoreCase(i.getLicence().getClassid());
294
					}
295
				};
296
				return strict ? Iterables.all(entity.getResult().getInstanceList(), p) :  Iterables.any(entity.getResult().getInstanceList(), p);
297
			}
298
		});
299
	}
300

    
301
	private boolean hasPid(final Oaf oaf, final String type) {
302
		return Iterables.any(oaf.getEntity().getPidList(), new Predicate<StructuredProperty>() {
303
			@Override
304
			public boolean apply(final StructuredProperty pid) {
305
				return pid.getQualifier().getClassid().equalsIgnoreCase(type);
306
			}
307
		});
308
	}
309

    
310
	private Function<ImmutableBytesWritable, Oaf> oafDeserialiser() {
311
		return new Function<ImmutableBytesWritable, Oaf>() {
312
			@Override
313
			public Oaf apply(final ImmutableBytesWritable input) {
314
				try {
315
					return Oaf.parseFrom(input.copyBytes());
316
				} catch (InvalidProtocolBufferException e) {
317
					throw new IllegalArgumentException(e);
318
				}
319
			}
320
		};
321
	}
322

    
323
	private <T> String getValue(Iterable<T> ts) {
324
		return Iterables.getFirst(getValues(ts), "");
325
	}
326

    
327
	private <T> List<String> getValues(Iterable<T> ts) {
328
		return Lists.newArrayList(Iterables.transform(ts, new Function<T, String>() {
329
			@Override
330
			public String apply(final T t) {
331
				if (t instanceof StructuredProperty) return ((StructuredProperty) t).getValue();
332
				if (t instanceof KeyValue) return ((KeyValue) t).getValue();
333
				if (t instanceof String) return (String) t;
334

    
335
				return "";
336
			}
337
		}));
338
	}
339

    
340
	private <T> String getKey(Iterable<T> ts) {
341
		return Iterables.getFirst(getKeys(ts), "");
342
	}
343

    
344
	private <T> List<String> getKeys(Iterable<T> ts) {
345
		return Lists.newArrayList(Iterables.transform(ts, new Function<T, String>() {
346
			@Override
347
			public String apply(final T t) {
348
				if (t instanceof KeyValue) return ((KeyValue) t).getKey();
349
				if (t instanceof String) return (String) t;
350

    
351
				return "";
352
			}
353
		}));
354
	}
355

    
356

    
357
}
(7-7/11)