Project

General

Profile

1
package eu.dnetlib.iis.importer.acm;
2

    
3
import java.io.InputStream;
4
import java.security.InvalidParameterException;
5
import java.util.Collections;
6
import java.util.HashMap;
7
import java.util.Map;
8

    
9
import javax.xml.parsers.SAXParser;
10
import javax.xml.parsers.SAXParserFactory;
11

    
12
import org.apache.avro.file.DataFileWriter;
13
import org.apache.hadoop.fs.FileStatus;
14
import org.apache.hadoop.fs.FileSystem;
15
import org.apache.hadoop.fs.Path;
16

    
17
import eu.dnetlib.iis.citationmatching.schemas.DocumentMetadata;
18
import eu.dnetlib.iis.core.java.HadoopContext;
19
import eu.dnetlib.iis.core.java.PortBindings;
20
import eu.dnetlib.iis.core.java.Process;
21
import eu.dnetlib.iis.core.java.io.DataStore;
22
import eu.dnetlib.iis.core.java.io.FileSystemPath;
23
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
24
import eu.dnetlib.iis.core.java.porttype.PortType;
25
import eu.dnetlib.iis.importer.dataset.DataFileRecordReceiver;
26

    
27
/**
28
 * Process module importing ACM records from xml dump
29
 * and writing output to avro datastore.
30
 * @author mhorst
31
 *
32
 */
33
public class AcmXmlImporter implements Process {
34
	
35
	private static final String PORT_OUT_DOCUMENT_METADATA = "document_metadata";
36
	
37
	public static final String PARAM_ACM_XML_DUMP_PATH = "import.acm.xmldump.path";
38

    
39
	private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>();
40
	
41
	{
42
		outputPorts.put(PORT_OUT_DOCUMENT_METADATA, 
43
				new AvroPortType(DocumentMetadata.SCHEMA$));
44
	}
45
	
46
	@Override
47
	public Map<String, PortType> getInputPorts() {
48
		return Collections.emptyMap();
49
	}
50

    
51
	@Override
52
	public Map<String, PortType> getOutputPorts() {
53
		return outputPorts;
54
	}
55

    
56
	@Override
57
	public void run(PortBindings portBindings, HadoopContext context,
58
			Map<String, String> parameters) throws Exception {
59
		FileSystem fs = FileSystem.get(context.getConfiguration());
60
		if (parameters.containsKey(PARAM_ACM_XML_DUMP_PATH)) {
61
			DataFileWriter<DocumentMetadata> datasetRefWriter = null;
62
			try {
63
				datasetRefWriter = DataStore.create(
64
						new FileSystemPath(fs, portBindings.getOutput().get(PORT_OUT_DOCUMENT_METADATA)), 
65
						DocumentMetadata.SCHEMA$);
66
				processNode(fs, 
67
						new Path(parameters.get(PARAM_ACM_XML_DUMP_PATH)),
68
						datasetRefWriter);
69
			} finally {
70
				if (datasetRefWriter!=null) {
71
					datasetRefWriter.close();	
72
				}	
73
			}		
74
		} else {
75
			throw new InvalidParameterException("required parameter '" + 
76
					PARAM_ACM_XML_DUMP_PATH + "' is missing!");
77
		}
78
	}
79

    
80
	protected void processNode(FileSystem fs, Path currentPath,
81
			DataFileWriter<DocumentMetadata> datasetRefWriter) throws Exception {
82
		if (fs.isDirectory(currentPath)) {
83
			for (FileStatus fileStatus : fs.listStatus(currentPath)) {
84
				processNode(fs, fileStatus.getPath(), 
85
						datasetRefWriter);
86
			}
87
		} else {
88
			InputStream inputStream = null;
89
			SAXParser saxParser = null;
90
			try {
91
				saxParser = SAXParserFactory.newInstance().newSAXParser();
92
				saxParser.parse(inputStream = fs.open(
93
						currentPath),
94
						new AcmDumpXmlHandler( 
95
								new DataFileRecordReceiver<DocumentMetadata>(datasetRefWriter)));	
96
			} finally {
97
				if (inputStream!=null) {
98
					inputStream.close();
99
				}	
100
			}
101
		}
102
	}
103
	
104
}
(2-2/2)