1
|
package eu.dnetlib.data.mapreduce.hbase.broker.mapping;
|
2
|
|
3
|
import java.text.ParseException;
|
4
|
import java.util.List;
|
5
|
import java.util.Map;
|
6
|
|
7
|
import com.google.common.collect.Maps;
|
8
|
import eu.dnetlib.broker.objects.OpenAireEventPayload;
|
9
|
import eu.dnetlib.data.mapreduce.hbase.broker.Topic;
|
10
|
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
|
11
|
import eu.dnetlib.data.mapreduce.hbase.broker.model.MapValue;
|
12
|
import eu.dnetlib.data.mapreduce.hbase.broker.model.MapValueType;
|
13
|
import eu.dnetlib.data.proto.OafProtos.OafEntity;
|
14
|
import org.apache.commons.collections.CollectionUtils;
|
15
|
import org.apache.commons.lang.StringUtils;
|
16
|
import org.apache.commons.lang.time.DateUtils;
|
17
|
|
18
|
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.*;
|
19
|
|
20
|
/**
|
21
|
* Created by claudio on 20/07/16.
|
22
|
*/
|
23
|
public class EventFactory {
|
24
|
|
25
|
private final static String PRODUCER_ID = "OpenAIRE";
|
26
|
|
27
|
private final static String[] datePatterns = { "yyyy-MM-dd", "yyyy-MM-dd" };
|
28
|
|
29
|
public static EventMessage asEvent(final OafEntity oaf, final Topic topic, final OpenAireEventPayload payload, final OafEntity source, final float trust) {
|
30
|
return asEvent(oaf, topic, payload, getKey(source.getCollectedfromList()), getValue(source.getCollectedfromList()), trust,
|
31
|
listValues(source.getOriginalIdList()));
|
32
|
}
|
33
|
|
34
|
public static EventMessage asEvent(final OafEntity oaf,
|
35
|
final Topic topic,
|
36
|
final OpenAireEventPayload payload,
|
37
|
final String sourceDatasourceId,
|
38
|
final String sourceDatasourceName,
|
39
|
final float trust) {
|
40
|
return asEvent(oaf, topic, payload, sourceDatasourceId, sourceDatasourceName, trust, null);
|
41
|
}
|
42
|
|
43
|
public static EventMessage asEvent(final OafEntity oaf,
|
44
|
final Topic topic,
|
45
|
final OpenAireEventPayload payload,
|
46
|
final String sourceDatasourceId,
|
47
|
final String sourceDatasourceName,
|
48
|
final float trust,
|
49
|
final List<String> sourceOriginalIds) {
|
50
|
final Map<String, MapValue> map = Maps.newHashMap();
|
51
|
|
52
|
// TARGET INFO
|
53
|
final String targetDatasourceId = getKey(oaf.getCollectedfromList());
|
54
|
if (StringUtils.isNotBlank(targetDatasourceId)) {
|
55
|
map.put("target_datasource_id", new MapValue(MapValueType.STRING, targetDatasourceId));
|
56
|
}
|
57
|
|
58
|
final String targetDatasourceName = getValue(oaf.getCollectedfromList());
|
59
|
if (StringUtils.isNotBlank(targetDatasourceName)) {
|
60
|
map.put("target_datasource_name", new MapValue(MapValueType.STRING, targetDatasourceName));
|
61
|
}
|
62
|
|
63
|
final String title = getValue(oaf.getResult().getMetadata().getTitleList());
|
64
|
if (StringUtils.isNotBlank(title)) {
|
65
|
map.put("target_publication_title", new MapValue(MapValueType.STRING, title));
|
66
|
}
|
67
|
|
68
|
final String date = oaf.getResult().getMetadata().getDateofacceptance().getValue();
|
69
|
if (isValidDate(date)) {
|
70
|
map.put("target_dateofacceptance", new MapValue(MapValueType.DATE, date));
|
71
|
}
|
72
|
|
73
|
final List<String> subjects = listValues(oaf.getResult().getMetadata().getSubjectList());
|
74
|
if (CollectionUtils.isNotEmpty(subjects)) {
|
75
|
map.put("target_publication_subject_list", new MapValue(MapValueType.LIST_STRING, subjects));
|
76
|
}
|
77
|
|
78
|
final List<String> authors = listValues(oaf.getResult().getAuthorList());
|
79
|
if (CollectionUtils.isNotEmpty(authors)) {
|
80
|
map.put("target_publication_author_list", new MapValue(MapValueType.LIST_STRING, authors));
|
81
|
}
|
82
|
|
83
|
map.put("target_publication_id", new MapValue(MapValueType.STRING, getValue(oaf.getOriginalIdList())));
|
84
|
map.put("trust", new MapValue(MapValueType.FLOAT, trust));
|
85
|
|
86
|
// PROVENANCE INFO
|
87
|
if (StringUtils.isNotBlank(sourceDatasourceName)) {
|
88
|
map.put("provenance_datasource_name", new MapValue(MapValueType.STRING, sourceDatasourceName));
|
89
|
}
|
90
|
|
91
|
if (StringUtils.isNotBlank(sourceDatasourceId)) {
|
92
|
map.put("provenance_datasource_id", new MapValue(MapValueType.STRING, sourceDatasourceId));
|
93
|
}
|
94
|
|
95
|
if ((sourceOriginalIds != null) && !sourceOriginalIds.isEmpty()) {
|
96
|
map.put("provenance_publication_id_list", new MapValue(MapValueType.LIST_STRING, sourceOriginalIds));
|
97
|
}
|
98
|
|
99
|
// final OpenAireEventPayload p = new OpenAireEventPayload(oaf.getEntity().getId());
|
100
|
|
101
|
return new EventMessage(PRODUCER_ID, topic, payload.toJSON(), EventMessage.TTH_INFINITE, map);
|
102
|
}
|
103
|
|
104
|
private static boolean isValidDate(final String date) {
|
105
|
if (StringUtils.isBlank(date)) {
|
106
|
return false;
|
107
|
}
|
108
|
try {
|
109
|
DateUtils.parseDate(date, datePatterns);
|
110
|
return true;
|
111
|
} catch (ParseException e) {
|
112
|
return false;
|
113
|
}
|
114
|
}
|
115
|
|
116
|
}
|