1
|
package eu.dnetlib.iis.metadataextraction;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.io.InputStream;
|
5
|
import java.io.PrintWriter;
|
6
|
import java.io.StringWriter;
|
7
|
import java.util.ArrayList;
|
8
|
import java.util.Arrays;
|
9
|
import java.util.HashMap;
|
10
|
import java.util.HashSet;
|
11
|
import java.util.List;
|
12
|
import java.util.Map;
|
13
|
import java.util.Set;
|
14
|
|
15
|
import org.apache.avro.mapred.AvroKey;
|
16
|
import org.apache.commons.lang.StringUtils;
|
17
|
import org.apache.hadoop.io.NullWritable;
|
18
|
import org.apache.hadoop.mapreduce.Mapper;
|
19
|
import org.apache.log4j.Logger;
|
20
|
import org.jdom.Document;
|
21
|
import org.jdom.Element;
|
22
|
import org.jdom.JDOMException;
|
23
|
import org.jdom.output.Format;
|
24
|
import org.jdom.output.XMLOutputter;
|
25
|
|
26
|
import pl.edu.icm.cermine.ContentExtractor;
|
27
|
import pl.edu.icm.cermine.exception.AnalysisException;
|
28
|
import pl.edu.icm.cermine.exception.TransformationException;
|
29
|
|
30
|
import com.itextpdf.text.exceptions.InvalidPdfException;
|
31
|
|
32
|
import eu.dnetlib.iis.audit.schemas.Cause;
|
33
|
import eu.dnetlib.iis.audit.schemas.Fault;
|
34
|
import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
|
35
|
import eu.dnetlib.iis.core.javamapreduce.MultipleOutputs;
|
36
|
import eu.dnetlib.iis.metadataextraction.schemas.DocumentText;
|
37
|
import eu.dnetlib.iis.metadataextraction.schemas.ExtractedDocumentMetadata;
|
38
|
|
39
|
/**
|
40
|
* Abstract class containing shared code of metadata extraction.
|
41
|
* @author mhorst
|
42
|
*
|
43
|
*/
|
44
|
public abstract class AbstractMetadataExtractorMapper<T> extends Mapper<AvroKey<T>, NullWritable, NullWritable, NullWritable> {
|
45
|
|
46
|
public static final String LOG_FAULT_PROCESSING_TIME_THRESHOLD_SECS = "log.fault.processing.time.threshold.secs";
|
47
|
|
48
|
public static final String FAULT_CODE_PROCESSING_TIME_THRESHOLD_EXCEEDED = "ProcessingTimeThresholdExceeded";
|
49
|
|
50
|
public static final String FAULT_SUPPLEMENTARY_DATA_PROCESSING_TIME = "processing_time";
|
51
|
|
52
|
protected final Logger log = Logger.getLogger(AbstractMetadataExtractorMapper.class);
|
53
|
|
54
|
|
55
|
/**
|
56
|
* Flag indicating {@link AnalysisException} should cause interruption.
|
57
|
*/
|
58
|
protected boolean analysisExceptionAsCritical = false;
|
59
|
|
60
|
/**
|
61
|
* Flag indicating any other {@link Exception} should cause interruption.
|
62
|
*/
|
63
|
protected boolean otherExceptionAsCritical = false;
|
64
|
|
65
|
/**
|
66
|
* Multiple outputs.
|
67
|
*/
|
68
|
protected MultipleOutputs mos = null;
|
69
|
|
70
|
/**
|
71
|
* Document metadata named output.
|
72
|
*/
|
73
|
protected String namedOutputMeta;
|
74
|
|
75
|
/**
|
76
|
* Document plaintext named output.
|
77
|
*/
|
78
|
protected String namedOutputPlaintext;
|
79
|
|
80
|
/**
|
81
|
* Fault named output.
|
82
|
*/
|
83
|
protected String namedOutputFault;
|
84
|
|
85
|
/**
|
86
|
* Progress log interval.
|
87
|
*/
|
88
|
protected int progresLogInterval = 100;
|
89
|
|
90
|
/**
|
91
|
* Current progress.
|
92
|
*/
|
93
|
protected int currentProgress = 0;
|
94
|
|
95
|
/**
|
96
|
* Timestamp (in milliseconds) marking the start of the current progress logging interval.
|
97
|
*/
|
98
|
private long intervalTime = 0;
|
99
|
|
100
|
/**
|
101
|
* Maximum content size in kilobytes (KB); derived from the limit configured in megabytes.
|
102
|
*/
|
103
|
protected long maxFileSizeKB = Long.MAX_VALUE;
|
104
|
|
105
|
/**
|
106
|
* Processing time threshold.
|
107
|
* When exceeded, an appropriate fault object will be written to the error datastore.
|
108
|
*/
|
109
|
protected long processingTimeThreshold = Long.MAX_VALUE;
|
110
|
|
111
|
/**
|
112
|
* Set of identifiers of objects excluded from processing.
|
113
|
*/
|
114
|
protected Set<String> excludedIds;
|
115
|
|
116
|
|
117
|
/* (non-Javadoc)
|
118
|
* @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
|
119
|
*/
|
120
|
@Override
|
121
|
protected void setup(Context context) throws IOException,
|
122
|
InterruptedException {
|
123
|
namedOutputMeta = context.getConfiguration().get("output.meta");
|
124
|
if (namedOutputMeta==null || namedOutputMeta.isEmpty()) {
|
125
|
throw new RuntimeException("no named output provided for metadata");
|
126
|
}
|
127
|
namedOutputPlaintext = context.getConfiguration().get("output.plaintext");
|
128
|
if (namedOutputPlaintext==null || namedOutputPlaintext.isEmpty()) {
|
129
|
throw new RuntimeException("no named output provided for plaintext");
|
130
|
}
|
131
|
namedOutputFault = context.getConfiguration().get("output.fault");
|
132
|
if (namedOutputFault==null || namedOutputFault.isEmpty()) {
|
133
|
throw new RuntimeException("no named output provided for fault");
|
134
|
}
|
135
|
|
136
|
String excludedIdsCSV = context.getConfiguration().get("excluded.ids");
|
137
|
if (excludedIdsCSV!=null && !excludedIdsCSV.trim().isEmpty()
|
138
|
&& !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(excludedIdsCSV)) {
|
139
|
log.warn("got excluded ids: " + excludedIdsCSV);
|
140
|
excludedIds = new HashSet<String>(Arrays.asList(
|
141
|
StringUtils.split(excludedIdsCSV.trim(), ',')));
|
142
|
} else {
|
143
|
log.warn("got no excluded ids");
|
144
|
}
|
145
|
// handling maximum content size
|
146
|
String maxFileSizeMBStr = context.getConfiguration().get(
|
147
|
WorkflowRuntimeParameters.IMPORT_CONTENT_MAX_FILE_SIZE_MB);
|
148
|
if (maxFileSizeMBStr!=null && !maxFileSizeMBStr.trim().isEmpty()
|
149
|
&& !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(maxFileSizeMBStr)) {
|
150
|
this.maxFileSizeKB = 1024l * Integer.valueOf(maxFileSizeMBStr);
|
151
|
}
|
152
|
|
153
|
// handling processing time threshold
|
154
|
String processingTimeThresholdSecsStr = context.getConfiguration().get(
|
155
|
LOG_FAULT_PROCESSING_TIME_THRESHOLD_SECS);
|
156
|
if (processingTimeThresholdSecsStr!=null && !processingTimeThresholdSecsStr.trim().isEmpty()
|
157
|
&& !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(processingTimeThresholdSecsStr)) {
|
158
|
this.processingTimeThreshold = 1000l * Integer.valueOf(processingTimeThresholdSecsStr);
|
159
|
}
|
160
|
|
161
|
mos = new MultipleOutputs(context);
|
162
|
currentProgress = 0;
|
163
|
intervalTime = System.currentTimeMillis();
|
164
|
}
|
165
|
|
166
|
/**
|
167
|
* Processes content input stream. Closes stream at the end.
|
168
|
* @param documentId
|
169
|
* @param contentStream
|
170
|
* @param contentLengthKB content length in KB
|
171
|
* @throws IOException
|
172
|
* @throws InterruptedException
|
173
|
*/
|
174
|
protected void processStream(CharSequence documentId,
|
175
|
InputStream contentStream,
|
176
|
long contentLengthKB) throws IOException, InterruptedException {
|
177
|
try {
|
178
|
currentProgress++;
|
179
|
if (currentProgress>0 && currentProgress%progresLogInterval==0) {
|
180
|
// FIXME switch back to debug when setting debug level on oozie
|
181
|
log.warn("metadata extaction progress: " + currentProgress + ", time taken to process " +
|
182
|
progresLogInterval + " elements: " +
|
183
|
((System.currentTimeMillis() - intervalTime)/1000) + " secs");
|
184
|
intervalTime = System.currentTimeMillis();
|
185
|
}
|
186
|
if (excludedIds!=null && excludedIds.contains(documentId)) {
|
187
|
log.warn("skipping processing for excluded id " + documentId);
|
188
|
} else {
|
189
|
// handling maximum content size
|
190
|
if (contentLengthKB>maxFileSizeKB) {
|
191
|
log.warn("skipping processing for id " + documentId +
|
192
|
" due to max file size limit="+maxFileSizeKB+" KB exceeded: " + contentLengthKB + " KB");
|
193
|
try {
|
194
|
// writing empty metadata
|
195
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
196
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
197
|
documentId.toString(), null)));
|
198
|
// writing empty plaintext
|
199
|
mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
|
200
|
NlmToDocumentContentConverter.convert(
|
201
|
documentId.toString(), null)));
|
202
|
return;
|
203
|
} catch (TransformationException e2) {
|
204
|
log.debug("closing multiple outputs...");
|
205
|
mos.close();
|
206
|
log.debug("multiple outputs closed");
|
207
|
throw new RuntimeException(e2);
|
208
|
} catch (JDOMException e2) {
|
209
|
log.debug("closing multiple outputs...");
|
210
|
mos.close();
|
211
|
log.debug("multiple outputs closed");
|
212
|
throw new RuntimeException(e2);
|
213
|
} finally {
|
214
|
if (contentStream!=null) {
|
215
|
contentStream.close();
|
216
|
}
|
217
|
}
|
218
|
}
|
219
|
|
220
|
// TODO switch back to debug when setting debug level on oozie
|
221
|
log.warn("starting processing for id: " + documentId);
|
222
|
long startTime = System.currentTimeMillis();
|
223
|
ContentExtractor extractor = new ContentExtractor();
|
224
|
try {
|
225
|
extractor.uploadPDF(contentStream);
|
226
|
try {
|
227
|
Element resultElem = extractor.getNLMContent();
|
228
|
Document doc = new Document(resultElem);
|
229
|
XMLOutputter outputter = new XMLOutputter(Format.getPrettyFormat());
|
230
|
log.debug("got NLM content: \n" + outputter.outputString(resultElem));
|
231
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
232
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
233
|
documentId.toString(), doc)));
|
234
|
} catch (JDOMException e) {
|
235
|
log.debug("closing multiple outputs...");
|
236
|
mos.close();
|
237
|
log.debug("multiple outputs closed");
|
238
|
throw new RuntimeException(e);
|
239
|
} catch (TransformationException e) {
|
240
|
log.debug("closing multiple outputs...");
|
241
|
mos.close();
|
242
|
log.debug("multiple outputs closed");
|
243
|
throw new RuntimeException(e);
|
244
|
} catch (AnalysisException e) {
|
245
|
if (analysisExceptionAsCritical) {
|
246
|
log.debug("closing multiple outputs...");
|
247
|
mos.close();
|
248
|
log.debug("multiple outputs closed");
|
249
|
throw new RuntimeException(e);
|
250
|
} else {
|
251
|
if (e.getCause() instanceof InvalidPdfException) {
|
252
|
log.error("Invalid PDF file", e);
|
253
|
} else {
|
254
|
log.error("got unexpected analysis exception, just logging", e);
|
255
|
}
|
256
|
try {
|
257
|
// writing empty result
|
258
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
259
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
260
|
documentId.toString(), null)));
|
261
|
// writing fault result
|
262
|
mos.write(namedOutputFault, new AvroKey<Fault>(
|
263
|
exceptionToFault(documentId, e)));
|
264
|
} catch (TransformationException e2) {
|
265
|
log.debug("closing multiple outputs...");
|
266
|
mos.close();
|
267
|
log.debug("multiple outputs closed");
|
268
|
throw new RuntimeException(e2);
|
269
|
} catch (JDOMException e2) {
|
270
|
log.debug("closing multiple outputs...");
|
271
|
mos.close();
|
272
|
log.debug("multiple outputs closed");
|
273
|
throw new RuntimeException(e2);
|
274
|
}
|
275
|
}
|
276
|
} catch (Exception e) {
|
277
|
if (otherExceptionAsCritical) {
|
278
|
log.debug("closing multiple outputs...");
|
279
|
mos.close();
|
280
|
log.debug("multiple outputs closed");
|
281
|
throw new RuntimeException(e);
|
282
|
} else {
|
283
|
log.error("got unexpected exception, just logging", e);
|
284
|
try {
|
285
|
// writing empty result
|
286
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
287
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
288
|
documentId.toString(), null)));
|
289
|
// writing fault result
|
290
|
mos.write(namedOutputFault, new AvroKey<Fault>(
|
291
|
exceptionToFault(documentId, e)));
|
292
|
} catch (TransformationException e2) {
|
293
|
log.debug("closing multiple outputs...");
|
294
|
mos.close();
|
295
|
log.debug("multiple outputs closed");
|
296
|
throw new RuntimeException(e2);
|
297
|
} catch (JDOMException e2) {
|
298
|
log.debug("closing multiple outputs...");
|
299
|
mos.close();
|
300
|
log.debug("multiple outputs closed");
|
301
|
throw new RuntimeException(e2);
|
302
|
}
|
303
|
}
|
304
|
}
|
305
|
try {
|
306
|
mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
|
307
|
NlmToDocumentContentConverter.convert(
|
308
|
documentId.toString(), extractor.getRawFullText())));
|
309
|
} catch (AnalysisException e) {
|
310
|
if (analysisExceptionAsCritical) {
|
311
|
log.debug("closing multiple outputs...");
|
312
|
mos.close();
|
313
|
log.debug("multiple outputs closed");
|
314
|
throw new RuntimeException(e);
|
315
|
} else {
|
316
|
if (e.getCause() instanceof InvalidPdfException) {
|
317
|
log.error("Invalid PDF file when retrieving plaintext", e);
|
318
|
} else {
|
319
|
log.error("got unexpected analysis exception "
|
320
|
+ "when retrieving plaintext, just logging", e);
|
321
|
}
|
322
|
// writing empty result
|
323
|
mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
|
324
|
NlmToDocumentContentConverter.convert(
|
325
|
documentId.toString(), null)));
|
326
|
// writing fault result
|
327
|
mos.write(namedOutputFault, new AvroKey<Fault>(
|
328
|
exceptionToFault(documentId, e)));
|
329
|
}
|
330
|
}
|
331
|
} finally {
|
332
|
if (contentStream!=null) {
|
333
|
contentStream.close();
|
334
|
}
|
335
|
}
|
336
|
long processingTime = System.currentTimeMillis() - startTime;
|
337
|
if (processingTime > processingTimeThreshold) {
|
338
|
Map<CharSequence, CharSequence> supplementaryData = new HashMap<CharSequence, CharSequence>();
|
339
|
supplementaryData.put(FAULT_SUPPLEMENTARY_DATA_PROCESSING_TIME, String.valueOf(processingTime));
|
340
|
// writing fault result
|
341
|
mos.write(namedOutputFault, new AvroKey<Fault>(Fault.newBuilder().
|
342
|
setInputEntityId(documentId).setTimestamp(System.currentTimeMillis()).
|
343
|
setCode(FAULT_CODE_PROCESSING_TIME_THRESHOLD_EXCEEDED).
|
344
|
setSupplementaryData(supplementaryData).build()));
|
345
|
}
|
346
|
// TODO switch back to debug when setting debug level on oozie
|
347
|
log.warn("finished processing for id " + documentId + " in " +
|
348
|
(processingTime/1000) + " secs");
|
349
|
}
|
350
|
} catch (AnalysisException e) {
|
351
|
log.debug("closing multiple outputs...");
|
352
|
mos.close();
|
353
|
log.debug("multiple outputs closed");
|
354
|
throw new RuntimeException(e);
|
355
|
}
|
356
|
}
|
357
|
|
358
|
protected static Fault exceptionToFault(CharSequence entityId, Throwable e) {
|
359
|
Fault.Builder faultBuilder = Fault.newBuilder();
|
360
|
faultBuilder.setInputEntityId(entityId);
|
361
|
faultBuilder.setTimestamp(System.currentTimeMillis());
|
362
|
faultBuilder.setCode(e.getClass().getName());
|
363
|
faultBuilder.setMessage(e.getMessage());
|
364
|
StringWriter strWriter = new StringWriter();
|
365
|
PrintWriter pw = new PrintWriter(strWriter);
|
366
|
e.printStackTrace(pw);
|
367
|
pw.close();
|
368
|
faultBuilder.setStackTrace(strWriter.toString());
|
369
|
if (e.getCause()!=null) {
|
370
|
faultBuilder.setCauses(appendThrowableToCauses(
|
371
|
e.getCause(), new ArrayList<Cause>()));
|
372
|
}
|
373
|
return faultBuilder.build();
|
374
|
}
|
375
|
|
376
|
protected static List<Cause> appendThrowableToCauses(Throwable e, List<Cause> causes) {
|
377
|
Cause.Builder causeBuilder = Cause.newBuilder();
|
378
|
causeBuilder.setCode(e.getClass().getName());
|
379
|
causeBuilder.setMessage(e.getMessage());
|
380
|
causes.add(causeBuilder.build());
|
381
|
if (e.getCause()!=null) {
|
382
|
return appendThrowableToCauses(
|
383
|
e.getCause(),causes);
|
384
|
} else {
|
385
|
return causes;
|
386
|
}
|
387
|
}
|
388
|
|
389
|
/* (non-Javadoc)
|
390
|
* @see org.apache.hadoop.mapreduce.Mapper#cleanup(org.apache.hadoop.mapreduce.Mapper.Context)
|
391
|
*/
|
392
|
@Override
|
393
|
public void cleanup(Context context) throws IOException, InterruptedException {
|
394
|
log.debug("cleanup: closing multiple outputs...");
|
395
|
mos.close();
|
396
|
log.debug("cleanup: multiple outputs closed");
|
397
|
}
|
398
|
|
399
|
/**
|
400
|
* Sets flag indicating {@link AnalysisException} should cause interruption.
|
401
|
* @param analysisExceptionAsCritical
|
402
|
*/
|
403
|
public void setAnalysisExceptionAsCritical(boolean analysisExceptionAsCritical) {
|
404
|
this.analysisExceptionAsCritical = analysisExceptionAsCritical;
|
405
|
}
|
406
|
|
407
|
/**
|
408
|
* Sets flag indicating any other {@link Exception} should cause interruption.
|
409
|
* @param otherExceptionAsCritical
|
410
|
*/
|
411
|
public void setOtherExceptionAsCritical(boolean otherExceptionAsCritical) {
|
412
|
this.otherExceptionAsCritical = otherExceptionAsCritical;
|
413
|
}
|
414
|
|
415
|
public void setProgresLogInterval(int progresLogInterval) {
|
416
|
this.progresLogInterval = progresLogInterval;
|
417
|
}
|
418
|
|
419
|
}
|