1
|
package eu.dnetlib.iis.metadataextraction;
|
2
|
|
3
|
import java.io.IOException;
|
4
|
import java.io.InputStream;
|
5
|
import java.util.Arrays;
|
6
|
import java.util.HashSet;
|
7
|
import java.util.Set;
|
8
|
|
9
|
import org.apache.avro.mapred.AvroKey;
|
10
|
import org.apache.commons.lang.StringUtils;
|
11
|
import org.apache.hadoop.io.NullWritable;
|
12
|
import org.apache.hadoop.mapreduce.Mapper;
|
13
|
import org.apache.log4j.Logger;
|
14
|
import org.jdom.Document;
|
15
|
import org.jdom.Element;
|
16
|
import org.jdom.JDOMException;
|
17
|
import org.jdom.output.Format;
|
18
|
import org.jdom.output.XMLOutputter;
|
19
|
|
20
|
import pl.edu.icm.cermine.ContentExtractor;
|
21
|
import pl.edu.icm.cermine.exception.AnalysisException;
|
22
|
import pl.edu.icm.cermine.exception.TransformationException;
|
23
|
|
24
|
import com.itextpdf.text.exceptions.InvalidPdfException;
|
25
|
|
26
|
import eu.dnetlib.iis.common.WorkflowRuntimeParameters;
|
27
|
import eu.dnetlib.iis.core.javamapreduce.MultipleOutputs;
|
28
|
import eu.dnetlib.iis.metadataextraction.schemas.DocumentText;
|
29
|
import eu.dnetlib.iis.metadataextraction.schemas.ExtractedDocumentMetadata;
|
30
|
|
31
|
/**
|
32
|
* Abstract class containing shared code of metadata extraction.
|
33
|
* @author mhorst
|
34
|
*
|
35
|
*/
|
36
|
public abstract class AbstractMetadataExtractorMapper<T> extends Mapper<AvroKey<T>, NullWritable, NullWritable, NullWritable> {
|
37
|
|
38
|
|
39
|
protected final Logger log = Logger.getLogger(AbstractMetadataExtractorMapper.class);
|
40
|
|
41
|
|
42
|
/**
|
43
|
* Flag indicating {@link AnalysisException} should cause interruption.
|
44
|
*/
|
45
|
protected boolean analysisExceptionAsCritical = false;
|
46
|
|
47
|
/**
|
48
|
* Flag indicating any other {@link Exception} should cause interruption.
|
49
|
*/
|
50
|
protected boolean otherExceptionAsCritical = false;
|
51
|
|
52
|
/**
|
53
|
* Multiple outputs.
|
54
|
*/
|
55
|
protected MultipleOutputs mos = null;
|
56
|
|
57
|
/**
|
58
|
* Document metadata named output.
|
59
|
*/
|
60
|
protected String namedOutputMeta;
|
61
|
|
62
|
/**
|
63
|
* Document plaintext named output.
|
64
|
*/
|
65
|
protected String namedOutputPlaintext;
|
66
|
|
67
|
/**
|
68
|
* Progress log interval.
|
69
|
*/
|
70
|
protected int progresLogInterval = 100;
|
71
|
|
72
|
/**
|
73
|
* Current progress.
|
74
|
*/
|
75
|
protected int currentProgress = 0;
|
76
|
|
77
|
/**
|
78
|
* Interval time.
|
79
|
*/
|
80
|
protected long intervalTime = 0;
|
81
|
|
82
|
/**
|
83
|
* Maximum content size in MegaBytes.
|
84
|
*/
|
85
|
protected long maxFileSize = Long.MAX_VALUE;
|
86
|
|
87
|
/**
|
88
|
* Set of object identifiers objects excluded from processing.
|
89
|
*/
|
90
|
protected Set<String> excludedIds;
|
91
|
|
92
|
|
93
|
/* (non-Javadoc)
|
94
|
* @see org.apache.hadoop.mapreduce.Mapper#setup(org.apache.hadoop.mapreduce.Mapper.Context)
|
95
|
*/
|
96
|
@Override
|
97
|
protected void setup(Context context) throws IOException,
|
98
|
InterruptedException {
|
99
|
namedOutputMeta = context.getConfiguration().get("output.meta");
|
100
|
if (namedOutputMeta==null || namedOutputMeta.isEmpty()) {
|
101
|
throw new RuntimeException("no named output provided for metadata");
|
102
|
}
|
103
|
namedOutputPlaintext = context.getConfiguration().get("output.plaintext");
|
104
|
if (namedOutputPlaintext==null || namedOutputPlaintext.isEmpty()) {
|
105
|
throw new RuntimeException("no named output provided for plaintext");
|
106
|
}
|
107
|
String excludedIdsCSV = context.getConfiguration().get("excluded.ids");
|
108
|
if (excludedIdsCSV!=null && !excludedIdsCSV.trim().isEmpty()
|
109
|
&& !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(excludedIdsCSV)) {
|
110
|
log.warn("got excluded ids: " + excludedIdsCSV);
|
111
|
excludedIds = new HashSet<String>(Arrays.asList(
|
112
|
StringUtils.split(excludedIdsCSV.trim(), ',')));
|
113
|
} else {
|
114
|
log.warn("got no excluded ids");
|
115
|
}
|
116
|
// handling maximum content size
|
117
|
String maxFileSizeMBStr = context.getConfiguration().get(
|
118
|
WorkflowRuntimeParameters.IMPORT_CONTENT_MAX_FILE_SIZE_MB);
|
119
|
if (maxFileSizeMBStr!=null && !maxFileSizeMBStr.trim().isEmpty()
|
120
|
&& !WorkflowRuntimeParameters.UNDEFINED_NONEMPTY_VALUE.equals(maxFileSizeMBStr)) {
|
121
|
this.maxFileSize = 1048576l * Integer.valueOf(maxFileSizeMBStr);
|
122
|
}
|
123
|
|
124
|
mos = new MultipleOutputs(context);
|
125
|
currentProgress = 0;
|
126
|
intervalTime = System.currentTimeMillis();
|
127
|
}
|
128
|
|
129
|
/**
|
130
|
* Processes content input stream. Closes stream at the end.
|
131
|
* @param documentId
|
132
|
* @param contentStream
|
133
|
* @param contentLength
|
134
|
* @throws IOException
|
135
|
* @throws InterruptedException
|
136
|
*/
|
137
|
protected void processStream(CharSequence documentId,
|
138
|
InputStream contentStream,
|
139
|
long contentLength) throws IOException, InterruptedException {
|
140
|
try {
|
141
|
currentProgress++;
|
142
|
if (currentProgress>0 && currentProgress%progresLogInterval==0) {
|
143
|
// FIXME switch back to debug when setting debug level on oozie
|
144
|
log.warn("metadata extaction progress: " + currentProgress + ", time taken to process " +
|
145
|
progresLogInterval + " elements: " +
|
146
|
((System.currentTimeMillis() - intervalTime)/1000) + " secs");
|
147
|
intervalTime = System.currentTimeMillis();
|
148
|
}
|
149
|
if (excludedIds!=null && excludedIds.contains(documentId)) {
|
150
|
log.warn("skipping processing for excluded id " + documentId);
|
151
|
} else {
|
152
|
// handling maximum content size
|
153
|
if (contentLength>maxFileSize) {
|
154
|
log.warn("skipping processing for id " + documentId +
|
155
|
" due to max file size limit="+maxFileSize+" exceeded: " + contentLength);
|
156
|
try {
|
157
|
// writing empty metadata
|
158
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
159
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
160
|
documentId.toString(), null)));
|
161
|
// writing empty plaintext
|
162
|
mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
|
163
|
NlmToDocumentContentConverter.convert(
|
164
|
documentId.toString(), null)));
|
165
|
return;
|
166
|
} catch (TransformationException e2) {
|
167
|
log.debug("closing multiple outputs...");
|
168
|
mos.close();
|
169
|
log.debug("multiple outputs closed");
|
170
|
throw new RuntimeException(e2);
|
171
|
} catch (JDOMException e2) {
|
172
|
log.debug("closing multiple outputs...");
|
173
|
mos.close();
|
174
|
log.debug("multiple outputs closed");
|
175
|
throw new RuntimeException(e2);
|
176
|
} finally {
|
177
|
if (contentStream!=null) {
|
178
|
contentStream.close();
|
179
|
}
|
180
|
}
|
181
|
}
|
182
|
|
183
|
// TODO switch back to debug when setting debug level on oozie
|
184
|
log.warn("starting processing for id: " + documentId);
|
185
|
long startTime = System.currentTimeMillis();
|
186
|
ContentExtractor extractor = new ContentExtractor();
|
187
|
try {
|
188
|
extractor.uploadPDF(contentStream);
|
189
|
try {
|
190
|
Element resultElem = extractor.getNLMContent();
|
191
|
Document doc = new Document(resultElem);
|
192
|
XMLOutputter outputter = new XMLOutputter(Format.getPrettyFormat());
|
193
|
log.debug("got NLM content: \n" + outputter.outputString(resultElem));
|
194
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
195
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
196
|
documentId.toString(), doc)));
|
197
|
} catch (JDOMException e) {
|
198
|
log.debug("closing multiple outputs...");
|
199
|
mos.close();
|
200
|
log.debug("multiple outputs closed");
|
201
|
throw new RuntimeException(e);
|
202
|
} catch (TransformationException e) {
|
203
|
log.debug("closing multiple outputs...");
|
204
|
mos.close();
|
205
|
log.debug("multiple outputs closed");
|
206
|
throw new RuntimeException(e);
|
207
|
} catch (AnalysisException e) {
|
208
|
if (analysisExceptionAsCritical) {
|
209
|
log.debug("closing multiple outputs...");
|
210
|
mos.close();
|
211
|
log.debug("multiple outputs closed");
|
212
|
throw new RuntimeException(e);
|
213
|
} else {
|
214
|
if (e.getCause() instanceof InvalidPdfException) {
|
215
|
log.error("Invalid PDF file", e);
|
216
|
} else {
|
217
|
log.error("got unexpected analysis exception, just logging", e);
|
218
|
}
|
219
|
try {
|
220
|
// writing empty result
|
221
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
222
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
223
|
documentId.toString(), null)));
|
224
|
} catch (TransformationException e2) {
|
225
|
log.debug("closing multiple outputs...");
|
226
|
mos.close();
|
227
|
log.debug("multiple outputs closed");
|
228
|
throw new RuntimeException(e2);
|
229
|
} catch (JDOMException e2) {
|
230
|
log.debug("closing multiple outputs...");
|
231
|
mos.close();
|
232
|
log.debug("multiple outputs closed");
|
233
|
throw new RuntimeException(e2);
|
234
|
}
|
235
|
}
|
236
|
} catch (Exception e) {
|
237
|
if (otherExceptionAsCritical) {
|
238
|
log.debug("closing multiple outputs...");
|
239
|
mos.close();
|
240
|
log.debug("multiple outputs closed");
|
241
|
throw new RuntimeException(e);
|
242
|
} else {
|
243
|
log.error("got unexpected exception, just logging", e);
|
244
|
try {
|
245
|
// writing empty result
|
246
|
mos.write(namedOutputMeta, new AvroKey<ExtractedDocumentMetadata>(
|
247
|
NlmToDocumentWithBasicMetadataConverter.convertFull(
|
248
|
documentId.toString(), null)));
|
249
|
} catch (TransformationException e2) {
|
250
|
log.debug("closing multiple outputs...");
|
251
|
mos.close();
|
252
|
log.debug("multiple outputs closed");
|
253
|
throw new RuntimeException(e2);
|
254
|
} catch (JDOMException e2) {
|
255
|
log.debug("closing multiple outputs...");
|
256
|
mos.close();
|
257
|
log.debug("multiple outputs closed");
|
258
|
throw new RuntimeException(e2);
|
259
|
}
|
260
|
}
|
261
|
}
|
262
|
try {
|
263
|
mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
|
264
|
NlmToDocumentContentConverter.convert(
|
265
|
documentId.toString(), extractor.getRawFullText())));
|
266
|
} catch (AnalysisException e) {
|
267
|
if (analysisExceptionAsCritical) {
|
268
|
log.debug("closing multiple outputs...");
|
269
|
mos.close();
|
270
|
log.debug("multiple outputs closed");
|
271
|
throw new RuntimeException(e);
|
272
|
} else {
|
273
|
if (e.getCause() instanceof InvalidPdfException) {
|
274
|
log.error("Invalid PDF file when retrieving plaintext", e);
|
275
|
} else {
|
276
|
log.error("got unexpected analysis exception "
|
277
|
+ "when retrieving plaintext, just logging", e);
|
278
|
}
|
279
|
// writing empty result
|
280
|
mos.write(namedOutputPlaintext, new AvroKey<DocumentText>(
|
281
|
NlmToDocumentContentConverter.convert(
|
282
|
documentId.toString(), null)));
|
283
|
}
|
284
|
}
|
285
|
} finally {
|
286
|
if (contentStream!=null) {
|
287
|
contentStream.close();
|
288
|
}
|
289
|
}
|
290
|
// TODO switch back to debug when setting debug level on oozie
|
291
|
log.warn("finished processing for id " + documentId + " in " +
|
292
|
((System.currentTimeMillis() - startTime)/1000) + " secs");
|
293
|
}
|
294
|
} catch (AnalysisException e) {
|
295
|
log.debug("closing multiple outputs...");
|
296
|
mos.close();
|
297
|
log.debug("multiple outputs closed");
|
298
|
throw new RuntimeException(e);
|
299
|
}
|
300
|
}
|
301
|
|
302
|
/* (non-Javadoc)
|
303
|
* @see org.apache.hadoop.mapreduce.Mapper#cleanup(org.apache.hadoop.mapreduce.Mapper.Context)
|
304
|
*/
|
305
|
@Override
|
306
|
public void cleanup(Context context) throws IOException, InterruptedException {
|
307
|
log.debug("cleanup: closing multiple outputs...");
|
308
|
mos.close();
|
309
|
log.debug("cleanup: multiple outputs closed");
|
310
|
}
|
311
|
|
312
|
/**
|
313
|
* Sets flag indicating {@link AnalysisException} should cause interruption.
|
314
|
* @param analysisExceptionAsCritical
|
315
|
*/
|
316
|
public void setAnalysisExceptionAsCritical(boolean analysisExceptionAsCritical) {
|
317
|
this.analysisExceptionAsCritical = analysisExceptionAsCritical;
|
318
|
}
|
319
|
|
320
|
/**
|
321
|
* Sets flag indicating any other {@link Exception} should cause interruption.
|
322
|
* @param otherExceptionAsCritical
|
323
|
*/
|
324
|
public void setOtherExceptionAsCritical(boolean otherExceptionAsCritical) {
|
325
|
this.otherExceptionAsCritical = otherExceptionAsCritical;
|
326
|
}
|
327
|
|
328
|
public void setProgresLogInterval(int progresLogInterval) {
|
329
|
this.progresLogInterval = progresLogInterval;
|
330
|
}
|
331
|
|
332
|
// public static void main(String[] args) throws Exception {
|
333
|
//// testing interruption
|
334
|
//// String fileLoc = "/home/azio/Downloads/cermine/hal/4f5cc34f137de4dc89766a9366ca66de.pdf";
|
335
|
// String fileLoc = "/home/azio/Downloads/cermine/4119";
|
336
|
// final BufferedInputStream bis = new BufferedInputStream(new FileInputStream(new File(fileLoc)));
|
337
|
// try {
|
338
|
// ExecutorService executor = Executors.newSingleThreadExecutor();
|
339
|
// Future<ContentExtractorResult> futureResult = executor.submit(new Callable<ContentExtractorResult>() {
|
340
|
// @Override
|
341
|
// public ContentExtractorResult call() throws Exception {
|
342
|
// ContentExtractor extractor = new ContentExtractor();
|
343
|
// extractor.uploadPDF(bis);
|
344
|
// Element resultElem = extractor.getNLMContent();
|
345
|
// XMLOutputter outputter = new XMLOutputter(Format.getPrettyFormat());
|
346
|
// System.out.println("got NLM content: \n" + outputter.outputString(resultElem));
|
347
|
// return new ContentExtractorResult(null,
|
348
|
// DocumentText.newBuilder().setId("1234").setText("xxx").build());
|
349
|
// }
|
350
|
// });
|
351
|
// System.out.println("before get");
|
352
|
// try {
|
353
|
// ContentExtractorResult result = futureResult.get(5, TimeUnit.SECONDS);
|
354
|
// System.out.println("got result: " + result.text);
|
355
|
// System.out.println("after get, before shutdown");
|
356
|
// } catch(TimeoutException e) {
|
357
|
// System.out.println("timeout! before shutdown");
|
358
|
// executor.shutdown();
|
359
|
// System.out.println("after shutdown");
|
360
|
// }
|
361
|
// } finally {
|
362
|
// bis.close();
|
363
|
// }
|
364
|
// }
|
365
|
|
366
|
}
|