package eu.dnetlib.data.mapreduce.hbase.statsExport;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;
import org.apache.hadoop.io.Text;

import com.google.protobuf.InvalidProtocolBufferException;

import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
import eu.dnetlib.data.mapreduce.hbase.statsExport.utils.Serializer;
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
import eu.dnetlib.data.proto.OafProtos.Oaf;
import eu.dnetlib.data.proto.OafProtos.OafRel;
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
import eu.dnetlib.data.proto.TypeProtos.Type;

/**
35
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
36
 * export
37
 */
38
public class StatsMapper extends TableMapper<Text, ImmutableBytesWritable> {
39
	private Logger log = Logger.getLogger(this.getClass());
40

    
41
	private Serializer serializer;
42

    
43
	private EntityConfigTable entityConfigTable;
44

    
45
	private Properties tableMappings;
46
	private long writtenRecs = 0;
47

    
48
	private long threshold = 100000;
49

    
50
	public static enum STATS_COUNTER {
51
		INPUT_RELATIONS, WRITTEN_RELATIONS, OUTPUT_DERIVED_RELATIONS, INPUT_RECS, OUTPUT_RECS, ROTTEN_RECORDS, WRITTEN_ENTITIES, VALID_RECS, INPUT_ENTITIES
52
	};
53

    
54
	public static enum PROTOS_COUNTER {
55
		RESULT, PROJECT, DATASOURCE, ORGANIZATION, DATASOURCEORGANIZATION, DATASOURCETOPIC, DATASOURCELANGUAGE, PROJECTORGANIZATION, RESULTCLAIM, RESULTCLASSIFICATION, RESULTCONCEPT, RESULTLANGUAGE, RESULTORGANIZATION, RESULTRESULT, RESULTPROJECT, RESULTTOPIC, RESULTDATASOURCE
56

    
57
	};
58

    
59
	@Override
60
	protected void setup(Context context) throws IOException, InterruptedException {
61
		loadTableMap(context);
62
		loadEntityConfig(context);
63

    
64
		serializer = new Serializer();
65
		serializer.setDELIM(context.getConfiguration().get("stats.delim"));
66
		serializer.setNULL_NUM(context.getConfiguration().get("stats.nullNum"));
67
		serializer.setNULL_STRING(context.getConfiguration().get("stats.nullString"));
68
		serializer.setENCLOSED(context.getConfiguration().get("stats.enclChar"));
69

    
70
	}
71

    
72
	@Override
73
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context) throws IOException {
74

    
75
		final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
76

    
77
		context.getCounter(STATS_COUNTER.INPUT_RECS).increment(1);
78

    
79
		final Type type = decoder.getType();
80

    
81
		final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
82

    
83
		if (isValid(oaf, context))
84

    
85
		{
86
			context.getCounter(STATS_COUNTER.VALID_RECS).increment(1);
87
			if (!deletedByInference(oaf) || entityConfigTable.includeDuplicates(type)) {
88
				emitProtos(context, result, oaf);
89
			}
90
		} else
91

    
92
		{
93
			context.getCounter(STATS_COUNTER.ROTTEN_RECORDS).increment(1);
94

    
95
		}
96

    
97
	}
98

    
99
	private boolean isValid(Oaf oaf, Context context) {
100
		try {
101

    
102
			if (oaf != null && oaf.isInitialized()) {
103
				return true;
104
			}
105

    
106
		} catch (Exception e) {
107
			log.error("OAF NOT INITIALIZED ");
108
		}
109

    
110
		return false;
111
	}
112

    
113
	private void emitProtos(Context context, Result result, Oaf oaf) {
114
		Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo()).setTimestamp(oaf.getTimestamp());
115

    
116
		Type type = oaf.getEntity().getType();
117
		oafBuilder.setEntity(oaf.getEntity());
118

    
119
		// emit relation first so we can cache them to entity protos
120
		emitRelation(context, result, oaf, type, oafBuilder);
121
		emitEntity(context, oaf, type, oafBuilder);
122

    
123
	}
124

    
125
	private void emitEntity(Context context, Oaf oaf, Type type, Oaf.Builder oafBuilder) {
126
		// ONLY EMIT ENTITIES THAT WE WANT TO IMPORT IN STATS DB
127
		context.getCounter(STATS_COUNTER.INPUT_ENTITIES).increment(1);
128
		if (tableMappings.containsKey(type.toString())) {
129
			String serialized = serializer.serialize(oafBuilder.build());
130
			if (serialized != null) {
131

    
132
				try {
133
					Text TextKeyOut = new Text(type + "," + serializer.getId(oaf));
134
					context.getCounter(STATS_COUNTER.WRITTEN_ENTITIES).increment(1);
135
					context.getCounter(PROTOS_COUNTER.valueOf(type.toString().toUpperCase())).increment(1);
136

    
137
					context.write((TextKeyOut), new ImmutableBytesWritable(serialized.getBytes()));
138

    
139
				} catch (IOException e) {
140
					log.error("Error writing entity to M/R output", e);
141
				} catch (InterruptedException e) {
142
					log.error("Error writing entity to M/R output: Job Interrupted.", e);
143
				}
144
			}
145
		}
146
	}
147

    
148
	// may have multiple relations per each field
149
	private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
150

    
151
		try {
152
			// derived relations; ResultLanguages, resultConcepts etc are
153
			// created here
154
			HashMap<String, List<String>> relMap = serializer.extractRelations(oaf);
155
			if (relMap != null && !relMap.isEmpty()) {
156

    
157
				for (Entry<String, List<String>> rel : relMap.entrySet()) {
158

    
159
					Text TextKeyOut = new Text(rel.getKey() + "," + serializer.getId(oaf));
160

    
161
					for (String relVals : rel.getValue()) {
162

    
163
						if (relVals != null && !relVals.isEmpty()) {
164

    
165
							context.getCounter(STATS_COUNTER.OUTPUT_DERIVED_RELATIONS).increment(1);
166
							context.write((TextKeyOut), new ImmutableBytesWritable(relVals.getBytes()));
167

    
168
						}
169
					}
170
				}
171

    
172
			}
173

    
174
		} catch (Exception e) {
175
			log.error("Error writing relation to M/R output", e);
176
		}
177

    
178
		// Existing Hbase relations are generated here
179
		if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
180

    
181
			for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
182

    
183
				try {
184

    
185
					final Map<byte[], byte[]> columnMap = result.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
186

    
187
					ArrayList<OafRel> relOaf = decodeRelation(oaf, context, columnMap, ld);
188

    
189
					context.getCounter(STATS_COUNTER.INPUT_RELATIONS).increment(1);
190
					for (OafRel rel : relOaf) {
191

    
192
						builder.getEntityBuilder().addCachedRel(rel);
193

    
194
						try {
195

    
196
							// TODO skip tables what we dont want to import (
197
							// like personresults)
198
							if (tableMappings.containsKey(rel.getRelType().toString()))
199

    
200
							{
201
								Text TextKeyOut = new Text(ld.getRelDescriptor().getRelType().toString() + "," + serializer.getId(rel));
202
								String buff = serializer.serialize(rel);
203
								context.getCounter(STATS_COUNTER.WRITTEN_RELATIONS).increment(1);
204
								// context.getCounter(PROTOS_COUNTER.valueOf(ld.getRelDescriptor().getRelType().toString().toUpperCase())).increment(1);
205

    
206
								context.write((TextKeyOut), new ImmutableBytesWritable(buff.getBytes()));
207

    
208
							}
209

    
210
						} catch (Exception e) {
211
							log.error("Error while writing Relation Proto to M/R output", e);
212
						}
213

    
214
					}
215
				} catch (Exception e) {
216
					log.error("Error while decoding Relation Proto from HBase Body", e);
217

    
218
				}
219

    
220
			}
221
		}
222

    
223
	}
224

    
225
	private ArrayList<OafRel> decodeRelation(final Oaf body, final Context context, Map<byte[], byte[]> columnMap, final LinkDescriptor ld) throws IOException, InterruptedException {
226

    
227
		ArrayList<OafRel> rels = new ArrayList<OafRel>();
228
		if (isValid(columnMap)) {
229

    
230
			for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
231

    
232
				final Oaf decodedOaf = decodeProto(context, e.getValue());
233
				if (decodedOaf != null && !deletedByInference(decodedOaf)) {
234

    
235
					OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
236

    
237
					// skip dedups
238

    
239
					if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString()) && isDedupSelf(relBuilder)) {
240
						log.info("avoid to emit dedup self: " + relBuilder.getSource());
241

    
242
					}
243

    
244
					else {
245

    
246
						OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
247
						relBuilder.setCachedTarget(body.getEntity());
248
						rels.add(oafRel);
249

    
250
					}
251
				}
252

    
253
			}
254
		}
255

    
256
		return rels;
257
	}
258

    
259
	private Oaf decodeProto(final Context context, final byte[] body) {
260
		try {
261
			return Oaf.parseFrom(body);
262
		} catch (InvalidProtocolBufferException e) {
263
			context.getCounter("decodeProto", e.getClass().getName()).increment(1);
264
			log.error("Corrupted proto ", e);
265

    
266
		}
267
		return null;
268
	}
269

    
270
	private void loadTableMap(Context context) throws IOException {
271
		tableMappings = new Properties();
272
		String tables = context.getConfiguration().get("stats.dbTablesMap");
273
		if (tables == null) {
274
			log.error("NULL TABLE MAP CONFIG  IN MAPPER  : ");
275
		}
276
		tables = tables.replaceAll(",", "\n");
277
		InputStream stream = new ByteArrayInputStream(tables.getBytes());
278

    
279
		tableMappings.load(stream);
280
		stream.close();
281
	}
282

    
283
	private void loadEntityConfig(Context context) {
284
		String indexConf = context.getConfiguration().get("stats.indexConf");
285

    
286
		if (indexConf == null || indexConf.isEmpty()) {
287
			log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
288

    
289
		}
290

    
291
		entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
292

    
293
	}
294

    
295
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
296
		return rel.getSource().contains(rel.getTarget());
297
	}
298

    
299
	private boolean isValid(final Map<byte[], byte[]> columnMap) {
300
		return columnMap != null && !columnMap.isEmpty();
301
	}
302

    
303
	private boolean deletedByInference(final Oaf oaf) {
304
		return oaf.getDataInfo().getDeletedbyinference();
305
	}
306

    
307
	@Override
308
	protected void cleanup(Context context) throws IOException, InterruptedException {
309

    
310
		super.cleanup(context);
311
	}
312

    
313
	public Serializer getSerializer() {
314
		return serializer;
315
	}
316

    
317
	public void setSerializer(Serializer serializer) {
318
		this.serializer = serializer;
319
	}
320

    
321
	public EntityConfigTable getEntityConfigTable() {
322
		return entityConfigTable;
323
	}
324

    
325
	public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
326
		this.entityConfigTable = entityConfigTable;
327
	}
328

    
329
	public Properties getTableMappings() {
330
		return tableMappings;
331
	}
332

    
333
	public void setTableMappings(Properties tableMappings) {
334
		this.tableMappings = tableMappings;
335
	}
336

    
337
	public long getWrittenRecs() {
338
		return writtenRecs;
339
	}
340

    
341
	public void setWrittenRecs(long writtenRecs) {
342
		this.writtenRecs = writtenRecs;
343
	}
344

    
345
	public long getThreshold() {
346
		return threshold;
347
	}
348

    
349
	public void setThreshold(long threshold) {
350
		this.threshold = threshold;
351
	}
352

    
353
}