Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4

    
5
import com.google.common.base.Predicate;
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.broker.objects.OpenAireEventPayload;
9
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
10
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
11
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
12
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import eu.dnetlib.data.proto.OafProtos.OafEntity;
15
import eu.dnetlib.data.proto.ResultProtos.Result.Metadata;
16
import org.apache.commons.lang.StringUtils;
17
import org.apache.hadoop.io.Text;
18
import org.apache.hadoop.mapreduce.Reducer.Context;
19
import org.dom4j.DocumentException;
20

    
21
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
22

    
23
/**
24
 * Created by claudio on 16/11/2016.
25
 */
26
public class SubjectEventFactory {
27

    
28
	protected Text tKey = new Text("");
29

    
30
	public static void process(final Context context, final Oaf current, final Oaf other, final float trust)
31
			throws IOException, InterruptedException, DocumentException {
32
		new SubjectEventFactory().processSubjectPair(context, current, other, trust);
33
	}
34

    
35
	public static void process(final Context context, final Oaf current)
36
			throws IOException, InterruptedException, DocumentException {
37
		new SubjectEventFactory().processSubjectSingle(context, current);
38
	}
39

    
40
	private void processSubjectSingle(final Context context, final Oaf current) throws InterruptedException, DocumentException, IOException {
41
		final Metadata mCurrent = current.getEntity().getResult().getMetadata();
42
		if (mCurrent.getSubjectList().isEmpty()) {
43
			for (final StructuredProperty subject : mCurrent.getSubjectList()) {
44

    
45
				if (subject.hasDataInfo() && subject.getDataInfo().getInferred()) {
46
					doProcessSubject(context, current, null, subject, getSubjectTopic(subject, true), Float.valueOf(subject.getDataInfo().getTrust()));
47
				}
48
			}
49
		}
50
	}
51

    
52
	private void processSubjectPair(final Context context, final Oaf current, final Oaf other, final Float trust) throws IOException, InterruptedException {
53

    
54
		final Metadata mCurrent = current.getEntity().getResult().getMetadata();
55
		final Metadata mOther = other.getEntity().getResult().getMetadata();
56

    
57
		if (mCurrent.getSubjectList().isEmpty()) {
58
			for (final StructuredProperty subjectOther : mOther.getSubjectList()) {
59
				doProcessSubject(context, current, other, subjectOther, getSubjectTopic(subjectOther, true), trust);
60
			}
61
		}
62

    
63
		if (other != null) {
64
			for (final StructuredProperty subjectOther : mOther.getSubjectList()) {
65
				if (!hasSubjectValue(current, subjectOther.getQualifier().getClassid(), subjectOther.getValue())) {
66
					doProcessSubject(context, current, other, subjectOther, getSubjectTopic(subjectOther, false), trust);
67
				}
68
			}
69
		}
70
	}
71

    
72
	private void doProcessSubject(final Context context,
73
			final Oaf current,
74
			final Oaf other,
75
			final StructuredProperty subjectOther,
76
			final Topic topic,
77
			final Float trust) throws IOException, InterruptedException {
78

    
79
		if (topic == null) {
80
			context.getCounter("events skipped", "SUBJECT not mapped").increment(1);
81
			return;
82
		}
83
		// we got java.lang.NumberFormatException: empty String, because of empty trust values. Default to 1.0 in such cases
84
		final String t = subjectOther.getDataInfo().getTrust();
85
		final float adjustedTrust = trust * Float.valueOf(StringUtils.isBlank(t) ? "1.0" : t);
86
		final Oaf.Builder prototype = Oaf.newBuilder(current);
87
		prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addSubject(subjectOther);
88
		final Oaf oaf = prototype.build();
89

    
90
		final OafEntity eventEntity = other != null ? other.getEntity() : current.getEntity();
91

    
92
		final OpenAireEventPayload payload =
93
				HighlightFactory.highlightEnrichSubject(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), eventEntity, adjustedTrust),
94
						Lists.newArrayList(subjectOther));
95
		final EventMessage event = asEvent(oaf.getEntity(), topic, payload, eventEntity, adjustedTrust);
96

    
97
		context.write(tKey, new Text(event.toString()));
98
		context.getCounter("event", topic.getValue()).increment(1);
99
	}
100

    
101
	private Topic getSubjectTopic(final StructuredProperty subject, boolean missingOrMore) {
102
		switch (subject.getQualifier().getClassid()) {
103
		case "mesheuropmc" :
104
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_MESHEUROPMC : Topic.ENRICH_MORE_SUBJECT_MESHEUROPMC;
105
		case "arxiv" :
106
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_ARXIV : Topic.ENRICH_MORE_SUBJECT_ARXIV;
107
		case "jel" :
108
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_JEL : Topic.ENRICH_MORE_SUBJECT_JEL;
109
		case "ddc" :
110
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_DDC : Topic.ENRICH_MORE_SUBJECT_DDC;
111
		case "acm" :
112
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_ACM : Topic.ENRICH_MORE_SUBJECT_ACM;
113
		case "rvk" :
114
			return missingOrMore ? Topic.ENRICH_MISSING_SUBJECT_RVK : Topic.ENRICH_MORE_SUBJECT_RVK;
115
		default:
116
			return null;
117
		}
118
	}
119

    
120
	private boolean hasSubjectValue(final Oaf oaf, final String classId, final String value) {
121
		return Iterables.any(oaf.getEntity().getResult().getMetadata().getSubjectList(), new Predicate<StructuredProperty>() {
122

    
123
			@Override
124
			public boolean apply(final StructuredProperty subject) {
125
				return subject.getValue().equalsIgnoreCase(value) & subject.getQualifier().getClassid().endsWith(classId);
126
			}
127
		});
128
	}
129

    
130
}
(12-12/13)