Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.add;
2

    
3
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
4
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;
5
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.listKeys;
6

    
7
import java.io.IOException;
8
import java.util.HashSet;
9
import java.util.Map;
10
import java.util.Set;
11

    
12
import org.apache.commons.collections.MapUtils;
13
import org.apache.commons.lang.StringUtils;
14
import org.apache.commons.lang.math.RandomUtils;
15
import org.apache.hadoop.hbase.client.HTable;
16
import org.apache.hadoop.hbase.client.Result;
17
import org.apache.hadoop.hbase.client.ResultScanner;
18
import org.apache.hadoop.hbase.client.Scan;
19
import org.apache.hadoop.hbase.filter.FilterList;
20
import org.apache.hadoop.hbase.filter.FilterList.Operator;
21
import org.apache.hadoop.hbase.filter.PrefixFilter;
22
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
23
import org.apache.hadoop.hbase.mapreduce.TableMapper;
24
import org.apache.hadoop.hbase.util.Bytes;
25
import org.apache.hadoop.io.Text;
26

    
27
import com.google.common.base.Function;
28
import com.google.common.collect.Iterables;
29
import com.google.common.collect.Maps;
30
import com.google.common.collect.Sets;
31

    
32
import eu.dnetlib.broker.objects.OpenAireEventPayload;
33
import eu.dnetlib.data.mapreduce.hbase.broker.Topic;
34
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
35
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
36
import eu.dnetlib.data.mapreduce.util.OafDecoder;
37
import eu.dnetlib.data.proto.OafProtos.Oaf;
38
import eu.dnetlib.data.proto.OafProtos.OafEntity;
39

    
40
/**
41
 * Created by claudio on 08/07/16.
42
 */
43
public class AdditionMapper extends TableMapper<Text, Text> {
44

    
45
	private Text outKey;
46
	private Text outValue;
47

    
48
	/**
49
	 * Map ProjectID -> Set of Organization info (id, name)
50
	 */
51
	private Map<String, Set<EntityInfo>> projectOrganization;
52

    
53
	/**
54
	 * Map OrganizationID -> Set of Datasource info (id, name)
55
	 */
56
	private Map<String, Set<EntityInfo>> organizationDatasource;
57

    
58
	private Set<String> organizationPrefixBlacklist = Sets.newHashSet();
59

    
60
	// White list for datasource typologies.
61
	private Set<String> dsTypeWhitelist = Sets.newHashSet();
62

    
63
	@Override
64
	protected void setup(final Context context) throws IOException {
65

    
66
		organizationPrefixBlacklist = Sets.newHashSet("nsf_________");
67
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
68

    
69
		projectOrganization = getRelMap(context, "20", "organization", "projectOrganization_participation_isParticipant", organizationPrefixBlacklist);
70
		organizationDatasource = getRelMap(context, "10", "datasource", "datasourceOrganization_provision_provides", dsTypeWhitelist);
71

    
72
		outKey = new Text("");
73
		outValue = new Text();
74
	}
75

    
76
	class EntityInfo {
77

    
78
		private String id;
79
		private String name;
80

    
81
		public EntityInfo(final String id, final String name) {
82
			this.id = id;
83
			this.name = name;
84
		}
85

    
86
		@Override
87
		public int hashCode() {
88
			return getId().hashCode();
89
		}
90

    
91
		@Override
92
		public boolean equals(final Object obj) {
93
			return getId().equals(obj);
94
		}
95

    
96
		public String getId() {
97
			return id;
98
		}
99

    
100
		public void setId(final String id) {
101
			this.id = id;
102
		}
103

    
104
		public String getName() {
105
			return name;
106
		}
107

    
108
		public void setName(final String name) {
109
			this.name = name;
110
		}
111
	}
112

    
113
	@Override
114
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
115
		try {
116
			final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes("result"));
117
			final byte[] bodyB = map.get(Bytes.toBytes("body"));
118

    
119
			if (MapUtils.isEmpty(map) || (bodyB == null)) {
120
				context.getCounter("result", "empty body").increment(1);
121
				return;
122
			}
123

    
124
			final Oaf oaf = Oaf.parseFrom(bodyB);
125

    
126
			if (oaf.getDataInfo().getDeletedbyinference()) {
127
				context.getCounter("result", "deletedbyinference = true").increment(1);
128
				return;
129
			}
130

    
131
			final Set<String> currentDatasourceIds = Sets.newHashSet(listKeys(oaf.getEntity().getCollectedfromList()));
132

    
133
			final Map<byte[], byte[]> resultProject = value.getFamilyMap(Bytes.toBytes("resultProject_outcome_isProducedBy"));
134
			if (!MapUtils.isEmpty(resultProject)) {
135

    
136
				for (final String projectId : asStringID(resultProject.keySet())) {
137

    
138
					final Set<EntityInfo> organizations = projectOrganization.get(projectId);
139

    
140
					if ((organizations != null) && !organizations.isEmpty()) {
141

    
142
						for (final EntityInfo organization : organizations) {
143

    
144
							final Set<EntityInfo> datasources = organizationDatasource.get(organization.getId());
145

    
146
							if ((datasources != null) && !datasources.isEmpty()) {
147

    
148
								for (final EntityInfo datasource : datasources) {
149

    
150
									if (!currentDatasourceIds.contains(datasource.getId())) {
151

    
152
										// emit event for datasourceId
153
										final float trust = RandomUtils.nextFloat();
154
										final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), oaf.getEntity(), trust);
155
										// event.setPayload(HighlightFactory.highlightEnrichPid(payload,
156
										// Lists.newArrayList(pids)).toJSON());
157
										final EventMessage event =
158
												asEvent(oaf.getEntity(), Topic.ADD_BY_PROJECT, payload, datasource.getId(), datasource.getName(), trust);
159

    
160
										emit(event, context);
161

    
162
										context.getCounter("event", Topic.ADD_BY_PROJECT.getValue()).increment(1);
163
									}
164
								}
165
							}
166
						}
167
					}
168
				}
169
			}
170
		} catch (final Exception e) {
171
			throw new RuntimeException(e);
172
		}
173
	}
174

    
175
	private void emit(final EventMessage e, final Context context) throws IOException, InterruptedException {
176
		// tKey.set(e.getMap().get("id"));
177
		outValue.set(e.toString());
178
		context.write(outKey, outValue);
179
	}
180

    
181
	private Iterable<String> asStringID(final Iterable<byte[]> in) {
182
		return Iterables.transform(in, new Function<byte[], String>() {
183

    
184
			@Override
185
			public String apply(final byte[] input) {
186
				return getID(new String(input));
187
			}
188
		});
189
	}
190

    
191
	private Map<String, Set<EntityInfo>> getRelMap(final Context context,
192
			final String prefixFilter,
193
			final String entity,
194
			final String columnFamily,
195
			final Set<String> filter) throws IOException {
196
		System.out.println(String.format("loading %s, %s", entity, columnFamily));
197

    
198
		final Map<String, Set<EntityInfo>> out = Maps.newHashMap();
199
		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
200

    
201
		System.out.println(String.format("table name: '%s'", tableName));
202

    
203
		try (final HTable table = new HTable(context.getConfiguration(), tableName);
204
				final ResultScanner res = scanTable(table, prefixFilter, entity, columnFamily)) {
205

    
206
			for (final Result r : res) {
207

    
208
				final byte[] bodyB = r.getValue(Bytes.toBytes(entity), Bytes.toBytes("body"));
209

    
210
				if (bodyB == null) {
211
					context.getCounter("missing body", entity).increment(1);
212
				} else {
213

    
214
					final OafEntity oafEntity = OafDecoder.decode(bodyB).getEntity();
215
					final EntityInfo kv = getEntityInfo(oafEntity, filter);
216
					if (kv != null) {
217

    
218
						final Map<byte[], byte[]> relMap = r.getFamilyMap(Bytes.toBytes(columnFamily));
219

    
220
						if (MapUtils.isNotEmpty(relMap)) {
221
							for (final String id : asStringID(relMap.keySet())) {
222

    
223
								if (!out.containsKey(id)) {
224
									out.put(id, new HashSet<EntityInfo>());
225
								}
226
								out.get(id).add(kv);
227
							}
228
						} else {
229
							context.getCounter("skipped", entity).increment(1);
230
						}
231
					}
232
				}
233
			}
234
		}
235

    
236
		System.out.println(String.format("loaded map for %s, %s, size: %s", entity, columnFamily, out.size()));
237
		return out;
238
	}
239

    
240
	private EntityInfo getEntityInfo(final OafEntity entity, final Set<String> filter) {
241

    
242
		final String id = getID(entity.getId());
243
		switch (entity.getType()) {
244
		case datasource:
245
			final String dsType = entity.getDatasource().getMetadata().getDatasourcetype().getClassid();
246
			if (!filter.contains(dsType)) { return null; }
247
			return new EntityInfo(id, entity.getDatasource().getMetadata().getOfficialname().getValue());
248
		case organization:
249
			if (filter.contains(prefix(id))) { return null; }
250
			return new EntityInfo(id, entity.getOrganization().getMetadata().getLegalname().getValue());
251
		default:
252
			throw new IllegalArgumentException("invalid entity: " + entity);
253
		}
254
	}
255

    
256
	private ResultScanner scanTable(final HTable table, final String prefixFilter, final String entity, final String columnFamily) throws IOException {
257
		final Scan scan = new Scan();
258
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
259
		fl.addFilter(new PrefixFilter(Bytes.toBytes(prefixFilter)));
260
		scan.setFilter(fl);
261
		scan.addFamily(Bytes.toBytes(entity));
262
		scan.addFamily(Bytes.toBytes(columnFamily));
263

    
264
		return table.getScanner(scan);
265
	}
266

    
267
	private String getID(final String s) {
268
		return StringUtils.substringAfter(s, "|");
269
	}
270

    
271
	private String prefix(final String s) {
272
		return StringUtils.substringBefore(s, "::");
273
	}
274

    
275
}
    (1-1/1)