Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Set;
6
import java.util.stream.Collectors;
7

    
8
import com.google.common.base.Predicate;
9
import com.google.common.collect.Iterables;
10
import eu.dnetlib.broker.objects.OpenAireEventPayload;
11
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
12
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
13
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
14
import eu.dnetlib.data.proto.OafProtos.Oaf;
15
import eu.dnetlib.data.proto.ResultProtos.Result.Instance;
16
import org.apache.commons.lang.StringUtils;
17
import org.apache.hadoop.io.Text;
18
import org.apache.hadoop.mapreduce.Reducer.Context;
19
import org.dom4j.DocumentException;
20

    
21
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
22
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getKey;
23

    
24
/**
25
 * Created by claudio on 26/07/16.
26
 */
27
public class OAVersionEventFactory {
28

    
29
	protected Text tKey = new Text("");
30

    
31
	public static void process(final Context context, final Oaf current, final Oaf other, final float trust, final Set<String> untrustedOaDsList)
32
			throws IOException, InterruptedException, DocumentException {
33
		new OAVersionEventFactory().processOAVersion(context, current, other, trust, untrustedOaDsList);
34
	}
35

    
36
	private void processOAVersion(final Context context, final Oaf current, final Oaf other, final float trust, final Set<String> untrustedOaDsList)
37
			throws IOException, InterruptedException, DocumentException {
38
		final String otherDsId = StringUtils.substringAfter(getKey(other.getEntity().getCollectedfromList()), "|");
39

    
40
		// OPEN ACCESS VERSION
41
		if (!untrustedOaDsList.contains(otherDsId)) {
42

    
43
			if (hasAccess(other, "OPEN", false)) {
44

    
45
				// MORE
46
				doProcessOAVersion(context, current, other, Topic.ENRICH_MORE_OA_VERSION, trust);
47

    
48
				// MISSING
49
				if (!hasAccess(current, "OPEN", true)) {
50
					doProcessOAVersion(context, current, other, Topic.ENRICH_MISSING_OA_VERSION, trust);
51
				}
52
			}
53
		}
54

    
55
	}
56

    
57
	private void doProcessOAVersion(final Context context, final Oaf current, final Oaf other, final Topic topic, final float trust)
58
			throws IOException, InterruptedException, DocumentException {
59
		final Oaf.Builder prototype = Oaf.newBuilder(current);
60

    
61
		final List<Instance> instances =
62
				other.getEntity().getResult().getInstanceList().stream()
63
						.filter(i ->"OPEN".equalsIgnoreCase(i.getAccessright().getClassid()))
64
						.collect(Collectors.toList());
65
		prototype.getEntityBuilder().getResultBuilder().addAllInstance(instances);
66

    
67
		final Oaf oaf = prototype.build();
68

    
69
		final OpenAireEventPayload payload =
70
				HighlightFactory.highlightEnrichOa(OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), other.getEntity(), trust), instances);
71
		final EventMessage event = asEvent(oaf.getEntity(), topic, payload, other.getEntity(), trust);
72

    
73
		context.write(tKey, new Text(event.toString()));
74
		context.getCounter("event", topic.getValue()).increment(1);
75
	}
76

    
77
	private boolean hasAccess(final Oaf oaf, final String access, final boolean strict) {
78
		final Predicate<Instance> p = i -> access.equalsIgnoreCase(i.getAccessright().getClassid());
79
		final List<Instance> i = oaf.getEntity().getResult().getInstanceList();
80
		return strict ? Iterables.all(i, p) : Iterables.any(i, p);
81
	}
82

    
83
}
(2-2/8)