Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.model;
2

    
3
import java.util.Date;
4
import java.util.stream.Collectors;
5

    
6
import org.apache.commons.codec.digest.DigestUtils;
7
import org.apache.hadoop.mapreduce.Mapper;
8
import org.apache.hadoop.mapreduce.Reducer;
9

    
10
//Utility class, it perfoms the conversion msg -> ES entinty and provides some and manages M/R logs
11
public class EventWrapper {
12

    
13
	private final EventMessage event;
14
	private final String highlightValue;
15

    
16
	private final String counterGroupName;
17
	private final String counterName;
18

    
19
	public static EventWrapper newInstance(final EventMessage event, final String highlightValue,
20
			final String counterGroupName, final String counterName) {
21
		return new EventWrapper(event, highlightValue, counterGroupName, counterName);
22
	}
23

    
24
	private EventWrapper(EventMessage event, String highlightValue, String counterGroupName, String counterName) {
25
		this.event = event;
26
		this.highlightValue = highlightValue;
27
		this.counterGroupName = counterGroupName;
28
		this.counterName = counterName;
29
	}
30

    
31
	public Event asBrokerEvent() {
32

    
33
		final Date now = new Date();
34

    
35
		final Event res = new Event();
36

    
37
		final String eventId = calculateEventId(event.getTopic(),
38
				event.getMap().get("target_publication_id").toString(), highlightValue);
39

    
40
		res.setEventId(eventId);
41
		res.setProducerId(event.getProducerId());
42
		res.setPayload(event.getPayload());
43
		res.setMap(event.getMap().entrySet().stream().filter(e -> e.getValue().asObject() != null)
44
				.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().asObject())));
45
		res.setTopic(event.getTopic());
46
		res.setCreationDate(now);
47
		res.setExpiryDate(calculateExpiryDate(now, event.getTthDays()));
48
		res.setInstantMessage(event.getTthDays() == 0);
49
		return res;
50
	}
51

    
52
	private String calculateEventId(final String topic, final String publicationId, final String value) {
53
		return "event-" + DigestUtils.md5Hex(topic).substring(0, 6) + "-"
54
				+ DigestUtils.md5Hex(publicationId).substring(0, 8) + "-" + DigestUtils.md5Hex(value).substring(0, 8);
55
	}
56

    
57
	private Date calculateExpiryDate(final Date now, final int ttlDays) {
58
		if (ttlDays < 0) {
59
			return null;
60
		} else if (ttlDays == 0) {
61
			return now;
62
		} else {
63
			return new Date(now.getTime() + ttlDays * 24 * 60 * 60 * 1000);
64
		}
65
	}
66

    
67
	public void incrementCounter(Mapper<?, ?, ?, ?>.Context context) {
68
		context.getCounter(counterGroupName, counterName).increment(1);
69
	}
70

    
71
	public void incrementCounter(Reducer<?, ?, ?, ?>.Context context) {
72
		context.getCounter(counterGroupName, counterName).increment(1);
73
	}
74
}
(3-3/6)