Project

General

Profile

1 44584 claudio.at
package eu.dnetlib.data.mapreduce.hbase.broker;
2
3
import java.io.IOException;
4 49832 claudio.at
import java.util.List;
5 52765 michele.ar
import java.util.stream.Collectors;
6 44584 claudio.at
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.broker.objects.OpenAireEventPayload;
9
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
10
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
11 49832 claudio.at
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
12 44584 claudio.at
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.proto.OafProtos.OafEntity;
15
import eu.dnetlib.data.proto.ResultProtos.Result.Metadata;
16 52777 claudio.at
import org.apache.commons.lang.StringUtils;
17
import org.apache.hadoop.mapreduce.Reducer.Context;
18
import org.dom4j.DocumentException;
19 44584 claudio.at
20 52777 claudio.at
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
21
22 44584 claudio.at
/**
23
 * Created by claudio on 16/11/2016.
24
 */
25
public class SubjectEventFactory {
26
27 49832 claudio.at
	public static List<EventWrapper> process(final Context context, final Oaf current, final Oaf other, final float trust)
28 44584 claudio.at
			throws IOException, InterruptedException, DocumentException {
29 49832 claudio.at
		return new SubjectEventFactory().processSubjectPair(context, current, other, trust);
30 44584 claudio.at
	}
31
32 49832 claudio.at
	public static List<EventWrapper> process(final Context context, final Oaf current)
33 44584 claudio.at
			throws IOException, InterruptedException, DocumentException {
34 49832 claudio.at
		return new SubjectEventFactory().processSubjectSingle(context, current);
35 44584 claudio.at
	}
36
37 49832 claudio.at
	private List<EventWrapper> processSubjectSingle(final Context context, final Oaf current) throws InterruptedException, DocumentException, IOException {
38 44584 claudio.at
		final Metadata mCurrent = current.getEntity().getResult().getMetadata();
39 49832 claudio.at
		final List<EventWrapper> events = Lists.newArrayList();
40 44584 claudio.at
		if (mCurrent.getSubjectList().isEmpty()) {
41
			for (final StructuredProperty subject : mCurrent.getSubjectList()) {
42
43
				if (subject.hasDataInfo() && subject.getDataInfo().getInferred()) {
44 52765 michele.ar
					events.add(
45
							doProcessSubject(context, current, null, subject, getSubjectTopic(subject, true), Float.valueOf(subject.getDataInfo().getTrust())));
46 44584 claudio.at
				}
47
			}
48
		}
49 49832 claudio.at
		return events;
50 44584 claudio.at
	}
51
52 52765 michele.ar
	private List<EventWrapper> processSubjectPair(final Context context, final Oaf current, final Oaf other, final Float trust)
53
			throws IOException, InterruptedException {
54 44584 claudio.at
55
		final Metadata mCurrent = current.getEntity().getResult().getMetadata();
56
		final Metadata mOther = other.getEntity().getResult().getMetadata();
57 49832 claudio.at
		final List<EventWrapper> events = Lists.newArrayList();
58 44584 claudio.at
59
		if (mCurrent.getSubjectList().isEmpty()) {
60
			for (final StructuredProperty subjectOther : mOther.getSubjectList()) {
61 49832 claudio.at
				events.add(doProcessSubject(context, current, other, subjectOther, getSubjectTopic(subjectOther, true), trust));
62 44584 claudio.at
			}
63
		}
64
65
		if (other != null) {
66
			for (final StructuredProperty subjectOther : mOther.getSubjectList()) {
67
				if (!hasSubjectValue(current, subjectOther.getQualifier().getClassid(), subjectOther.getValue())) {
68 52777 claudio.at
					final EventWrapper e = doProcessSubject(context, current, other, subjectOther, getSubjectTopic(subjectOther, false), trust);
69
					if (e != null) {
70
						events.add(e);
71
					}
72 44584 claudio.at
				}
73
			}
74
		}
75 49832 claudio.at
76
		return events;
77 44584 claudio.at
	}
78
79 49832 claudio.at
	private EventWrapper doProcessSubject(final Context context,
80 44584 claudio.at
			final Oaf current,
81
			final Oaf other,
82
			final StructuredProperty subjectOther,
83
			final Topic topic,
84
			final Float trust) throws IOException, InterruptedException {
85
86
		if (topic == null) {
87 48145 claudio.at
			context.getCounter("events skipped", "SUBJECT not mapped").increment(1);
88 49832 claudio.at
			return null;
89 44584 claudio.at
		}
90 48145 claudio.at
		// we got java.lang.NumberFormatException: empty String, because of empty trust values. Default to 1.0 in such cases
91
		final String t = subjectOther.getDataInfo().getTrust();
92
		final float adjustedTrust = trust * Float.valueOf(StringUtils.isBlank(t) ? "1.0" : t);
93 44584 claudio.at
		final Oaf.Builder prototype = Oaf.newBuilder(current);
94
		prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addSubject(subjectOther);
95
		final Oaf oaf = prototype.build();
96
97
		final OafEntity eventEntity = other != null ? other.getEntity() : current.getEntity();
98
99
		final OpenAireEventPayload payload =
100 48145 claudio.at
				HighlightFactory.highlightEnrichSubject(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), eventEntity, adjustedTrust),
101 44584 claudio.at
						Lists.newArrayList(subjectOther));
102 49832 claudio.at
		return EventWrapper.newInstance(
103
				asEvent(oaf.getEntity(), topic, payload, eventEntity, adjustedTrust),
104 52765 michele.ar
				payload.getHighlight().getSubjects().stream().filter(StringUtils::isNotBlank).sorted().collect(Collectors.joining(", ")),
105 49832 claudio.at
				topic.getValue());
106 44584 claudio.at
	}
107
108 52765 michele.ar
	private Topic getSubjectTopic(final StructuredProperty subject, final boolean missingOrMore) {
109 44584 claudio.at
		switch (subject.getQualifier().getClassid()) {
110 52765 michele.ar
		case "mesheuropmc":
111 44584 claudio.at
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_MESHEUROPMC : Topic.ENRICH_MORE_SUBJECT_MESHEUROPMC;
112 52765 michele.ar
		case "arxiv":
113 44584 claudio.at
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_ARXIV : Topic.ENRICH_MORE_SUBJECT_ARXIV;
114 52765 michele.ar
		case "jel":
115 44584 claudio.at
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_JEL : Topic.ENRICH_MORE_SUBJECT_JEL;
116 52765 michele.ar
		case "ddc":
117 44584 claudio.at
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_DDC : Topic.ENRICH_MORE_SUBJECT_DDC;
118 52765 michele.ar
		case "acm":
119 44584 claudio.at
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_ACM : Topic.ENRICH_MORE_SUBJECT_ACM;
120 52765 michele.ar
		case "rvk":
121 44584 claudio.at
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_RVK : Topic.ENRICH_MORE_SUBJECT_RVK;
122
		default:
123
			return null;
124
		}
125
	}
126
127
	private boolean hasSubjectValue(final Oaf oaf, final String classId, final String value) {
128 49832 claudio.at
		return oaf.getEntity().getResult().getMetadata().getSubjectList()
129
				.stream()
130
				.anyMatch(s -> s.getValue().equalsIgnoreCase(value) & s.getQualifier().getClassid().endsWith(classId));
131 44584 claudio.at
	}
132
133
}