Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport;
2

    
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
5
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
7
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Serializer;
8
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.OafProtos.Oaf;
11
import eu.dnetlib.data.proto.OafProtos.OafRel;
12
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
13
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import org.apache.hadoop.hbase.client.Result;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18
import org.apache.hadoop.hbase.util.Bytes;
19
import org.apache.hadoop.io.Text;
20
import org.apache.log4j.Logger;
21

    
22
import java.io.IOException;
23
import java.text.ParseException;
24
import java.text.SimpleDateFormat;
25
import java.util.*;
26
import java.util.Map.Entry;
27

    
28
/**
29
 * Mapper Class that reads HBASE contents and prepares them for the StatsDB
30
 * export
31
 */
32
public class LodMapper extends TableMapper<Text, Text> {
33
	private static final String DATE_OF_TRANSFORMATION_PATTERN = "yyyy-MM-dd'T'HH:mm:ss";
34
	private static final String LAST_EXECUTION_DATE_PATTERN = "yyyy-MM-dd";
35
	private Logger log = Logger.getLogger(this.getClass());
36
	private EntityConfigTable entityConfigTable;
37
	private String lastExecutionDate = "";
38

    
39
	public static enum ENTITIES_COUNTER {
40
		RESULT, PROJECT, DATASOURCE, PERSON, ORGANIZATION, RESULT_NO_DATE, PROJECT_NO_DATE, DATASOURCE_NO_DATE,
41
		PERSON_NO_DATE, RGANIZATION_NO_DATE, DELETED_BY_INFERENCE, NOT_DELETED_BY_INFERENCE, TOTAL_ENTITIES,
42
		TOTAL_RELATIONS, UPDATED, NOT_UPDATED, NO_DATE
43

    
44
	}
45

    
46
	;
47

    
48
	private String DELIM;
49

    
50
	@Override
51
	protected void setup(Context context) throws IOException, InterruptedException {
52
		loadEntityConfig(context);
53
		DELIM = context.getConfiguration().get("lod.delim");
54
		lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
55

    
56
	}
57

    
58
	@Override
59
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context)
60
			throws IOException {
61

    
62
		final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
63
		final Type type = decoder.getType();
64
		final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
65

    
66
		if (isValid(oaf)) {
67

    
68
			if (deletedByInference(oaf)) {
69
				context.getCounter(ENTITIES_COUNTER.DELETED_BY_INFERENCE).increment(1);
70
			} else {
71
				context.getCounter(ENTITIES_COUNTER.NOT_DELETED_BY_INFERENCE).increment(1);
72
			}
73

    
74
			context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
75
			try {
76
				emitProtos(context, result, oaf);
77
			} catch (ParseException e) {
78
				// TODO Auto-generated catch block
79
				e.printStackTrace();
80
			}
81
		}
82

    
83
	}
84

    
85
	private boolean isValid(Oaf oaf) {
86
		try {
87
			if (oaf != null && oaf.isInitialized()) {
88
				return true;
89
			}
90

    
91
		} catch (Exception e) {
92
			log.error("invalid proto", e);
93
		}
94

    
95
		return false;
96
	}
97

    
98
	private boolean isUpdated(Oaf oaf, Context context) throws IOException, ParseException {
99
		String dateOfTransformationString = "";
100
		try {
101
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_OF_TRANSFORMATION_PATTERN,
102
					Locale.getDefault());
103
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
104
			dateOfTransformationString = oaf.getEntity().getDateoftransformation();
105
			if (dateOfTransformationString == null || dateOfTransformationString.isEmpty()
106
					|| dateOfTransformationString.equals(" ")) {
107
				context.getCounter(ENTITIES_COUNTER.NO_DATE).increment(1);
108
				return true;
109
			}
110

    
111
			Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
112

    
113
			SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
114
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
115
			Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
116

    
117
			if (lastExecDate.before(dateOfTransformation)) {
118
				context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
119
				return true;
120
			}
121
		} catch (ParseException pe) {
122
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN, Locale.getDefault());
123
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
124
			dateOfTransformationString = oaf.getEntity().getDateoftransformation();
125
			if (dateOfTransformationString == null || dateOfTransformationString.isEmpty()
126
					|| dateOfTransformationString.equals(" ")) {
127
				context.getCounter(ENTITIES_COUNTER.NO_DATE).increment(1);
128
				return true;
129
			}
130

    
131
			Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
132
			SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
133
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
134
			Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
135

    
136
			if (lastExecDate.before(dateOfTransformation)) {
137
				context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
138
				return true;
139
			}
140
		} catch (Exception e) {
141
			log.error("invalid date " + dateOfTransformationString, e);
142
			throw new IOException(e);
143
		}
144
		context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
145
		return false;
146
	}
147

    
148
	private void emitProtos(Context context, Result result, Oaf oaf) throws IOException, ParseException {
149
		Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
150
		Type type = oaf.getEntity().getType();
151
		oafBuilder.setEntity(oaf.getEntity());
152
		// emit relation first so we can cache them to entity protos
153
		emitRelation(context, result, oaf, type, oafBuilder);
154

    
155
		if (isUpdated(oaf, context)) {
156
			emitEntity(context, oaf, oafBuilder);
157
		}
158
	}
159

    
160
	private void emitEntity(Context context, Oaf oaf, Oaf.Builder oafBuilder) {
161
		String serialized = Serializer.serialize(oafBuilder.build(), DELIM);
162

    
163
		if (serialized != null && !oaf.getEntity().getId().contains("dedup")) {
164

    
165
			try {
166
				context.getCounter(ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase()))
167
						.increment(1);
168
				// output entity types so we can have seperate files for each type
169
				Text TextKeyOut = new Text(oaf.getEntity().getType().toString());
170
				context.write((TextKeyOut), new Text(serialized));
171

    
172
				if (oaf.getEntity().getDateoftransformation().isEmpty()) {
173
					context.getCounter(
174
							ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase() + "_NO_DATE"))
175
							.increment(1);
176
//                    context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
177
				}
178
//                else {
179
//                    context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
180
//                }
181

    
182
			} catch (Exception e) {
183
				log.error("Error writing entity to M/R output", e);
184
			}
185
		}
186

    
187
	}
188

    
189
	// may have multiple relations per each field
190
	private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
191
		// derived relations; ResultLanguages, resultConcepts etc are
192
		// created here
193

    
194
		if (!oaf.getEntity().getId().contains("dedup")) {
195
			// Existing Hbase relations are generated here
196
			if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
197
				for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
198
					try {
199

    
200
						final Map<byte[], byte[]> columnMap = result
201
								.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
202

    
203
						List<OafRel> relOaf = decodeRelation(oaf, columnMap, ld);
204

    
205
						for (OafRel rel : relOaf) {
206
							builder.getEntityBuilder().addCachedRel(rel);
207
							try {
208
								// keep all relations to one file
209
								Text TextKeyOut = new Text("relations");
210

    
211
								String buff = Serializer.serialize(rel, DELIM);
212

    
213
								if (!buff.isEmpty() && !rel.getTarget().contains("dedup")) {
214
									context.write((TextKeyOut), new Text(buff));
215
									context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
216
								}
217

    
218
							} catch (Exception e) {
219
								log.error("Error while writing Relation Proto to M/R output", e);
220
							}
221

    
222
						}
223

    
224
						relOaf.clear();
225
					} catch (Exception e) {
226
						log.error("Error while decoding Relation Proto from HBase Body", e);
227

    
228
					}
229

    
230
				}
231
			}
232
		}
233

    
234
		Set<String> relationsList = new HashSet<String>();
235

    
236
		Serializer.extractRelations(oaf, DELIM, relationsList);
237

    
238
		for (String rel : relationsList) {
239
			try {
240

    
241
				Text TextKeyOut = new Text("relations");
242

    
243
				if (!oaf.getEntity().getId().contains("dedup")) {
244
					if (!rel.contains("dedup")) {
245
						context.write((TextKeyOut), new Text(rel));
246
						context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
247

    
248
					}
249
				} else {
250
					// for dedup entities write only dedup relationships: all the permutations
251
					// of children
252
					if (rel.contains("dedup")) {
253
						context.write((TextKeyOut), new Text(rel));
254
						context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
255
					}
256

    
257
				}
258

    
259
			} catch (Exception e) {
260
				log.error("Error writing relations to output : " + rel);
261
			}
262
		}
263
		relationsList.clear();
264
	}
265

    
266
	private ArrayList<OafRel> decodeRelation(final Oaf body, Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
267
			throws IOException, InterruptedException {
268

    
269
		ArrayList<OafRel> rels = new ArrayList<OafRel>();
270

    
271
		if (hasData(columnMap)) {
272

    
273
			for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
274

    
275
				final Oaf decodedOaf = decodeProto(e.getValue());
276
				if (isValid(decodedOaf)) {
277
					OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
278

    
279
					// skip dedups
280

    
281
					if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString())
282
							&& isDedupSelf(relBuilder)) {
283
						// log.error("invalid protto", e);
284

    
285
					} else {
286
						OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
287
						relBuilder.setCachedTarget(body.getEntity());
288
						rels.add(oafRel);
289
					}
290
				}
291

    
292
			}
293
		}
294

    
295
		return rels;
296
	}
297

    
298
	private Oaf decodeProto(final byte[] body) {
299
		try {
300
			return Oaf.parseFrom(body);
301
		} catch (InvalidProtocolBufferException e) {
302
			log.error("Invalid Protos", e);
303
		}
304
		return null;
305
	}
306

    
307
	private void loadEntityConfig(Context context) throws InterruptedException {
308
		String indexConf = context.getConfiguration().get("index.conf");
309

    
310
		if (indexConf == null || indexConf.isEmpty()) {
311
			log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
312
			throw new InterruptedException("Null enitity configuration in mappper");
313

    
314
		}
315

    
316
		entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
317

    
318
	}
319

    
320
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
321
		return rel.getSource().contains(rel.getTarget());
322
	}
323

    
324
	private boolean hasData(final Map<byte[], byte[]> columnMap) {
325
		return columnMap != null && !columnMap.isEmpty();
326
	}
327

    
328
	private boolean deletedByInference(final Oaf oaf) {
329
		return oaf.getDataInfo().getDeletedbyinference();
330
	}
331

    
332
	@Override
333
	protected void cleanup(Context context) throws IOException, InterruptedException {
334
		super.cleanup(context);
335
	}
336

    
337
	public EntityConfigTable getEntityConfigTable() {
338
		return entityConfigTable;
339
	}
340

    
341
	public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
342
		this.entityConfigTable = entityConfigTable;
343
	}
344

    
345
}
(1-1/2)