Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker;
2

    
3
import java.io.IOException;
4

    
5
import com.google.common.base.Predicate;
6
import com.google.common.collect.Iterables;
7
import com.google.common.collect.Lists;
8
import eu.dnetlib.data.broker.model.openaire.OpenAireEventPayload;
9
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
10
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
11
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
12
import eu.dnetlib.data.proto.FieldTypeProtos.StructuredProperty;
13
import eu.dnetlib.data.proto.OafProtos.Oaf;
14
import org.apache.hadoop.io.Text;
15
import org.apache.hadoop.mapreduce.Reducer.Context;
16

    
17
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
18

    
19
/**
20
 * Created by claudio on 26/07/16.
21
 */
22
public class PIDEventFactory {
23

    
24
	protected Text tKey = new Text("");
25

    
26
	public static void process(final Context context, final Oaf current, final Oaf other) throws IOException, InterruptedException {
27
		new PIDEventFactory().processPID(context, current, other);
28
	}
29

    
30
	public void processPID(final Context context, final Oaf current, final Oaf other) throws IOException, InterruptedException {
31
		//PIDS
32
		if (current.getEntity().getPidList().isEmpty()) {
33
			for(StructuredProperty pidOther : other.getEntity().getPidList()) {
34
				doProcessPID(context, current, other, pidOther, Topic.MISSING_PID);
35
			}
36
		}
37

    
38
		for(StructuredProperty pidOther : other.getEntity().getPidList()) {
39
			if(!hasPidValue(current, pidOther.getValue())) {
40
				doProcessPID(context, current, other, pidOther, Topic.MORE_PID);
41
			}
42
		}
43
	}
44

    
45
	private void doProcessPID(final Context context, final Oaf current, final Oaf other, final StructuredProperty pidOther, final Topic topic)
46
			throws IOException, InterruptedException {
47
		final Oaf.Builder prototype = Oaf.newBuilder(current);
48
		prototype.getEntityBuilder().addPid(pidOther);
49
		final Oaf oaf = prototype.build();
50

    
51
		final EventMessage event = asEvent(oaf.getEntity(), topic, other.getEntity());
52
		final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), other.getEntity());
53
		event.setPayload(HighlightFactory.highlightEnrichPid(payload, Lists.newArrayList(pidOther)).toJSON());
54

    
55
		context.write(tKey, new Text(event.toString()));
56
		context.getCounter("event", topic.getValue()).increment(1);
57
	}
58

    
59
	private boolean hasPidValue(final Oaf oaf, final String value) {
60
		return Iterables.any(oaf.getEntity().getPidList(), new Predicate<StructuredProperty>() {
61
			@Override
62
			public boolean apply(final StructuredProperty pid) {
63
				return pid.getValue().equalsIgnoreCase(value) ;
64
			}
65
		});
66
	}
67
}
(8-8/11)