1
|
package eu.dnetlib.data.mapreduce.hbase.broker.model;
|
2
|
|
3
|
import java.util.Date;
|
4
|
import java.util.stream.Collectors;
|
5
|
|
6
|
import org.apache.commons.codec.digest.DigestUtils;
|
7
|
import org.apache.hadoop.mapreduce.Mapper;
|
8
|
import org.apache.hadoop.mapreduce.Reducer;
|
9
|
|
10
|
//Utility class, it perfoms the conversion msg -> ES entinty and provides some and manages M/R logs
|
11
|
public class EventWrapper {
|
12
|
|
13
|
private final EventMessage event;
|
14
|
private final String highlightValue;
|
15
|
|
16
|
private final String counterGroupName;
|
17
|
private final String counterName;
|
18
|
|
19
|
public static EventWrapper newInstance(final EventMessage event, final String highlightValue,
|
20
|
final String counterGroupName, final String counterName) {
|
21
|
return new EventWrapper(event, highlightValue, counterGroupName, counterName);
|
22
|
}
|
23
|
|
24
|
private EventWrapper(EventMessage event, String highlightValue, String counterGroupName, String counterName) {
|
25
|
this.event = event;
|
26
|
this.highlightValue = highlightValue;
|
27
|
this.counterGroupName = counterGroupName;
|
28
|
this.counterName = counterName;
|
29
|
}
|
30
|
|
31
|
public Event asBrokerEvent() {
|
32
|
|
33
|
final Date now = new Date();
|
34
|
|
35
|
final Event res = new Event();
|
36
|
|
37
|
final String eventId = calculateEventId(event.getTopic(),
|
38
|
event.getMap().get("target_publication_id").toString(), highlightValue);
|
39
|
|
40
|
res.setEventId(eventId);
|
41
|
res.setProducerId(event.getProducerId());
|
42
|
res.setPayload(event.getPayload());
|
43
|
res.setMap(event.getMap().entrySet().stream().filter(e -> e.getValue().asObject() != null)
|
44
|
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().asObject())));
|
45
|
res.setTopic(event.getTopic());
|
46
|
res.setCreationDate(now);
|
47
|
res.setExpiryDate(calculateExpiryDate(now, event.getTthDays()));
|
48
|
res.setInstantMessage(event.getTthDays() == 0);
|
49
|
return res;
|
50
|
}
|
51
|
|
52
|
private String calculateEventId(final String topic, final String publicationId, final String value) {
|
53
|
return "event-" + DigestUtils.md5Hex(topic).substring(0, 6) + "-"
|
54
|
+ DigestUtils.md5Hex(publicationId).substring(0, 8) + "-" + DigestUtils.md5Hex(value).substring(0, 8);
|
55
|
}
|
56
|
|
57
|
private Date calculateExpiryDate(final Date now, final int ttlDays) {
|
58
|
if (ttlDays < 0) {
|
59
|
return null;
|
60
|
} else if (ttlDays == 0) {
|
61
|
return now;
|
62
|
} else {
|
63
|
return new Date(now.getTime() + ttlDays * 24 * 60 * 60 * 1000);
|
64
|
}
|
65
|
}
|
66
|
|
67
|
public void incrementCounter(Mapper<?, ?, ?, ?>.Context context) {
|
68
|
context.getCounter(counterGroupName, counterName).increment(1);
|
69
|
}
|
70
|
|
71
|
public void incrementCounter(Reducer<?, ?, ?, ?>.Context context) {
|
72
|
context.getCounter(counterGroupName, counterName).increment(1);
|
73
|
}
|
74
|
}
|