Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.stream.Collectors;
6

    
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.broker.objects.OpenAireEventPayload;
9
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
10
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
11
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
12
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.proto.OafProtos.OafEntity;
15
import eu.dnetlib.data.proto.ResultProtos.Result.Metadata;
16
import org.apache.commons.lang.StringUtils;
17
import org.apache.hadoop.mapreduce.Reducer.Context;
18
import org.dom4j.DocumentException;
19

    
20
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
21

    
22
/**
23
 * Created by claudio on 16/11/2016.
24
 */
25
public class SubjectEventFactory {
26

    
27
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final float trust)
28
			throws IOException, InterruptedException, DocumentException {
29
		return new SubjectEventFactory().processSubjectPair(context, current, other, trust);
30
	}
31

    
32
	public static List<EventWrapper> process(final Context context, final Oaf current)
33
			throws IOException, InterruptedException, DocumentException {
34
		return new SubjectEventFactory().processSubjectSingle(context, current);
35
	}
36

    
37
	private List<EventWrapper> processSubjectSingle(final Context context, final Oaf current) throws InterruptedException, DocumentException, IOException {
38
		final Metadata mCurrent = current.getEntity().getResult().getMetadata();
39
		final List<EventWrapper> events = Lists.newArrayList();
40
		if (mCurrent.getSubjectList().isEmpty()) {
41
			for (final StructuredProperty subject : mCurrent.getSubjectList()) {
42

    
43
				if (subject.hasDataInfo() && subject.getDataInfo().getInferred()) {
44
					events.add(
45
							doProcessSubject(context, current, null, subject, getSubjectTopic(subject, true), Float.valueOf(subject.getDataInfo().getTrust())));
46
				}
47
			}
48
		}
49
		return events;
50
	}
51

    
52
	private List<EventWrapper> processSubjectPair(final Context context, final Oaf current, final Oaf other, final Float trust)
53
			throws IOException, InterruptedException {
54

    
55
		final Metadata mCurrent = current.getEntity().getResult().getMetadata();
56
		final Metadata mOther = other.getEntity().getResult().getMetadata();
57
		final List<EventWrapper> events = Lists.newArrayList();
58

    
59
		if (mCurrent.getSubjectList().isEmpty()) {
60
			for (final StructuredProperty subjectOther : mOther.getSubjectList()) {
61
				events.add(doProcessSubject(context, current, other, subjectOther, getSubjectTopic(subjectOther, true), trust));
62
			}
63
		}
64

    
65
		if (other != null) {
66
			for (final StructuredProperty subjectOther : mOther.getSubjectList()) {
67
				if (!hasSubjectValue(current, subjectOther.getQualifier().getClassid(), subjectOther.getValue())) {
68
					final EventWrapper e = doProcessSubject(context, current, other, subjectOther, getSubjectTopic(subjectOther, false), trust);
69
					if (e != null) {
70
						events.add(e);
71
					}
72
				}
73
			}
74
		}
75

    
76
		return events;
77
	}
78

    
79
	private EventWrapper doProcessSubject(final Context context,
80
			final Oaf current,
81
			final Oaf other,
82
			final StructuredProperty subjectOther,
83
			final Topic topic,
84
			final Float trust) throws IOException, InterruptedException {
85

    
86
		if (topic == null) {
87
			context.getCounter("events skipped", "SUBJECT not mapped").increment(1);
88
			return null;
89
		}
90
		// we got java.lang.NumberFormatException: empty String, because of empty trust values. Default to 1.0 in such cases
91
		final String t = subjectOther.getDataInfo().getTrust();
92
		final float adjustedTrust = trust * Float.valueOf(StringUtils.isBlank(t) ? "1.0" : t);
93
		final Oaf.Builder prototype = Oaf.newBuilder(current);
94
		prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addSubject(subjectOther);
95
		final Oaf oaf = prototype.build();
96

    
97
		final OafEntity eventEntity = other != null ? other.getEntity() : current.getEntity();
98

    
99
		final OpenAireEventPayload payload =
100
				HighlightFactory.highlightEnrichSubject(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), eventEntity, adjustedTrust),
101
						Lists.newArrayList(subjectOther));
102
		return EventWrapper.newInstance(
103
				asEvent(oaf.getEntity(), topic, payload, eventEntity, adjustedTrust),
104
				payload.getHighlight().getSubjects().stream().filter(StringUtils::isNotBlank).sorted().collect(Collectors.joining(", ")),
105
				topic.getValue());
106
	}
107

    
108
	private Topic getSubjectTopic(final StructuredProperty subject, final boolean missingOrMore) {
109
		switch (subject.getQualifier().getClassid()) {
110
		case "mesheuropmc":
111
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_MESHEUROPMC : Topic.ENRICH_MORE_SUBJECT_MESHEUROPMC;
112
		case "arxiv":
113
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_ARXIV : Topic.ENRICH_MORE_SUBJECT_ARXIV;
114
		case "jel":
115
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_JEL : Topic.ENRICH_MORE_SUBJECT_JEL;
116
		case "ddc":
117
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_DDC : Topic.ENRICH_MORE_SUBJECT_DDC;
118
		case "acm":
119
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_ACM : Topic.ENRICH_MORE_SUBJECT_ACM;
120
		case "rvk":
121
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_RVK : Topic.ENRICH_MORE_SUBJECT_RVK;
122
		default:
123
			return null;
124
		}
125
	}
126

    
127
	private boolean hasSubjectValue(final Oaf oaf, final String classId, final String value) {
128
		return oaf.getEntity().getResult().getMetadata().getSubjectList()
129
				.stream()
130
				.anyMatch(s -> s.getValue().equalsIgnoreCase(value) & s.getQualifier().getClassid().endsWith(classId));
131
	}
132

    
133
}
(9-9/10)