Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.mapping;
2

    
3
import java.text.ParseException;
4
import java.util.List;
5
import java.util.Map;
6

    
7
import com.google.common.collect.Maps;
8
import eu.dnetlib.broker.objects.OpenAireEventPayload;
9
import eu.dnetlib.data.mapreduce.hbase.broker.Topic;
10
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
11
import eu.dnetlib.data.mapreduce.hbase.broker.model.MapValue;
12
import eu.dnetlib.data.mapreduce.hbase.broker.model.MapValueType;
13
import eu.dnetlib.data.proto.OafProtos.OafEntity;
14
import org.apache.commons.collections.CollectionUtils;
15
import org.apache.commons.lang.StringUtils;
16
import org.apache.commons.lang.time.DateUtils;
17

    
18
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.*;
19

    
20
/**
21
 * Created by claudio on 20/07/16.
22
 */
23
public class EventFactory {
24

    
25
	private final static String PRODUCER_ID = "OpenAIRE";
26

    
27
	private final static String[] datePatterns = { "yyyy-MM-dd", "yyyy-MM-dd" };
28

    
29
	public static EventMessage asEvent(final OafEntity oaf, final Topic topic, final OpenAireEventPayload payload, final OafEntity source, final float trust) {
30
		return asEvent(oaf, topic, payload, getKey(source.getCollectedfromList()), getValue(source.getCollectedfromList()), trust,
31
				listValues(source.getOriginalIdList()));
32
	}
33

    
34
	public static EventMessage asEvent(final OafEntity oaf,
35
			final Topic topic,
36
			final OpenAireEventPayload payload,
37
			final String sourceDatasourceId,
38
			final String sourceDatasourceName,
39
			final float trust) {
40
		return asEvent(oaf, topic, payload, sourceDatasourceId, sourceDatasourceName, trust, null);
41
	}
42

    
43
	public static EventMessage asEvent(final OafEntity oaf,
44
			final Topic topic,
45
			final OpenAireEventPayload payload,
46
			final String sourceDatasourceId,
47
			final String sourceDatasourceName,
48
			final float trust,
49
			final List<String> sourceOriginalIds) {
50
		final Map<String, MapValue> map = Maps.newHashMap();
51

    
52
		// TARGET INFO
53
		final String targetDatasourceId = getKey(oaf.getCollectedfromList());
54
		if (StringUtils.isNotBlank(targetDatasourceId)) {
55
			map.put("target_datasource_id", new MapValue(MapValueType.STRING, targetDatasourceId));
56
		}
57

    
58
		final String targetDatasourceName = getValue(oaf.getCollectedfromList());
59
		if (StringUtils.isNotBlank(targetDatasourceName)) {
60
			map.put("target_datasource_name", new MapValue(MapValueType.STRING, targetDatasourceName));
61
		}
62

    
63
		final String title = getValue(oaf.getResult().getMetadata().getTitleList());
64
		if (StringUtils.isNotBlank(title)) {
65
			map.put("target_publication_title", new MapValue(MapValueType.STRING, title));
66
		}
67

    
68
		final String date = oaf.getResult().getMetadata().getDateofacceptance().getValue();
69
		if (isValidDate(date)) {
70
			map.put("target_dateofacceptance", new MapValue(MapValueType.DATE, date));
71
		}
72

    
73
		final List<String> subjects = listValues(oaf.getResult().getMetadata().getSubjectList());
74
		if (CollectionUtils.isNotEmpty(subjects)) {
75
			map.put("target_publication_subject_list", new MapValue(MapValueType.LIST_STRING, subjects));
76
		}
77

    
78
		final List<String> authors = listValues(oaf.getResult().getAuthorList());
79
		if (CollectionUtils.isNotEmpty(authors)) {
80
			map.put("target_publication_author_list", new MapValue(MapValueType.LIST_STRING, authors));
81
		}
82

    
83
		map.put("target_publication_id", new MapValue(MapValueType.STRING, getValue(oaf.getOriginalIdList())));
84
		map.put("trust", new MapValue(MapValueType.FLOAT, trust));
85

    
86
		// PROVENANCE INFO
87
		if (StringUtils.isNotBlank(sourceDatasourceName)) {
88
			map.put("provenance_datasource_name", new MapValue(MapValueType.STRING, sourceDatasourceName));
89
		}
90

    
91
		if (StringUtils.isNotBlank(sourceDatasourceId)) {
92
			map.put("provenance_datasource_id", new MapValue(MapValueType.STRING, sourceDatasourceId));
93
		}
94

    
95
		if ((sourceOriginalIds != null) && !sourceOriginalIds.isEmpty()) {
96
			map.put("provenance_publication_id_list", new MapValue(MapValueType.LIST_STRING, sourceOriginalIds));
97
		}
98

    
99
		// final OpenAireEventPayload p = new OpenAireEventPayload(oaf.getEntity().getId());
100

    
101
		return new EventMessage(PRODUCER_ID, topic, payload.toJSON(), EventMessage.TTH_INFINITE, map);
102
	}
103

    
104
	private static boolean isValidDate(final String date) {
105
		if (StringUtils.isBlank(date)) {
106
			return false;
107
		}
108
		try {
109
			DateUtils.parseDate(date, datePatterns);
110
			return true;
111
		} catch (ParseException e) {
112
			return false;
113
		}
114
	}
115

    
116
}
(1-1/4)