package eu.dnetlib.iis.metadataextraction;

import java.io.IOException;
import java.io.InputStream;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.avro.mapred.AvroKey;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.log4j.Logger;
import org.jdom.Document;
import org.jdom.Element;
import org.jdom.JDOMException;
import org.jdom.output.Format;
import org.jdom.output.XMLOutputter;

import pl.edu.icm.cermine.ContentExtractor;
import pl.edu.icm.cermine.exception.AnalysisException;
import pl.edu.icm.cermine.exception.TransformationException;

import com.itextpdf.text.exceptions.InvalidPdfException;

import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
import eu.dnetlib.iis.core.javamapreduce.MultipleOutputs;
import eu.dnetlib.iis.metadataextraction.schemas.DocumentText;
import eu.dnetlib.iis.metadataextraction.schemas.ExtractedDocumentMetadata;

/**
 * Abstract mapper class containing the code shared by metadata extraction mappers.
 * @author mhorst
 *
 */
public abstract class AbstractMetadataExtractorMapper<T> extends Mapper<AvroKey<T>, NullWritable, NullWritable, NullWritable> {
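
	// Typical usage (a sketch only; the subclass and the DocumentContent record
	// below are hypothetical, not part of this class): a concrete subclass maps
	// each input record to a content stream and delegates to processStream(), e.g.
	//
	//   public class PdfMetadataExtractorMapper
	//           extends AbstractMetadataExtractorMapper<DocumentContent> {
	//       @Override
	//       protected void map(AvroKey<DocumentContent> key, NullWritable value,
	//               Context context) throws IOException, InterruptedException {
	//           DocumentContent record = key.datum();
	//           byte[] pdf = record.getPdf().array();
	//           processStream(record.getId(),
	//                   new java.io.ByteArrayInputStream(pdf), pdf.length);
	//       }
	//   }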

	protected final Logger log = Logger.getLogger(AbstractMetadataExtractorMapper.class);

	/**
	 * Flag indicating whether an {@link AnalysisException} should cause interruption.
	 */
	protected boolean analysisExceptionAsCritical = false;

	/**
	 * Flag indicating whether any other {@link Exception} should cause interruption.
	 */
	protected boolean otherExceptionAsCritical = false;

	/**
	 * Multiple outputs.
	 */
	protected MultipleOutputs mos = null;

	/**
	 * Document metadata named output.
	 */
	protected String namedOutputMeta;

	/**
	 * Document plaintext named output.
	 */
	protected String namedOutputPlaintext;

	/**
	 * Progress log interval.
	 */
	protected int progresLogInterval = 100;

	/**
	 * Current progress.
	 */
	protected int currentProgress = 0;

	/**
	 * Interval start time.
	 */
	protected long intervalTime = 0;

	/**
	 * Maximum allowed content size, in bytes.
	 */
	protected long maxFileSize = Long.MAX_VALUE;

	/**
	 * Set of object identifiers excluded from processing.
	 */
	protected Set<String> excludedIds;
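
	// Configuration keys read in setup() below (a summary; the example values
	// are hypothetical):
	//
	//   output.meta      - named output for ExtractedDocumentMetadata records, e.g. "meta"
	//   output.plaintext - named output for DocumentText records, e.g. "plaintext"
	//   excluded.ids     - optional CSV of document ids to skip, e.g. "id-1,id-2"
	//   WorkflowRuntimeParameters.IMPORT_CONTENT_MAX_FILE_SIZE_MB
	//                    - optional maximum PDF size in megabytes, converted to
	//                      bytes (1 MB = 1048576 bytes)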

	/* (non-Javadoc)
	 * @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
	 */
	@Override
	protected void setup(Context context) throws IOException,
			InterruptedException {
		namedOutputMeta = context.getConfiguration().get("output.meta");
		if (namedOutputMeta == null || namedOutputMeta.isEmpty()) {
			throw new RuntimeException("no named output provided for metadata");
		}
		namedOutputPlaintext = context.getConfiguration().get("output.plaintext");
		if (namedOutputPlaintext == null || namedOutputPlaintext.isEmpty()) {
			throw new RuntimeException("no named output provided for plaintext");
		}
		String excludedIdsCSV = context.getConfiguration().get("excluded.ids");
		if (excludedIdsCSV != null && !excludedIdsCSV.trim().isEmpty()
				&& !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(excludedIdsCSV)) {
			log.warn("got excluded ids: " + excludedIdsCSV);
			excludedIds = new HashSet<String>(Arrays.asList(
					StringUtils.split(excludedIdsCSV.trim(), ',')));
		} else {
			log.warn("got no excluded ids");
		}
//		handling maximum content size
		String maxFileSizeMBStr = context.getConfiguration().get(
				WorkflowRuntimeParameters.IMPORT_CONTENT_MAX_FILE_SIZE_MB);
		if (maxFileSizeMBStr != null && !maxFileSizeMBStr.trim().isEmpty()
				&& !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(maxFileSizeMBStr)) {
			this.maxFileSize = 1048576L * Integer.valueOf(maxFileSizeMBStr.trim());
		}

		mos = new MultipleOutputs(context);
		currentProgress = 0;
		intervalTime = System.currentTimeMillis();
	}

	/**
	 * Processes content input stream and writes extraction results to the named
	 * outputs. Closes the stream at the end.
	 * @param documentId document identifier
	 * @param contentStream PDF content stream, closed by this method
	 * @param contentLength content length in bytes
	 * @throws IOException
	 * @throws InterruptedException
	 */
	protected void processStream(CharSequence documentId,
			InputStream contentStream,
			long contentLength) throws IOException, InterruptedException {
		try {
			currentProgress++;
			if (currentProgress > 0 && currentProgress % progresLogInterval == 0) {
//				FIXME switch back to debug when setting debug level on oozie
				log.warn("metadata extraction progress: " + currentProgress + ", time taken to process " +
						progresLogInterval + " elements: " +
						((System.currentTimeMillis() - intervalTime) / 1000) + " secs");
				intervalTime = System.currentTimeMillis();
			}
			if (excludedIds != null && excludedIds.contains(documentId.toString())) {
				log.warn("skipping processing for excluded id " + documentId);
			} else {
//				handling maximum content size
				if (contentLength > maxFileSize) {
					log.warn("skipping processing for id " + documentId +
							" due to max file size limit=" + maxFileSize + " exceeded: " + contentLength);
					try {
//						writing empty metadata
						mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
								NlmToDocumentWithBasicMetadataConverter.convertFull(
										documentId.toString(), null)));
//						writing empty plaintext
						mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
								NlmToDocumentContentConverter.convert(
										documentId.toString(), null)));
						return;
					} catch (TransformationException e2) {
						log.debug("closing multiple outputs...");
						mos.close();
						log.debug("multiple outputs closed");
						throw new RuntimeException(e2);
					} catch (JDOMException e2) {
						log.debug("closing multiple outputs...");
						mos.close();
						log.debug("multiple outputs closed");
						throw new RuntimeException(e2);
					} finally {
						if (contentStream != null) {
							contentStream.close();
						}
					}
				}
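
//				the extraction proper: CERMINE parses the PDF once (uploadPDF),
//				then both the NLM metadata record and the raw full text are
//				obtained from the same ContentExtractor instance; depending on
//				the *AsCritical flags, failures either abort the task or are
//				logged and replaced with empty output records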

//				TODO switch back to debug when setting debug level on oozie
				log.warn("starting processing for id: " + documentId);
				long startTime = System.currentTimeMillis();
				ContentExtractor extractor = new ContentExtractor();
				try {
					extractor.uploadPDF(contentStream);
					try {
						Element resultElem = extractor.getNLMContent();
						Document doc = new Document(resultElem);
						XMLOutputter outputter = new XMLOutputter(Format.getPrettyFormat());
						log.debug("got NLM content: \n" + outputter.outputString(resultElem));
						mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
								NlmToDocumentWithBasicMetadataConverter.convertFull(
										documentId.toString(), doc)));
					} catch (JDOMException e) {
						log.debug("closing multiple outputs...");
						mos.close();
						log.debug("multiple outputs closed");
						throw new RuntimeException(e);
					} catch (TransformationException e) {
						log.debug("closing multiple outputs...");
						mos.close();
						log.debug("multiple outputs closed");
						throw new RuntimeException(e);
					} catch (AnalysisException e) {
						if (analysisExceptionAsCritical) {
							log.debug("closing multiple outputs...");
							mos.close();
							log.debug("multiple outputs closed");
							throw new RuntimeException(e);
						} else {
							if (e.getCause() instanceof InvalidPdfException) {
								log.error("Invalid PDF file", e);
							} else {
								log.error("got unexpected analysis exception, just logging", e);
							}
							try {
//								writing empty result
								mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
										NlmToDocumentWithBasicMetadataConverter.convertFull(
												documentId.toString(), null)));
							} catch (TransformationException e2) {
								log.debug("closing multiple outputs...");
								mos.close();
								log.debug("multiple outputs closed");
								throw new RuntimeException(e2);
							} catch (JDOMException e2) {
								log.debug("closing multiple outputs...");
								mos.close();
								log.debug("multiple outputs closed");
								throw new RuntimeException(e2);
							}
						}
					} catch (Exception e) {
						if (otherExceptionAsCritical) {
							log.debug("closing multiple outputs...");
							mos.close();
							log.debug("multiple outputs closed");
							throw new RuntimeException(e);
						} else {
							log.error("got unexpected exception, just logging", e);
							try {
//								writing empty result
								mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
										NlmToDocumentWithBasicMetadataConverter.convertFull(
												documentId.toString(), null)));
							} catch (TransformationException e2) {
								log.debug("closing multiple outputs...");
								mos.close();
								log.debug("multiple outputs closed");
								throw new RuntimeException(e2);
							} catch (JDOMException e2) {
								log.debug("closing multiple outputs...");
								mos.close();
								log.debug("multiple outputs closed");
								throw new RuntimeException(e2);
							}
						}
					}
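
//					plaintext is emitted both when metadata extraction succeeded
//					and when a non-critical failure above produced an empty
//					metadata record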
					try {
						mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
								NlmToDocumentContentConverter.convert(
										documentId.toString(), extractor.getRawFullText())));
					} catch (AnalysisException e) {
						if (analysisExceptionAsCritical) {
							log.debug("closing multiple outputs...");
							mos.close();
							log.debug("multiple outputs closed");
							throw new RuntimeException(e);
						} else {
							if (e.getCause() instanceof InvalidPdfException) {
								log.error("Invalid PDF file when retrieving plaintext", e);
							} else {
								log.error("got unexpected analysis exception "
										+ "when retrieving plaintext, just logging", e);
							}
//							writing empty result
							mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
									NlmToDocumentContentConverter.convert(
											documentId.toString(), null)));
						}
					}
				} finally {
					if (contentStream != null) {
						contentStream.close();
					}
				}
//				TODO switch back to debug when setting debug level on oozie
				log.warn("finished processing for id " + documentId + " in " +
						((System.currentTimeMillis() - startTime) / 1000) + " secs");
			}
		} catch (AnalysisException e) {
			log.debug("closing multiple outputs...");
			mos.close();
			log.debug("multiple outputs closed");
			throw new RuntimeException(e);
		}
	}

	/* (non-Javadoc)
	 * @see org.apache.hadoop.mapreduce.Mapper#cleanup(org.apache.hadoop.mapreduce.Mapper.Context)
	 */
	@Override
	public void cleanup(Context context) throws IOException, InterruptedException {
		log.debug("cleanup: closing multiple outputs...");
		mos.close();
		log.debug("cleanup: multiple outputs closed");
	}

	/**
	 * Sets the flag indicating an {@link AnalysisException} should cause interruption.
	 * @param analysisExceptionAsCritical flag value to set
	 */
	public void setAnalysisExceptionAsCritical(boolean analysisExceptionAsCritical) {
		this.analysisExceptionAsCritical = analysisExceptionAsCritical;
	}

	/**
	 * Sets the flag indicating any other {@link Exception} should cause interruption.
	 * @param otherExceptionAsCritical flag value to set
	 */
	public void setOtherExceptionAsCritical(boolean otherExceptionAsCritical) {
		this.otherExceptionAsCritical = otherExceptionAsCritical;
	}
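
	/**
	 * Sets the progress log interval: a progress entry is logged after every
	 * progresLogInterval processed records.
	 * @param progresLogInterval number of records between progress log entries
	 */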
	public void setProgresLogInterval(int progresLogInterval) {
		this.progresLogInterval = progresLogInterval;
	}

//	public static void main(String[] args) throws Exception {
////		testing interruption
////		String fileLoc = "/home/azio/Downloads/cermine/hal/4f5cc34f137de4dc89766a9366ca66de.pdf";
//		String fileLoc = "/home/azio/Downloads/cermine/4119";
//		final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(new File(fileLoc)));
//		try {
//			ExecutorService executor = Executors.newSingleThreadExecutor();
//			Future<ContentExtractorResult> futureResult = executor.submit(new Callable<ContentExtractorResult>() {
//				@Override
//				public ContentExtractorResult call() throws Exception {
//					ContentExtractor extractor = new ContentExtractor();
//					extractor.uploadPDF(bis);
//					Element resultElem = extractor.getNLMContent();
//					XMLOutputter outputter = new XMLOutputter(Format.getPrettyFormat());
//					System.out.println("got NLM content: \n" + outputter.outputString(resultElem));
//					return new ContentExtractorResult(null,
//							DocumentText.newBuilder().setId("1234").setText("xxx").build());
//				}
//			});
//			System.out.println("before get");
//			try {
//				ContentExtractorResult result = futureResult.get(5, TimeUnit.SECONDS);
//				System.out.println("got result: " + result.text);
//				System.out.println("after get, before shutdown");
//			} catch(TimeoutException e) {
//				System.out.println("timeout! before shutdown");
//				executor.shutdown();
//				System.out.println("after shutdown");
//			}
//		} finally {
//			bis.close();
//		}
//	}

}