Project

General

Profile

1 40371 eri.katsar
package eu.dnetlib.data.mapreduce.hbase.lodExport;
2
3
import com.google.protobuf.InvalidProtocolBufferException;
4
import eu.dnetlib.data.mapreduce.hbase.index.config.EntityConfigTable;
5
import eu.dnetlib.data.mapreduce.hbase.index.config.IndexConfig;
6
import eu.dnetlib.data.mapreduce.hbase.index.config.LinkDescriptor;
7 45699 eri.katsar
import eu.dnetlib.data.mapreduce.hbase.lodExport.utils.Serializer;
8 40371 eri.katsar
import eu.dnetlib.data.mapreduce.util.OafRowKeyDecoder;
9
import eu.dnetlib.data.mapreduce.util.UpdateMerger;
10
import eu.dnetlib.data.proto.OafProtos.Oaf;
11
import eu.dnetlib.data.proto.OafProtos.OafRel;
12
import eu.dnetlib.data.proto.OafProtos.OafRelOrBuilder;
13
import eu.dnetlib.data.proto.RelTypeProtos.SubRelType;
14
import eu.dnetlib.data.proto.TypeProtos.Type;
15
import org.apache.hadoop.hbase.client.Result;
16
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
17
import org.apache.hadoop.hbase.mapreduce.TableMapper;
18
import org.apache.hadoop.hbase.util.Bytes;
19
import org.apache.hadoop.io.Text;
20
import org.apache.log4j.Logger;
21
22
import java.io.IOException;
23 53915 giorgos.al
import java.text.ParseException;
24 45962 eri.katsar
import java.text.SimpleDateFormat;
25 42280 eri.katsar
import java.util.*;
26 40371 eri.katsar
import java.util.Map.Entry;
27
28
/**
 * Mapper class that reads HBase contents and prepares them for the LOD
 * export.
 */
32 45699 eri.katsar
public class LodMapper extends TableMapper<Text, Text> {
33 53915 giorgos.al
	private static final String DATE_OF_TRANSFORMATION_PATTERN = "yyyy-MM-dd'T'HH:mm:ss";
34
	private static final String LAST_EXECUTION_DATE_PATTERN = "yyyy-MM-dd";
35
	private Logger log = Logger.getLogger(this.getClass());
36
	private EntityConfigTable entityConfigTable;
37
	private String lastExecutionDate = "";
38 41216 eri.katsar
39 53915 giorgos.al
	public static enum ENTITIES_COUNTER {
40
		RESULT, PROJECT, DATASOURCE, PERSON, ORGANIZATION, RESULT_NO_DATE, PROJECT_NO_DATE, DATASOURCE_NO_DATE,
41
		PERSON_NO_DATE, RGANIZATION_NO_DATE, DELETED_BY_INFERENCE, NOT_DELETED_BY_INFERENCE, TOTAL_ENTITIES,
42
		TOTAL_RELATIONS, UPDATED, NOT_UPDATED, NO_DATE
43 45971 eri.katsar
44 53915 giorgos.al
	}
45 41251 eri.katsar
46 53915 giorgos.al
	;
47 40371 eri.katsar
48 53915 giorgos.al
	private String DELIM;
49 41216 eri.katsar
50 53915 giorgos.al
	@Override
51
	protected void setup(Context context) throws IOException, InterruptedException {
52
		loadEntityConfig(context);
53
		DELIM = context.getConfiguration().get("lod.delim");
54
		lastExecutionDate = context.getConfiguration().get("lod.lastExecutionDate");
55 41216 eri.katsar
56 53915 giorgos.al
	}
57 40371 eri.katsar
58 53915 giorgos.al
	@Override
59
	protected void map(final ImmutableBytesWritable keyIn, final Result result, final Context context)
60
			throws IOException {
61 40371 eri.katsar
62 53915 giorgos.al
		final OafRowKeyDecoder decoder = OafRowKeyDecoder.decode(keyIn.copyBytes());
63
		final Type type = decoder.getType();
64
		final Oaf oaf = UpdateMerger.mergeBodyUpdates(context, result.getFamilyMap(Bytes.toBytes(type.toString())));
65 40371 eri.katsar
66 53915 giorgos.al
		if (isValid(oaf)) {
67 40371 eri.katsar
68 53915 giorgos.al
			if (deletedByInference(oaf)) {
69
				context.getCounter(ENTITIES_COUNTER.DELETED_BY_INFERENCE).increment(1);
70
			} else {
71
				context.getCounter(ENTITIES_COUNTER.NOT_DELETED_BY_INFERENCE).increment(1);
72
			}
73 40371 eri.katsar
74 53915 giorgos.al
			context.getCounter(ENTITIES_COUNTER.TOTAL_ENTITIES).increment(1);
75
			try {
76
				emitProtos(context, result, oaf);
77
			} catch (ParseException e) {
78
				// TODO Auto-generated catch block
79
				e.printStackTrace();
80
			}
81
		}
82 41267 eri.katsar
83 53915 giorgos.al
	}
84 40371 eri.katsar
85 53915 giorgos.al
	private boolean isValid(Oaf oaf) {
86
		try {
87
			if (oaf != null && oaf.isInitialized()) {
88
				return true;
89
			}
90 40371 eri.katsar
91 53915 giorgos.al
		} catch (Exception e) {
92
			log.error("invalid proto", e);
93
		}
94 40371 eri.katsar
95 53915 giorgos.al
		return false;
96
	}
97 40935 eri.katsar
98 53915 giorgos.al
	private boolean isUpdated(Oaf oaf, Context context) throws IOException, ParseException {
99
		String dateOfTransformationString = "";
100
		try {
101
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat(DATE_OF_TRANSFORMATION_PATTERN,
102
					Locale.getDefault());
103
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
104
			dateOfTransformationString = oaf.getEntity().getDateoftransformation();
105
			if (dateOfTransformationString == null || dateOfTransformationString.isEmpty()
106
					|| dateOfTransformationString.equals(" ")) {
107
				context.getCounter(ENTITIES_COUNTER.NO_DATE).increment(1);
108
				return true;
109
			}
110 40935 eri.katsar
111 53915 giorgos.al
			Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
112 40935 eri.katsar
113 53915 giorgos.al
			SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
114
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
115
			Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
116 40935 eri.katsar
117 53915 giorgos.al
			if (lastExecDate.before(dateOfTransformation)) {
118
				context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
119
				return true;
120
			}
121
		} catch (ParseException pe) {
122
			SimpleDateFormat simpleDateFormat = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN, Locale.getDefault());
123
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
124
			dateOfTransformationString = oaf.getEntity().getDateoftransformation();
125
			if (dateOfTransformationString == null || dateOfTransformationString.isEmpty()
126
					|| dateOfTransformationString.equals(" ")) {
127
				context.getCounter(ENTITIES_COUNTER.NO_DATE).increment(1);
128
				return true;
129
			}
130 40935 eri.katsar
131 53915 giorgos.al
			Date dateOfTransformation = simpleDateFormat.parse(dateOfTransformationString);
132
			SimpleDateFormat lastExecDateFormatter = new SimpleDateFormat(LAST_EXECUTION_DATE_PATTERN);
133
			simpleDateFormat.setTimeZone(TimeZone.getTimeZone("UTC"));
134
			Date lastExecDate = lastExecDateFormatter.parse(lastExecutionDate);
135 40824 eri.katsar
136 53915 giorgos.al
			if (lastExecDate.before(dateOfTransformation)) {
137
				context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
138
				return true;
139
			}
140
		} catch (Exception e) {
141
			log.error("invalid date " + dateOfTransformationString, e);
142
			throw new IOException(e);
143
		}
144
		context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
145
		return false;
146
	}
147 40371 eri.katsar
148 53915 giorgos.al
	private void emitProtos(Context context, Result result, Oaf oaf) throws IOException, ParseException {
149
		Oaf.Builder oafBuilder = Oaf.newBuilder().setKind(oaf.getKind()).setDataInfo(oaf.getDataInfo());
150
		Type type = oaf.getEntity().getType();
151
		oafBuilder.setEntity(oaf.getEntity());
152
		// emit relation first so we can cache them to entity protos
153
		emitRelation(context, result, oaf, type, oafBuilder);
154 40371 eri.katsar
155 53915 giorgos.al
		if (isUpdated(oaf, context)) {
156
			emitEntity(context, oaf, oafBuilder);
157
		}
158
	}
159 40824 eri.katsar
160 53915 giorgos.al
	private void emitEntity(Context context, Oaf oaf, Oaf.Builder oafBuilder) {
161
		String serialized = Serializer.serialize(oafBuilder.build(), DELIM);
162 40371 eri.katsar
163 53915 giorgos.al
		if (serialized != null && !oaf.getEntity().getId().contains("dedup")) {
164 40371 eri.katsar
165 53915 giorgos.al
			try {
166
				context.getCounter(ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase()))
167
						.increment(1);
168
				// output entity types so we can have seperate files for each type
169
				Text TextKeyOut = new Text(oaf.getEntity().getType().toString());
170
				context.write((TextKeyOut), new Text(serialized));
171 42280 eri.katsar
172 53915 giorgos.al
				if (oaf.getEntity().getDateoftransformation().isEmpty()) {
173
					context.getCounter(
174
							ENTITIES_COUNTER.valueOf(oaf.getEntity().getType().toString().toUpperCase() + "_NO_DATE"))
175
							.increment(1);
176
//                    context.getCounter(ENTITIES_COUNTER.UPDATED).increment(1);
177
				}
178
//                else {
179
//                    context.getCounter(ENTITIES_COUNTER.NOT_UPDATED).increment(1);
180
//                }
181 42711 eri.katsar
182 53915 giorgos.al
			} catch (Exception e) {
183
				log.error("Error writing entity to M/R output", e);
184
			}
185
		}
186 40371 eri.katsar
187 53915 giorgos.al
	}
188 40371 eri.katsar
189 53915 giorgos.al
	// may have multiple relations per each field
190
	private void emitRelation(Context context, Result result, Oaf oaf, Type type, Oaf.Builder builder) {
191
		// derived relations; ResultLanguages, resultConcepts etc are
192
		// created here
193 40371 eri.katsar
194 53915 giorgos.al
		if (!oaf.getEntity().getId().contains("dedup")) {
195
			// Existing Hbase relations are generated here
196
			if (entityConfigTable.getDescriptors(type) != null && !entityConfigTable.getDescriptors(type).isEmpty()) {
197
				for (LinkDescriptor ld : entityConfigTable.getDescriptors(type)) {
198
					try {
199 40371 eri.katsar
200 53915 giorgos.al
						final Map<byte[], byte[]> columnMap = result
201
								.getFamilyMap(Bytes.toBytes(ld.getRelDescriptor().getIt()));
202 40371 eri.katsar
203 53915 giorgos.al
						List<OafRel> relOaf = decodeRelation(oaf, columnMap, ld);
204 40371 eri.katsar
205 53915 giorgos.al
						for (OafRel rel : relOaf) {
206
							builder.getEntityBuilder().addCachedRel(rel);
207
							try {
208
								// keep all relations to one file
209
								Text TextKeyOut = new Text("relations");
210 42280 eri.katsar
211 53915 giorgos.al
								String buff = Serializer.serialize(rel, DELIM);
212 42280 eri.katsar
213 53915 giorgos.al
								if (!buff.isEmpty() && !rel.getTarget().contains("dedup")) {
214
									context.write((TextKeyOut), new Text(buff));
215
									context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
216
								}
217 40371 eri.katsar
218 53915 giorgos.al
							} catch (Exception e) {
219
								log.error("Error while writing Relation Proto to M/R output", e);
220
							}
221 41266 eri.katsar
222 53915 giorgos.al
						}
223 40371 eri.katsar
224 53915 giorgos.al
						relOaf.clear();
225
					} catch (Exception e) {
226
						log.error("Error while decoding Relation Proto from HBase Body", e);
227 42266 eri.katsar
228 53915 giorgos.al
					}
229 40597 eri.katsar
230 53915 giorgos.al
				}
231
			}
232
		}
233 40597 eri.katsar
234 53915 giorgos.al
		Set<String> relationsList = new HashSet<String>();
235 40616 eri.katsar
236 53915 giorgos.al
		Serializer.extractRelations(oaf, DELIM, relationsList);
237 41348 eri.katsar
238 53915 giorgos.al
		for (String rel : relationsList) {
239
			try {
240 42266 eri.katsar
241 53915 giorgos.al
				Text TextKeyOut = new Text("relations");
242 42266 eri.katsar
243 53915 giorgos.al
				if (!oaf.getEntity().getId().contains("dedup")) {
244
					if (!rel.contains("dedup")) {
245
						context.write((TextKeyOut), new Text(rel));
246
						context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
247 42280 eri.katsar
248 53915 giorgos.al
					}
249
				} else {
250
					// for dedup entities write only dedup relationships: all the permutations
251
					// of children
252
					if (rel.contains("dedup")) {
253
						context.write((TextKeyOut), new Text(rel));
254
						context.getCounter(ENTITIES_COUNTER.TOTAL_RELATIONS).increment(1);
255
					}
256 42280 eri.katsar
257 53915 giorgos.al
				}
258 42280 eri.katsar
259 53915 giorgos.al
			} catch (Exception e) {
260
				log.error("Error writing relations to output : " + rel);
261
			}
262
		}
263
		relationsList.clear();
264
	}
265 41348 eri.katsar
266 53915 giorgos.al
	private ArrayList<OafRel> decodeRelation(final Oaf body, Map<byte[], byte[]> columnMap, final LinkDescriptor ld)
267
			throws IOException, InterruptedException {
268 41266 eri.katsar
269 53915 giorgos.al
		ArrayList<OafRel> rels = new ArrayList<OafRel>();
270 40371 eri.katsar
271 53915 giorgos.al
		if (hasData(columnMap)) {
272 40371 eri.katsar
273 53915 giorgos.al
			for (Entry<byte[], byte[]> e : columnMap.entrySet()) {
274 41266 eri.katsar
275 53915 giorgos.al
				final Oaf decodedOaf = decodeProto(e.getValue());
276
				if (isValid(decodedOaf)) {
277
					OafRel.Builder relBuilder = OafRel.newBuilder(decodedOaf.getRel());
278 40371 eri.katsar
279 53915 giorgos.al
					// skip dedups
280 40371 eri.katsar
281 53915 giorgos.al
					if (ld.getRelDescriptor().getIt().contains(SubRelType.dedup.toString())
282
							&& isDedupSelf(relBuilder)) {
283
						// log.error("invalid protto", e);
284 40371 eri.katsar
285 53915 giorgos.al
					} else {
286
						OafRel oafRel = relBuilder.setRelType(ld.getRelDescriptor().getRelType()).build();
287
						relBuilder.setCachedTarget(body.getEntity());
288
						rels.add(oafRel);
289
					}
290
				}
291 42280 eri.katsar
292 53915 giorgos.al
			}
293
		}
294 41348 eri.katsar
295 53915 giorgos.al
		return rels;
296
	}
297 45708 eri.katsar
298 53915 giorgos.al
	private Oaf decodeProto(final byte[] body) {
299
		try {
300
			return Oaf.parseFrom(body);
301
		} catch (InvalidProtocolBufferException e) {
302
			log.error("Invalid Protos", e);
303
		}
304
		return null;
305
	}
306 40371 eri.katsar
307 53915 giorgos.al
	private void loadEntityConfig(Context context) throws InterruptedException {
308
		String indexConf = context.getConfiguration().get("index.conf");
309 40371 eri.katsar
310 53915 giorgos.al
		if (indexConf == null || indexConf.isEmpty()) {
311
			log.error("NULL ENTITY CONFIGURATION TABLE IN MAPPER  : ");
312
			throw new InterruptedException("Null enitity configuration in mappper");
313 40371 eri.katsar
314 53915 giorgos.al
		}
315 40371 eri.katsar
316 53915 giorgos.al
		entityConfigTable = IndexConfig.load(indexConf).getConfigMap();
317 40371 eri.katsar
318 53915 giorgos.al
	}
319 40371 eri.katsar
320 53915 giorgos.al
	private boolean isDedupSelf(final OafRelOrBuilder rel) {
321
		return rel.getSource().contains(rel.getTarget());
322
	}
323 40371 eri.katsar
324 53915 giorgos.al
	private boolean hasData(final Map<byte[], byte[]> columnMap) {
325
		return columnMap != null && !columnMap.isEmpty();
326
	}
327 40371 eri.katsar
328 53915 giorgos.al
	private boolean deletedByInference(final Oaf oaf) {
329
		return oaf.getDataInfo().getDeletedbyinference();
330
	}
331 40371 eri.katsar
332 53915 giorgos.al
	@Override
333
	protected void cleanup(Context context) throws IOException, InterruptedException {
334
		super.cleanup(context);
335
	}
336 40371 eri.katsar
337 53915 giorgos.al
	public EntityConfigTable getEntityConfigTable() {
338
		return entityConfigTable;
339
	}
340 40371 eri.katsar
341 53915 giorgos.al
	public void setEntityConfigTable(EntityConfigTable entityConfigTable) {
342
		this.entityConfigTable = entityConfigTable;
343
	}
344 40371 eri.katsar
345 53915 giorgos.al
}