Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.broker.add;
2

    
3
import java.io.IOException;
4
import java.util.HashSet;
5
import java.util.Map;
6
import java.util.Set;
7

    
8
import com.google.common.base.Function;
9
import com.google.common.collect.Iterables;
10
import com.google.common.collect.Maps;
11
import com.google.common.collect.Sets;
12
import eu.dnetlib.data.broker.model.openaire.OpenAireEventPayload;
13
import eu.dnetlib.data.mapreduce.hbase.broker.Topic;
14
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
15
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
16
import eu.dnetlib.data.mapreduce.util.OafDecoder;
17
import eu.dnetlib.data.proto.OafProtos.Oaf;
18
import eu.dnetlib.data.proto.OafProtos.OafEntity;
19
import org.apache.commons.collections.MapUtils;
20
import org.apache.commons.lang.StringUtils;
21
import org.apache.hadoop.hbase.client.HTable;
22
import org.apache.hadoop.hbase.client.Result;
23
import org.apache.hadoop.hbase.client.ResultScanner;
24
import org.apache.hadoop.hbase.client.Scan;
25
import org.apache.hadoop.hbase.filter.FilterList;
26
import org.apache.hadoop.hbase.filter.FilterList.Operator;
27
import org.apache.hadoop.hbase.filter.PrefixFilter;
28
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
29
import org.apache.hadoop.hbase.mapreduce.TableMapper;
30
import org.apache.hadoop.hbase.util.Bytes;
31
import org.apache.hadoop.io.Text;
32

    
33
import static eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory.asEvent;
34
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.listKeys;
35
import static eu.dnetlib.data.mapreduce.util.OafHbaseUtils.getPropertyValues;
36

    
37
/**
38
 * Created by claudio on 08/07/16.
39
 */
40
public class AdditionMapper extends TableMapper<Text, Text> {
41

    
42
	private Text outKey;
43
	private Text outValue;
44

    
45
	/**
46
	 * Map ProjectID -> Set of Organization info (id, name)
47
	 */
48
	private Map<String, Set<EntityInfo>> projectOrganization;
49

    
50
	/**
51
	 * Map OrganizationID -> Set of Datasource info (id, name)
52
	 */
53
	private Map<String, Set<EntityInfo>> organizationDatasource;
54

    
55
	private Set<String> organizationPrefixBlacklist = Sets.newHashSet();
56

    
57
	// White list for datasource typologies.
58
	private Set<String> dsTypeWhitelist = Sets.newHashSet();
59

    
60
	@Override
61
	protected void setup(final Context context) throws IOException {
62

    
63
		organizationPrefixBlacklist = Sets.newHashSet("nsf_________");
64
		dsTypeWhitelist.addAll(getPropertyValues(context, "broker.datasource.type.whitelist"));
65

    
66
		projectOrganization = getRelMap(context, "20", "organization", "projectOrganization_participation_isParticipant", organizationPrefixBlacklist);
67
		organizationDatasource = getRelMap(context, "10", "datasource", "datasourceOrganization_provision_provides", dsTypeWhitelist);
68

    
69
		outKey = new Text("");
70
		outValue = new Text();
71
	}
72

    
73
	class EntityInfo {
74
		private String id;
75
		private String name;
76

    
77
		public EntityInfo(final String id, final String name) {
78
			this.id = id;
79
			this.name = name;
80
		}
81

    
82
		@Override
83
		public int hashCode() {
84
			return getId().hashCode();
85
		}
86

    
87
		@Override
88
		public boolean equals(final Object obj) {
89
			return getId().equals(obj);
90
		}
91

    
92
		public String getId() {
93
			return id;
94
		}
95

    
96
		public void setId(final String id) {
97
			this.id = id;
98
		}
99

    
100
		public String getName() {
101
			return name;
102
		}
103

    
104
		public void setName(final String name) {
105
			this.name = name;
106
		}
107
	}
108

    
109
	@Override
110
	protected void map(final ImmutableBytesWritable key, final Result value, final Context context) throws IOException, InterruptedException {
111

    
112
		final Map<byte[], byte[]> map = value.getFamilyMap(Bytes.toBytes("result"));
113
		final byte[] bodyB = map.get(Bytes.toBytes("body"));
114

    
115
		if (MapUtils.isEmpty(map) || bodyB == null) {
116
			context.getCounter("result", "empty body").increment(1);
117
			return;
118
		}
119

    
120
		final Oaf oaf = Oaf.parseFrom(bodyB);
121

    
122
		if (oaf.getDataInfo().getDeletedbyinference()) {
123
			context.getCounter("result", "deletedbyinference = true").increment(1);
124
			return;
125
		}
126

    
127
		final Set<String> currentDatasourceIds = Sets.newHashSet(listKeys(oaf.getEntity().getCollectedfromList()));
128

    
129
		final Map<byte[], byte[]> resultProject = value.getFamilyMap(Bytes.toBytes("resultProject_outcome_isProducedBy"));
130
		if (!MapUtils.isEmpty(resultProject)) {
131

    
132
			for (String projectId : asStringID(resultProject.keySet())) {
133

    
134
				final Set<EntityInfo> organizations = projectOrganization.get(projectId);
135

    
136
				if (organizations != null && !organizations.isEmpty()) {
137

    
138
					for (EntityInfo organization : organizations) {
139

    
140
						final Set<EntityInfo> datasources = organizationDatasource.get(organization.getId());
141

    
142
						if (datasources != null && !datasources.isEmpty()) {
143

    
144
							for (EntityInfo datasource : datasources) {
145

    
146
								if (!currentDatasourceIds.contains(datasource.getId())) {
147

    
148
									//emit event for datasourceId
149

    
150
									final EventMessage event = asEvent(oaf.getEntity(), Topic.ADD_PROJECT, datasource.getId(), datasource.getName());
151
									final OpenAireEventPayload payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity(), oaf.getEntity());
152
									//event.setPayload(HighlightFactory.highlightEnrichPid(payload, Lists.newArrayList(pids)).toJSON());
153
									event.setPayload(payload.toJSON());
154

    
155
									emit(event, context);
156

    
157
									context.getCounter("event", Topic.ADD_PROJECT.getValue()).increment(1);
158
								}
159
							}
160
						}
161
					}
162
				}
163
			}
164
		}
165
	}
166

    
167

    
168
	private void emit(final EventMessage e, final Context context) throws IOException, InterruptedException {
169
		//tKey.set(e.getMap().get("id"));
170
		outValue.set(e.toString());
171
		context.write(outKey, outValue);
172
	}
173

    
174
	private Iterable<String> asStringID(final Iterable<byte[]> in) {
175
		return Iterables.transform(in, new Function<byte[], String>() {
176
			@Override
177
			public String apply(final byte[] input) {
178
				return getID(new String(input));
179
			}
180
		});
181
	}
182

    
183
	private Map<String, Set<EntityInfo>> getRelMap(final Context context, final String prefixFilter, final String entity, final String columnFamily, final Set<String> filter) throws IOException {
184
		System.out.println(String.format("loading %s, %s", entity, columnFamily));
185

    
186
		final Map<String, Set<EntityInfo>> out = Maps.newHashMap();
187

    
188
		final ResultScanner res = scanTable(context, prefixFilter, entity, columnFamily);
189

    
190
		for(Result r : res) {
191

    
192
			final byte[] bodyB = r.getValue(Bytes.toBytes(entity), Bytes.toBytes("body"));
193

    
194
			if (bodyB == null) {
195
				context.getCounter("missing body", entity).increment(1);
196
			} else {
197

    
198
				final OafEntity oafEntity = OafDecoder.decode(bodyB).getEntity();
199
				final EntityInfo kv = getEntityInfo(oafEntity, filter);
200
				if (kv != null) {
201

    
202
					final Map<byte[], byte[]> relMap = r.getFamilyMap(Bytes.toBytes(columnFamily));
203

    
204
					if (MapUtils.isNotEmpty(relMap)) {
205
						for (String id : asStringID(relMap.keySet())) {
206

    
207
							if (!out.containsKey(id)) {
208
								out.put(id, new HashSet<EntityInfo>());
209
							}
210
							out.get(id).add(kv);
211
						}
212
					} else {
213
						context.getCounter("skipped", entity).increment(1);
214
					}
215
				}
216
			}
217
		}
218

    
219
		res.close();
220

    
221
		System.out.println(String.format("loaded map for %s, %s, size: %s", entity, columnFamily, out.size()));
222
		return out;
223
	}
224

    
225
	private EntityInfo getEntityInfo(final OafEntity entity, final Set<String> filter) {
226

    
227
		final String id = getID(entity.getId());
228
		switch (entity.getType()) {
229
		case datasource:
230
			final String dsType = entity.getDatasource().getMetadata().getDatasourcetype().getClassid();
231
			if(!filter.contains(dsType)) {
232
				return null;
233
			}
234
			return new EntityInfo(id, entity.getDatasource().getMetadata().getOfficialname().getValue());
235
		case organization:
236
			if (filter.contains(prefix(id))) {
237
				return null;
238
			}
239
			return new EntityInfo(id, entity.getOrganization().getMetadata().getLegalname().getValue());
240
		default:
241
			throw new IllegalArgumentException("invalid entity: " + entity);
242
		}
243
	}
244

    
245
	private ResultScanner scanTable(final Context context, final String prefixFilter, final String entity, final String columnFamily) throws IOException {
246
		final Scan scan = new Scan();
247
		final FilterList fl = new FilterList(Operator.MUST_PASS_ALL);
248
		fl.addFilter(new PrefixFilter(Bytes.toBytes(prefixFilter)));
249
		scan.setFilter(fl);
250
		scan.addFamily(Bytes.toBytes(entity));
251
		scan.addFamily(Bytes.toBytes(columnFamily));
252

    
253
		final String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
254

    
255
		System.out.println(String.format("table name: '%s'", tableName));
256

    
257
		final HTable table = new HTable(context.getConfiguration(), tableName);
258

    
259
		return table.getScanner(scan);
260
	}
261

    
262
	private String getID(final String s) {
263
		return StringUtils.substringAfter(s, "|");
264
	}
265

    
266
	private String prefix(final String s) {
267
		return StringUtils.substringBefore(s, "::");
268
	}
269

    
270
}
    (1-1/1)