Project

General

Profile

« Previous | Next » 

Revision 52989

partial implementation of the broker events

View differences:

modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/model/Event.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.model;
2

  
3
import java.util.Date;
4
import java.util.Map;
5

  
6
import com.google.gson.Gson;
7
import com.google.gson.GsonBuilder;
8

  
9
// This class represents the ElasticSearch Entity
10
public class Event {
11

  
12
	public static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
13

  
14
	private String eventId;
15

  
16
	private String producerId;
17

  
18
	private String topic;
19

  
20
	private String payload;
21

  
22
	private Date creationDate;
23

  
24
	private Date expiryDate;
25

  
26
	private boolean instantMessage;
27

  
28
	private Map<String, Object> map;
29

  
30
	public Event() {
31
	}
32

  
33
	public Event(final String producerId, final String eventId, final String topic, final String payload,
34
			final Date creationDate, final Date expiryDate, final boolean instantMessage,
35
			final Map<String, Object> map) {
36
		this.producerId = producerId;
37
		this.eventId = eventId;
38
		this.topic = topic;
39
		this.payload = payload;
40
		this.creationDate = creationDate;
41
		this.expiryDate = expiryDate;
42
		this.instantMessage = instantMessage;
43
		this.map = map;
44
	}
45

  
46
	public String getProducerId() {
47
		return this.producerId;
48
	}
49

  
50
	public void setProducerId(final String producerId) {
51
		this.producerId = producerId;
52
	}
53

  
54
	public String getEventId() {
55
		return this.eventId;
56
	}
57

  
58
	public void setEventId(final String eventId) {
59
		this.eventId = eventId;
60
	}
61

  
62
	public String getTopic() {
63
		return this.topic;
64
	}
65

  
66
	public void setTopic(final String topic) {
67
		this.topic = topic;
68
	}
69

  
70
	public String getPayload() {
71
		return this.payload;
72
	}
73

  
74
	public void setPayload(final String payload) {
75
		this.payload = payload;
76
	}
77

  
78
	public Date getCreationDate() {
79
		return this.creationDate;
80
	}
81

  
82
	public void setCreationDate(final Date creationDate) {
83
		this.creationDate = creationDate;
84
	}
85

  
86
	public Date getExpiryDate() {
87
		return this.expiryDate;
88
	}
89

  
90
	public void setExpiryDate(final Date expiryDate) {
91
		this.expiryDate = expiryDate;
92
	}
93

  
94
	public boolean isInstantMessage() {
95
		return this.instantMessage;
96
	}
97

  
98
	public void setInstantMessage(final boolean instantMessage) {
99
		this.instantMessage = instantMessage;
100
	}
101

  
102
	public Map<String, Object> getMap() {
103
		return this.map;
104
	}
105

  
106
	public void setMap(final Map<String, Object> map) {
107
		this.map = map;
108
	}
109

  
110
	public String asJson() {
111
		final Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();
112
		return gson.toJson(this);
113
	}
114
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/model/EventWrapper.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.model;
2

  
3
import java.util.Date;
4
import java.util.stream.Collectors;
5

  
6
import org.apache.commons.codec.digest.DigestUtils;
7
import org.apache.hadoop.mapreduce.Mapper;
8
import org.apache.hadoop.mapreduce.Reducer;
9

  
10
//Utility class, it perfoms the conversion msg -> ES entinty and provides some and manages M/R logs
11
public class EventWrapper {
12

  
13
	private final EventMessage event;
14
	private final String highlightValue;
15

  
16
	private final String counterGroupName;
17
	private final String counterName;
18

  
19
	public static EventWrapper newInstance(final EventMessage event, final String highlightValue,
20
			final String counterGroupName, final String counterName) {
21
		return new EventWrapper(event, highlightValue, counterGroupName, counterName);
22
	}
23

  
24
	private EventWrapper(EventMessage event, String highlightValue, String counterGroupName, String counterName) {
25
		this.event = event;
26
		this.highlightValue = highlightValue;
27
		this.counterGroupName = counterGroupName;
28
		this.counterName = counterName;
29
	}
30

  
31
	public Event asBrokerEvent() {
32

  
33
		final Date now = new Date();
34

  
35
		final Event res = new Event();
36

  
37
		final String eventId = calculateEventId(event.getTopic(),
38
				event.getMap().get("target_publication_id").toString(), highlightValue);
39

  
40
		res.setEventId(eventId);
41
		res.setProducerId(event.getProducerId());
42
		res.setPayload(event.getPayload());
43
		res.setMap(event.getMap().entrySet().stream().filter(e -> e.getValue().asObject() != null)
44
				.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().asObject())));
45
		res.setTopic(event.getTopic());
46
		res.setCreationDate(now);
47
		res.setExpiryDate(calculateExpiryDate(now, event.getTthDays()));
48
		res.setInstantMessage(event.getTthDays() == 0);
49
		return res;
50
	}
51

  
52
	private String calculateEventId(final String topic, final String publicationId, final String value) {
53
		return "event-" + DigestUtils.md5Hex(topic).substring(0, 6) + "-"
54
				+ DigestUtils.md5Hex(publicationId).substring(0, 8) + "-" + DigestUtils.md5Hex(value).substring(0, 8);
55
	}
56

  
57
	private Date calculateExpiryDate(final Date now, final int ttlDays) {
58
		if (ttlDays < 0) {
59
			return null;
60
		} else if (ttlDays == 0) {
61
			return now;
62
		} else {
63
			return new Date(now.getTime() + ttlDays * 24 * 60 * 60 * 1000);
64
		}
65
	}
66

  
67
	public void incrementCounter(Mapper<?, ?, ?, ?>.Context context) {
68
		context.getCounter(counterGroupName, counterName).increment(1);
69
	}
70

  
71
	public void incrementCounter(Reducer<?, ?, ?, ?>.Context context) {
72
		context.getCounter(counterGroupName, counterName).increment(1);
73
	}
74
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/model/EventMessage.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.model;
2

  
3
import java.util.Map;
4

  
5
import com.google.gson.Gson;
6

  
7
/**
8
 * Created by claudio on 08/07/16.
9
 */
10

  
11
// This class represents the Event message as required by the Original API
12
// (rabbitMQ)
13
public class EventMessage {
14

  
15
	public static final int TTH_INFINITE = -1;
16

  
17
	private String producerId;
18
	private String topic;
19
	private String payload;
20
	private int tthDays = TTH_INFINITE;
21
	private Map<String, MapValue> map;
22

  
23
	public EventMessage() {
24
	}
25

  
26
	public EventMessage(final String producerId, final Topic topic, final String payload, final int tthDays,
27
			final Map<String, MapValue> map) {
28
		this.producerId = producerId;
29
		this.topic = topic.getValue();
30
		this.payload = payload;
31
		this.tthDays = tthDays;
32
		this.map = map;
33
	}
34

  
35
	public String getProducerId() {
36
		return this.producerId;
37
	}
38

  
39
	public void setProducerId(final String producerId) {
40
		this.producerId = producerId;
41
	}
42

  
43
	public String getTopic() {
44
		return this.topic;
45
	}
46

  
47
	public void setTopic(final String topic) {
48
		this.topic = topic;
49
	}
50

  
51
	public String getPayload() {
52
		return this.payload;
53
	}
54

  
55
	public void setPayload(final String payload) {
56
		this.payload = payload;
57
	}
58

  
59
	public Map<String, MapValue> getMap() {
60
		return this.map;
61
	}
62

  
63
	public void setMap(final Map<String, MapValue> map) {
64
		this.map = map;
65
	}
66

  
67
	public boolean hasNulls() {
68
		return (this.producerId == null) || (this.topic == null) || (this.payload == null) || (this.map == null);
69
	}
70

  
71
	public int getTthDays() {
72
		return this.tthDays;
73
	}
74

  
75
	public void setTthDays(final int tthDays) {
76
		this.tthDays = tthDays;
77
	}
78

  
79
	@Override
80
	public String toString() {
81
		return new Gson().toJson(this);
82
	}
83
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/model/MapValue.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.model;
2

  
3
import java.util.List;
4
import java.util.stream.Collectors;
5

  
6
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.DateParser;
7
import org.apache.commons.lang3.BooleanUtils;
8
import org.apache.commons.lang3.StringUtils;
9
import org.apache.commons.lang3.math.NumberUtils;
10
import org.apache.commons.logging.Log;
11
import org.apache.commons.logging.LogFactory;
12

  
13
public class MapValue {
14

  
15
	private MapValueType type = MapValueType.STRING;
16
	private Object value = "";
17

  
18
	private static final Log log = LogFactory.getLog(MapValue.class);
19

  
20
	public MapValue() {}
21

  
22
	public MapValue(final MapValueType type, final Object value) {
23
		this.type = type;
24
		this.value = value;
25
	}
26

  
27
	public MapValueType getType() {
28
		return this.type;
29
	}
30

  
31
	public void setType(final MapValueType type) {
32
		this.type = type;
33
	}
34

  
35
	public Object getValue() {
36
		return this.value;
37
	}
38

  
39
	public void setValue(final Object value) {
40
		this.value = value;
41
	}
42

  
43
	public Object asObject() {
44
		if (this.value == null) {
45
			log.warn("Object is NULL");
46
			return null;
47
		}
48
		try {
49
			switch (this.type) {
50
			case STRING:
51
			case INTEGER:
52
			case FLOAT:
53
			case BOOLEAN:
54
			case DATE:
55
				return asSimpleObject(getType(), getValue().toString());
56
			case LIST_STRING:
57
				return ((List<?>) getValue()).stream().map(o -> asSimpleObject(MapValueType.STRING, o.toString())).collect(Collectors.toList());
58
			case LIST_INTEGER:
59
				return ((List<?>) getValue()).stream().map(o -> asSimpleObject(MapValueType.INTEGER, o.toString())).collect(Collectors.toList());
60
			case LIST_FLOAT:
61
				return ((List<?>) getValue()).stream().map(o -> asSimpleObject(MapValueType.FLOAT, o.toString())).collect(Collectors.toList());
62
			case LIST_BOOLEAN:
63
				return ((List<?>) getValue()).stream().map(o -> asSimpleObject(MapValueType.BOOLEAN, o.toString())).collect(Collectors.toList());
64
			case LIST_DATE:
65
				return ((List<?>) getValue()).stream().map(o -> asSimpleObject(MapValueType.DATE, o.toString())).collect(Collectors.toList());
66
			default:
67
				log.warn("Invalid type: " + this.type);
68
				return null;
69
			}
70
		} catch (final Exception e) {
71
			log.warn("Error parsing value: " + this);
72
			return null;
73
		}
74
	}
75

  
76
	private Object asSimpleObject(final MapValueType type, final String s) {
77
		try {
78
			switch (type) {
79
			case STRING:
80
				return s.toString();
81
			case INTEGER:
82
				return s.contains(".") ? NumberUtils.toLong(StringUtils.substringBefore(s, "."), 0) : NumberUtils.toLong(s, 0);
83
			case FLOAT:
84
				return NumberUtils.toDouble(s, 0);
85
			case BOOLEAN:
86
				return BooleanUtils.toBoolean(s);
87
			case DATE:
88
				return DateParser.parse(s);
89
			default:
90
				log.warn("Unmamaged type: " + type);
91
				return null;
92
			}
93
		} catch (final Exception e) {
94
			log.warn("Error parsing value: " + s + " - type: " + type);
95
			return null;
96
		}
97
	}
98

  
99
	@Override
100
	public String toString() {
101
		return String.format("[ type; %s, value: %s ]", getType(), getValue());
102
	}
103
	}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/model/MapValueType.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.model;
2

  
3
/**
4
 * Created by claudio on 02/09/16.
5
 */
6
public enum MapValueType {
7
	STRING, INTEGER, FLOAT, DATE, BOOLEAN, LIST_STRING, LIST_INTEGER, LIST_FLOAT, LIST_DATE, LIST_BOOLEAN
8
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/model/Topic.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.model;
2

  
3
public enum Topic {
4

  
5
	// ENRICHMENT MISSING
6
	ENRICH_MISSING_ARTICLE("ENRICH/MISSING/ARTICLE"),
7

  
8
	// ENRICHMENT MORE
9
	ENRICH_MORE_ARTICLE("ENRICH/MORE/ARTICLE");
10

  
11
	Topic(final String value) {
12
		this.value = value;
13
	}
14

  
15
	protected String value;
16

  
17
	public String getValue() {
18
		return this.value;
19
	}
20
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/DliArticleEnrichmentReducer.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

  
3
import java.io.IOException;
4
import java.util.List;
5
import java.util.Objects;
6

  
7
import org.apache.commons.logging.Log;
8
import org.apache.commons.logging.LogFactory;
9
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
10
import org.apache.hadoop.io.Text;
11
import org.apache.hadoop.mapreduce.Reducer;
12

  
13
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventWrapper;
14

  
15
public class DliArticleEnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
16

  
17
	private static final int LIMIT = 1000;
18
	private Text tKey = new Text("");
19

  
20
	private static final Log log = LogFactory.getLog(DliArticleEnrichmentReducer.class);
21

  
22
	@Override
23
	protected void setup(final Context context) throws IOException, InterruptedException {
24
		super.setup(context);
25

  
26
		System.out.println("LIMIT: " + LIMIT);
27

  
28
	}
29

  
30
	@Override
31
	protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Context context)
32
			throws IOException, InterruptedException {
33
		// TODO Auto-generated method stub
34
		super.reduce(key, values, context);
35
	}
36

  
37
	private void emit(final List<EventWrapper> eventWrappers, final Context context) {
38
		eventWrappers.stream().filter(Objects::nonNull).forEach(ew -> emit(ew, context));
39
	}
40

  
41
	private void emit(final EventWrapper eventWrapper, final Context context) {
42
		try {
43
			final Text valueout = new Text(eventWrapper.asBrokerEvent().asJson());
44
			context.write(tKey, valueout);
45
			eventWrapper.incrementCounter(context);
46
		} catch (Exception e) {
47
			throw new RuntimeException(e);
48
		}
49
	}
50

  
51
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/enrich/DliArticleEnrichmentMapper.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;
2

  
3
import java.io.IOException;
4

  
5
import org.apache.commons.logging.Log;
6
import org.apache.commons.logging.LogFactory;
7
import org.apache.hadoop.hbase.client.Result;
8
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
9
import org.apache.hadoop.hbase.mapreduce.TableMapper;
10

  
11
public class DliArticleEnrichmentMapper extends TableMapper<ImmutableBytesWritable, ImmutableBytesWritable> {
12

  
13
	protected ImmutableBytesWritable outValue;
14

  
15
	protected ImmutableBytesWritable outKey;
16

  
17
	private static final Log log = LogFactory.getLog(DliArticleEnrichmentMapper.class);
18

  
19
	@Override
20
	protected void setup(final Context context) {
21
		outKey = new ImmutableBytesWritable();
22
		outValue = new ImmutableBytesWritable();
23
	}
24

  
25
	@Override
26
	protected void map(ImmutableBytesWritable key, Result value, Context context)
27
			throws IOException, InterruptedException {
28
		// TODO Auto-generated method stub
29

  
30
		// DNGF d =
31
		super.map(key, value, context);
32
	}
33

  
34
	private void emit(final Context context, final byte[] key, final byte[] value, final String entityType) {
35
		outKey.set(key);
36
		outValue.set(value);
37
		try {
38
			context.write(outKey, outValue);
39
			context.getCounter("Broker Enrichment DLI articles", "entity type: " + entityType).increment(1);
40
		} catch (Exception e) {
41
			throw new IllegalArgumentException(e);
42
		}
43
	}
44

  
45
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/mapping/HighlightFactory.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.mapping;
2

  
3
import eu.dnetlib.broker.objects.OpenAireEventPayload;
4

  
5
public class HighlightFactory {
6
	public static OpenAireEventPayload highlightArticle(final OpenAireEventPayload p, final String article) {
7
		// p.getHighlight().setInstances(mapInstances(instances));
8
		return p;
9
	}
10
}
modules/dnet-mapreduce-jobs/branches/broker_events/src/main/java/eu/dnetlib/data/mapreduce/hbase/broker/mapping/DateParser.java
1
package eu.dnetlib.data.mapreduce.hbase.broker.mapping;
2

  
3
import java.text.ParseException;
4

  
5
import org.apache.commons.lang3.time.DateUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8

  
9
public class DateParser {
10

  
11
	private static final Log log = LogFactory.getLog(DateParser.class);
12

  
13
	public static Long parse(final String s) {
14
		if (s == null) {
15
			log.warn("Date is NULL");
16
			return null;
17
		}
18

  
19
		try {
20
			return DateUtils.parseDate(s, "yyyy-MM-dd", "yyyy-MM-dd HH:mm:ss").getTime();
21
		} catch (final ParseException e) {
22
			log.warn("Invalid Date: " + s);
23
			return null;
24
		}
25

  
26
	}
27
}
modules/dnet-mapreduce-jobs/branches/broker_events/pom.xml
1 1
<?xml version="1.0" ?>
2
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns="http://maven.apache.org/POM/4.0.0" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
2
<project xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
3
	xmlns="http://maven.apache.org/POM/4.0.0"
4
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
3 5
	<parent>
4 6
		<groupId>eu.dnetlib</groupId>
5 7
		<artifactId>dnet-parent</artifactId>
......
436 438
			<version>3.4.5-cdh4.3.0</version>
437 439
			<scope>test</scope>
438 440
		</dependency>
439

  
440
		<!--
441 441
		<dependency>
442
			<groupId>org.json</groupId>
443
			<artifactId>json</artifactId>
444
			<version>20160212</version>
445
			<scope>test</scope>
442
			<groupId>eu.dnetlib</groupId>
443
			<artifactId>dnet-openaire-broker-common</artifactId>
444
			<version>[1.0.0,2.0.0)</version>
446 445
		</dependency>
447
		-->
448 446

  
447
		<!-- <dependency> <groupId>org.json</groupId> <artifactId>json</artifactId> 
448
			<version>20160212</version> <scope>test</scope> </dependency> -->
449

  
449 450
	</dependencies>
450 451
</project>

Also available in: Unified diff