package eu.dnetlib.iis.metadataextraction;

import java.io.IOException;
import java.io.InputStream;
import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.avro.mapred.AvroKey;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.output.Format;
import org.jdom.output.XMLOutputter;

import pl.edu.icm.cermine.ContentExtractor;
import pl.edu.icm.cermine.exception.AnalysisException;
import pl.edu.icm.cermine.exception.TransformationException;

import com.itextpdf.text.exceptions.InvalidPdfException;

import eu.dnetlib.iis.audit.schemas.Cause;
import eu.dnetlib.iis.audit.schemas.Fault;
import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
import eu.dnetlib.iis.core.javamapreduce.MultipleOutputs;
import eu.dnetlib.iis.metadataextraction.schemas.DocumentText;
import eu.dnetlib.iis.metadataextraction.schemas.ExtractedDocumentMetadata;

/**
 * Abstract Hadoop mapper providing the shared metadata extraction logic: it runs
 * CERMINE's {@link ContentExtractor} on PDF content streams and writes the extracted
 * document metadata, plaintext and faults to dedicated named outputs.
 * @author mhorst
 *
 */
public abstract class AbstractMetadataExtractorMapper<T> extends Mapper<AvroKey<T>, NullWritable, NullWritable, NullWritable> {
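
    // A concrete subclass implements map() to unwrap its Avro input record and delegate
    // to processStream(). A minimal sketch, assuming a hypothetical DocumentContent
    // record with id and pdf fields (names are illustrative, not part of this class;
    // ByteArrayInputStream comes from java.io):
    //
    //   public class PdfMetadataExtractorMapper extends AbstractMetadataExtractorMapper<DocumentContent> {
    //       @Override
    //       protected void map(AvroKey<DocumentContent> key, NullWritable ignore, Context context)
    //               throws IOException, InterruptedException {
    //           DocumentContent content = key.datum();
    //           byte[] pdf = content.getPdf().array();
    //           processStream(content.getId(), new ByteArrayInputStream(pdf), pdf.length / 1024);
    //       }
    //   }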

    public static final String LOG_FAULT_PROCESSING_TIME_THRESHOLD_SECS = "log.fault.processing.time.threshold.secs";

    public static final String FAULT_CODE_PROCESSING_TIME_THRESHOLD_EXCEEDED = "ProcessingTimeThresholdExceeded";

    public static final String FAULT_SUPPLEMENTARY_DATA_PROCESSING_TIME = "processing_time";

    protected final Logger log = Logger.getLogger(AbstractMetadataExtractorMapper.class);

    /**
     * Flag indicating {@link AnalysisException} should cause interruption.
     */
    protected boolean analysisExceptionAsCritical = false;

    /**
     * Flag indicating any other {@link Exception} should cause interruption.
     */
    protected boolean otherExceptionAsCritical = false;

    /**
     * Multiple outputs.
     */
    protected MultipleOutputs mos = null;

    /**
     * Document metadata named output.
     */
    protected String namedOutputMeta;

    /**
     * Document plaintext named output.
     */
    protected String namedOutputPlaintext;

    /**
     * Fault named output.
     */
    protected String namedOutputFault;

    /**
     * Progress log interval.
     */
    protected int progresLogInterval = 100;

    /**
     * Current progress.
     */
    protected int currentProgress = 0;

    /**
     * Interval start time, used when logging progress.
     */
    private long intervalTime = 0;

    /**
     * Maximum file size in kilobytes.
     */
    protected long maxFileSizeKB = Long.MAX_VALUE;

    /**
     * Processing time threshold in milliseconds.
     * When exceeded, an appropriate fault object will be written to the error datastore.
     */
    protected long processingTimeThreshold = Long.MAX_VALUE;

    /**
     * Set of identifiers of objects excluded from processing.
     */
    protected Set<String> excludedIds;

    /* (non-Javadoc)
     * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
     */
    @Override
    protected void setup(Context context) throws IOException,
            InterruptedException {
        namedOutputMeta = context.getConfiguration().get("output.meta");
        if (namedOutputMeta == null || namedOutputMeta.isEmpty()) {
            throw new RuntimeException("no named output provided for metadata");
        }
        namedOutputPlaintext = context.getConfiguration().get("output.plaintext");
        if (namedOutputPlaintext == null || namedOutputPlaintext.isEmpty()) {
            throw new RuntimeException("no named output provided for plaintext");
        }
        namedOutputFault = context.getConfiguration().get("output.fault");
        if (namedOutputFault == null || namedOutputFault.isEmpty()) {
            throw new RuntimeException("no named output provided for fault");
        }

        String excludedIdsCSV = context.getConfiguration().get("excluded.ids");
        if (excludedIdsCSV != null && !excludedIdsCSV.trim().isEmpty()
                && !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(excludedIdsCSV)) {
            log.warn("got excluded ids: " + excludedIdsCSV);
            excludedIds = new HashSet<String>(Arrays.asList(
                    StringUtils.split(excludedIdsCSV.trim(), ',')));
        } else {
            log.warn("got no excluded ids");
        }
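        // e.g. excluded.ids=id-1,id-2 (identifiers made up for illustration) skips those
        // records in processStream(); they are still counted by the progress log there.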
        // handling maximum content size
        String maxFileSizeMBStr = context.getConfiguration().get(
                WorkflowRuntimeParameters.IMPORT_CONTENT_MAX_FILE_SIZE_MB);
        if (maxFileSizeMBStr != null && !maxFileSizeMBStr.trim().isEmpty()
                && !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(maxFileSizeMBStr)) {
            this.maxFileSizeKB = 1024L * Integer.valueOf(maxFileSizeMBStr);
        }
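        // The limit is configured in MB but compared against contentLengthKB,
        // hence the 1024 multiplier above (e.g. a 50 MB limit becomes 51200 KB).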

        // handling processing time threshold
        String processingTimeThresholdSecsStr = context.getConfiguration().get(
                LOG_FAULT_PROCESSING_TIME_THRESHOLD_SECS);
        if (processingTimeThresholdSecsStr != null && !processingTimeThresholdSecsStr.trim().isEmpty()
                && !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(processingTimeThresholdSecsStr)) {
            this.processingTimeThreshold = 1000L * Integer.valueOf(processingTimeThresholdSecsStr);
        }

        mos = new MultipleOutputs(context);
        currentProgress = 0;
        intervalTime = System.currentTimeMillis();
    }
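
    // The named outputs and thresholds above come from the workflow configuration.
    // An illustrative setup (the property keys are the ones read in setup(); the
    // values are made up for the example):
    //
    //   conf.set("output.meta", "meta");
    //   conf.set("output.plaintext", "plaintext");
    //   conf.set("output.fault", "fault");
    //   conf.set("log.fault.processing.time.threshold.secs", "300");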

    /**
     * Processes the content input stream. Closes the stream at the end.
     * @param documentId document identifier
     * @param contentStream PDF content input stream
     * @param contentLengthKB content length in KB
     * @throws IOException
     * @throws InterruptedException
     */
    protected void processStream(CharSequence documentId,
            InputStream contentStream,
            long contentLengthKB) throws IOException, InterruptedException {
        try {
            currentProgress++;
            if (currentProgress > 0 && currentProgress % progresLogInterval == 0) {
                // FIXME switch back to debug when setting debug level on oozie
                log.warn("metadata extraction progress: " + currentProgress + ", time taken to process " +
                        progresLogInterval + " elements: " +
                        ((System.currentTimeMillis() - intervalTime) / 1000) + " secs");
                intervalTime = System.currentTimeMillis();
            }
            if (excludedIds != null && excludedIds.contains(documentId.toString())) {
                log.warn("skipping processing for excluded id " + documentId);
            } else {
                // handling maximum content size
                if (contentLengthKB > maxFileSizeKB) {
                    log.warn("skipping processing for id " + documentId +
                            " due to max file size limit=" + maxFileSizeKB + " KB exceeded: " + contentLengthKB + " KB");
                    try {
                        // writing empty metadata
                        mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
                                NlmToDocumentWithBasicMetadataConverter.convertFull(
                                        documentId.toString(), null)));
                        // writing empty plaintext
                        mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
                                NlmToDocumentContentConverter.convert(
                                        documentId.toString(), null)));
                        return;
                    } catch (TransformationException | JDOMException e2) {
                        log.debug("closing multiple outputs...");
                        mos.close();
                        log.debug("multiple outputs closed");
                        throw new RuntimeException(e2);
                    } finally {
                        if (contentStream != null) {
                            contentStream.close();
                        }
                    }
                }
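                // Note: skipped oversized documents still emit "empty" metadata and plaintext
                // records (null NLM document), so every input id is represented downstream.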

                // TODO switch back to debug when setting debug level on oozie
                log.warn("starting processing for id: " + documentId);
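                // CERMINE flow: uploadPDF() hands the PDF content to the extractor once;
                // getNLMContent() then returns the extracted metadata as an NLM XML element,
                // and getRawFullText() (used further below) returns the document plaintext.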
                long startTime = System.currentTimeMillis();
                ContentExtractor extractor = new ContentExtractor();
                try {
                    extractor.uploadPDF(contentStream);
                    try {
                        Element resultElem = extractor.getNLMContent();
                        Document doc = new Document(resultElem);
                        XMLOutputter outputter = new XMLOutputter(Format.getPrettyFormat());
                        log.debug("got NLM content: \n" + outputter.outputString(resultElem));
                        mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
                                NlmToDocumentWithBasicMetadataConverter.convertFull(
                                        documentId.toString(), doc)));
                    } catch (JDOMException | TransformationException e) {
                        log.debug("closing multiple outputs...");
                        mos.close();
                        log.debug("multiple outputs closed");
                        throw new RuntimeException(e);
                    } catch (AnalysisException e) {
                        if (analysisExceptionAsCritical) {
                            log.debug("closing multiple outputs...");
                            mos.close();
                            log.debug("multiple outputs closed");
                            throw new RuntimeException(e);
                        } else {
                            if (e.getCause() instanceof InvalidPdfException) {
                                log.error("Invalid PDF file", e);
                            } else {
                                log.error("got unexpected analysis exception, just logging", e);
                            }
                            try {
                                // writing empty result
                                mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
                                        NlmToDocumentWithBasicMetadataConverter.convertFull(
                                                documentId.toString(), null)));
                                // writing fault result
                                mos.write(namedOutputFault, new AvroKey<Fault>(
                                        exceptionToFault(documentId, e)));
                            } catch (TransformationException | JDOMException e2) {
                                log.debug("closing multiple outputs...");
                                mos.close();
                                log.debug("multiple outputs closed");
                                throw new RuntimeException(e2);
                            }
                        }
                    } catch (Exception e) {
                        if (otherExceptionAsCritical) {
                            log.debug("closing multiple outputs...");
                            mos.close();
                            log.debug("multiple outputs closed");
                            throw new RuntimeException(e);
                        } else {
                            log.error("got unexpected exception, just logging", e);
                            try {
                                // writing empty result
                                mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
                                        NlmToDocumentWithBasicMetadataConverter.convertFull(
                                                documentId.toString(), null)));
                                // writing fault result
                                mos.write(namedOutputFault, new AvroKey<Fault>(
                                        exceptionToFault(documentId, e)));
                            } catch (TransformationException | JDOMException e2) {
                                log.debug("closing multiple outputs...");
                                mos.close();
                                log.debug("multiple outputs closed");
                                throw new RuntimeException(e2);
                            }
                        }
                    }
                    try {
                        mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
                                NlmToDocumentContentConverter.convert(
                                        documentId.toString(), extractor.getRawFullText())));
                    } catch (AnalysisException e) {
                        if (analysisExceptionAsCritical) {
                            log.debug("closing multiple outputs...");
                            mos.close();
                            log.debug("multiple outputs closed");
                            throw new RuntimeException(e);
                        } else {
                            if (e.getCause() instanceof InvalidPdfException) {
                                log.error("Invalid PDF file when retrieving plaintext", e);
                            } else {
                                log.error("got unexpected analysis exception "
                                        + "when retrieving plaintext, just logging", e);
                            }
                            // writing empty result
                            mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
                                    NlmToDocumentContentConverter.convert(
                                            documentId.toString(), null)));
                            // writing fault result
                            mos.write(namedOutputFault, new AvroKey<Fault>(
                                    exceptionToFault(documentId, e)));
                        }
                    }
                } finally {
                    if (contentStream != null) {
                        contentStream.close();
                    }
                }
                long processingTime = System.currentTimeMillis() - startTime;
                if (processingTime > processingTimeThreshold) {
                    Map<CharSequence, CharSequence> supplementaryData = new HashMap<CharSequence, CharSequence>();
                    supplementaryData.put(FAULT_SUPPLEMENTARY_DATA_PROCESSING_TIME, String.valueOf(processingTime));
                    // writing fault result
                    mos.write(namedOutputFault, new AvroKey<Fault>(Fault.newBuilder().
                            setInputEntityId(documentId).setTimestamp(System.currentTimeMillis()).
                            setCode(FAULT_CODE_PROCESSING_TIME_THRESHOLD_EXCEEDED).
                            setSupplementaryData(supplementaryData).build()));
                }
                // TODO switch back to debug when setting debug level on oozie
                log.warn("finished processing for id " + documentId + " in " +
                        (processingTime / 1000) + " secs");
            }
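            // e.g. with log.fault.processing.time.threshold.secs=300, a document processed
            // in 320 secs yields a ProcessingTimeThresholdExceeded fault whose
            // supplementaryData maps "processing_time" to "320000" (milliseconds).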
        } catch (AnalysisException e) {
            log.debug("closing multiple outputs...");
            mos.close();
            log.debug("multiple outputs closed");
            throw new RuntimeException(e);
        }
    }

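    /**
     * Converts a throwable to a {@link Fault} record: the exception class name becomes
     * the fault code, the stack trace is serialized into the record, and the cause
     * chain is flattened into a list of {@link Cause} entries.
     * @param entityId input entity identifier the fault relates to
     * @param e throwable to be converted
     * @return fault record built from the throwable
     */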
    protected static Fault exceptionToFault(CharSequence entityId, Throwable e) {
        Fault.Builder faultBuilder = Fault.newBuilder();
        faultBuilder.setInputEntityId(entityId);
        faultBuilder.setTimestamp(System.currentTimeMillis());
        faultBuilder.setCode(e.getClass().getName());
        faultBuilder.setMessage(e.getMessage());
        StringWriter strWriter = new StringWriter();
        PrintWriter pw = new PrintWriter(strWriter);
        e.printStackTrace(pw);
        pw.close();
        faultBuilder.setStackTrace(strWriter.toString());
        if (e.getCause() != null) {
            faultBuilder.setCauses(appendThrowableToCauses(
                    e.getCause(), new ArrayList<Cause>()));
        }
        return faultBuilder.build();
    }
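
    /**
     * Recursively appends the given throwable and its cause chain to the list of
     * {@link Cause} records (exception class name as code, plus message).
     * @param e throwable to be appended
     * @param causes accumulator list
     * @return the accumulator list
     */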
    protected static List<Cause> appendThrowableToCauses(Throwable e, List<Cause> causes) {
        Cause.Builder causeBuilder = Cause.newBuilder();
        causeBuilder.setCode(e.getClass().getName());
        causeBuilder.setMessage(e.getMessage());
        causes.add(causeBuilder.build());
        if (e.getCause() != null) {
            return appendThrowableToCauses(
                    e.getCause(), causes);
        } else {
            return causes;
        }
    }

    /* (non-Javadoc)
     * @see org.apache.hadoop.mapreduce.Mapper#cleanup(org.apache.hadoop.mapreduce.Mapper.Context)
     */
    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        log.debug("cleanup: closing multiple outputs...");
        mos.close();
        log.debug("cleanup: multiple outputs closed");
    }

    /**
     * Sets flag indicating {@link AnalysisException} should cause interruption.
     * @param analysisExceptionAsCritical
     */
    public void setAnalysisExceptionAsCritical(boolean analysisExceptionAsCritical) {
        this.analysisExceptionAsCritical = analysisExceptionAsCritical;
    }

    /**
     * Sets flag indicating any other {@link Exception} should cause interruption.
     * @param otherExceptionAsCritical
     */
    public void setOtherExceptionAsCritical(boolean otherExceptionAsCritical) {
        this.otherExceptionAsCritical = otherExceptionAsCritical;
    }
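
    /**
     * Sets the progress log interval.
     * @param progresLogInterval number of processed records between progress log entries
     */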
    public void setProgresLogInterval(int progresLogInterval) {
        this.progresLogInterval = progresLogInterval;
    }

}