Project

General

Profile

« Previous | Next » 

Revision 29515

#527 introducing ACM XML dump importer module importing bibliographic references for further citation-matching analysis

View differences:

modules/icm-iis-import/trunk/src/main/java/eu/dnetlib/iis/importer/acm/AcmXmlImporter.java
1
package eu.dnetlib.iis.importer.acm;
2

  
3
import java.io.InputStream;
4
import java.security.InvalidParameterException;
5
import java.util.Collections;
6
import java.util.HashMap;
7
import java.util.Map;
8

  
9
import javax.xml.parsers.SAXParser;
10
import javax.xml.parsers.SAXParserFactory;
11

  
12
import org.apache.avro.file.DataFileWriter;
13
import org.apache.hadoop.fs.FileStatus;
14
import org.apache.hadoop.fs.FileSystem;
15
import org.apache.hadoop.fs.Path;
16

  
17
import eu.dnetlib.iis.citationmatching.schemas.DocumentMetadata;
18
import eu.dnetlib.iis.core.java.HadoopContext;
19
import eu.dnetlib.iis.core.java.PortBindings;
20
import eu.dnetlib.iis.core.java.Process;
21
import eu.dnetlib.iis.core.java.io.DataStore;
22
import eu.dnetlib.iis.core.java.io.FileSystemPath;
23
import eu.dnetlib.iis.core.java.porttype.AvroPortType;
24
import eu.dnetlib.iis.core.java.porttype.PortType;
25
import eu.dnetlib.iis.importer.dataset.DataFileRecordReceiver;
26

  
27
/**
28
 * Process module importing ACM records from xml dump
29
 * and writing output to avro datastore.
30
 * @author mhorst
31
 *
32
 */
33
public class AcmXmlImporter implements Process {
34
	
35
	private static final String PORT_OUT_DOCUMENT_METADATA = "document_metadata";
36
	
37
	public static final String PARAM_ACM_XML_DUMP_PATH = "import.acm.xmldump.path";
38

  
39
	private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>();
40
	
41
	{
42
		outputPorts.put(PORT_OUT_DOCUMENT_METADATA, 
43
				new AvroPortType(DocumentMetadata.SCHEMA$));
44
	}
45
	
46
	@Override
47
	public Map<String, PortType> getInputPorts() {
48
		return Collections.emptyMap();
49
	}
50

  
51
	@Override
52
	public Map<String, PortType> getOutputPorts() {
53
		return outputPorts;
54
	}
55

  
56
	@Override
57
	public void run(PortBindings portBindings, HadoopContext context,
58
			Map<String, String> parameters) throws Exception {
59
		FileSystem fs = FileSystem.get(context.getConfiguration());
60
		if (parameters.containsKey(PARAM_ACM_XML_DUMP_PATH)) {
61
			DataFileWriter<DocumentMetadata> datasetRefWriter = null;
62
			try {
63
				datasetRefWriter = DataStore.create(
64
						new FileSystemPath(fs, portBindings.getOutput().get(PORT_OUT_DOCUMENT_METADATA)), 
65
						DocumentMetadata.SCHEMA$);
66
				processNode(fs, 
67
						new Path(parameters.get(PARAM_ACM_XML_DUMP_PATH)),
68
						datasetRefWriter);
69
			} finally {
70
				if (datasetRefWriter!=null) {
71
					datasetRefWriter.close();	
72
				}	
73
			}		
74
		} else {
75
			throw new InvalidParameterException("required parameter '" + 
76
					PARAM_ACM_XML_DUMP_PATH + "' is missing!");
77
		}
78
	}
79

  
80
	protected void processNode(FileSystem fs, Path currentPath,
81
			DataFileWriter<DocumentMetadata> datasetRefWriter) throws Exception {
82
		if (fs.isDirectory(currentPath)) {
83
			for (FileStatus fileStatus : fs.listStatus(currentPath)) {
84
				processNode(fs, fileStatus.getPath(), 
85
						datasetRefWriter);
86
			}
87
		} else {
88
			InputStream inputStream = null;
89
			SAXParser saxParser = null;
90
			try {
91
				saxParser = SAXParserFactory.newInstance().newSAXParser();
92
				saxParser.parse(inputStream = fs.open(
93
						currentPath),
94
						new AcmDumpXmlHandler( 
95
								new DataFileRecordReceiver<DocumentMetadata>(datasetRefWriter)));	
96
			} finally {
97
				if (inputStream!=null) {
98
					inputStream.close();
99
				}	
100
			}
101
		}
102
	}
103
	
104
}
0 105

  
modules/icm-iis-import/trunk/src/main/java/eu/dnetlib/iis/importer/acm/AcmDumpXmlHandler.java
1
package eu.dnetlib.iis.importer.acm;
2

  
3
import java.io.IOException;
4
import java.util.ArrayList;
5
import java.util.Collections;
6
import java.util.List;
7
import java.util.Stack;
8

  
9
import org.apache.log4j.Logger;
10
import org.xml.sax.Attributes;
11
import org.xml.sax.SAXException;
12
import org.xml.sax.helpers.DefaultHandler;
13

  
14
import eu.dnetlib.iis.citationmatching.schemas.BasicMetadata;
15
import eu.dnetlib.iis.citationmatching.schemas.DocumentMetadata;
16
import eu.dnetlib.iis.citationmatching.schemas.ReferenceMetadata;
17
import eu.dnetlib.iis.importer.dataset.RecordReceiver;
18

  
19
/**
20
 * ACM XML dump SAX handler.
21
 * Notice: writer is not being closed by handler.
22
 * Created outside, let it be closed outside as well.
23
 * @author mhorst
24
 *
25
 */
26
public class AcmDumpXmlHandler extends DefaultHandler {
27

  
28
	private static final String ELEM_ARTICLE_REC = "article_rec";
29
	private static final String ELEM_ARTICLE_ID = "article_id";
30
	private static final String ELEM_TITLE = "title";
31
	
32
	private static final String ELEM_REFERENCES = "references";
33
	private static final String ELEM_REF = "ref";
34
	private static final String ELEM_REF_SEQ_NO = "ref_seq_no";
35
	private static final String ELEM_REF_TEXT = "ref_text";
36
	
37
	private final Logger log = Logger.getLogger(this.getClass());
38
	
39
	private Stack<String> parents;
40
	
41
	private StringBuilder currentValue = new StringBuilder();
42
	
43
	private String articleId = null;
44
	private String title = null;
45
	private List<ReferenceMetadata> references = null;	
46
	private Integer refSeqNo = null;
47
	private String refText = null;
48
	
49
	private int counter = 0;
50
	
51
	private final RecordReceiver<DocumentMetadata> receiver;
52
	
53
	/**
54
	 * Default constructor.
55
	 * @param receiver
56
	 */
57
	public AcmDumpXmlHandler(RecordReceiver<DocumentMetadata> receiver) {
58
		super();
59
		this.receiver = receiver;
60
	}
61
	
62
	@Override
63
	public void startDocument() throws SAXException {
64
		this.parents = new Stack<String>();
65
		this.counter = 0;
66
		clearAllFields();
67
	}
68

  
69
	@Override
70
	public void startElement(String uri, String localName, String qName,
71
			Attributes attributes) throws SAXException {
72
		if (isWithinElement(qName, ELEM_ARTICLE_ID, ELEM_ARTICLE_REC)) {
73
			this.currentValue = new StringBuilder();
74
		} else if (isWithinElement(qName, ELEM_TITLE, ELEM_ARTICLE_REC)) {
75
			this.currentValue = new StringBuilder();
76
		} else if (isWithinElement(qName, ELEM_REFERENCES, ELEM_ARTICLE_REC)) {
77
			this.currentValue = new StringBuilder();
78
			this.references = new ArrayList<ReferenceMetadata>();
79
		} else if (isWithinElement(qName, ELEM_REF_SEQ_NO, ELEM_REF)) {
80
			this.currentValue = new StringBuilder();
81
		} else if (isWithinElement(qName, ELEM_REF_TEXT, ELEM_REF)) {
82
			this.currentValue = new StringBuilder();
83
		}
84
		this.parents.push(qName);
85
	}
86

  
87
	@Override
88
	public void endElement(String uri, String localName, String qName)
89
			throws SAXException {
90
		this.parents.pop();
91
		if (isWithinElement(qName, ELEM_ARTICLE_ID, ELEM_ARTICLE_REC)) {
92
			this.articleId = this.currentValue.toString().trim();
93
		} else if (isWithinElement(qName, ELEM_TITLE, ELEM_ARTICLE_REC)) {
94
			this.title = this.currentValue.toString().trim();
95
		} else if (isWithinElement(qName, ELEM_REF_SEQ_NO, ELEM_REF)) {
96
			this.refSeqNo = Integer.parseInt(this.currentValue.toString().trim());
97
		} else if (isWithinElement(qName, ELEM_REF_TEXT, ELEM_REF)) {
98
			this.refText = this.currentValue.toString().trim();
99
		} else if (isWithinElement(qName, ELEM_REF, ELEM_REFERENCES)) {
100
			ReferenceMetadata.Builder refMetaBuilder = ReferenceMetadata.newBuilder();
101
			BasicMetadata.Builder basicMetaBuilder = BasicMetadata.newBuilder();
102
			basicMetaBuilder.setAuthors(Collections.<CharSequence>emptyList());
103
			refMetaBuilder.setBasicMetadata(basicMetaBuilder.build());
104
			refMetaBuilder.setPosition(this.refSeqNo);
105
			refMetaBuilder.setRawText(this.refText);
106
			this.references.add(refMetaBuilder.build());
107
		} else if (isWithinElement(qName, ELEM_ARTICLE_REC, null)) {
108
//			writing whole record
109
			if (this.articleId!=null && !this.articleId.isEmpty()) {
110
				try {
111
					DocumentMetadata.Builder docMetaBuilder = DocumentMetadata.newBuilder();
112
					docMetaBuilder.setId(this.articleId);
113
					BasicMetadata.Builder basicMetaBuilder = BasicMetadata.newBuilder();
114
					basicMetaBuilder.setAuthors(Collections.<CharSequence>emptyList());
115
					if (this.title!=null && !this.title.isEmpty()) {
116
						basicMetaBuilder.setTitle(this.title);	
117
					}
118
					docMetaBuilder.setBasicMetadata(basicMetaBuilder.build());
119
					docMetaBuilder.setReferences(this.references);
120
					receiver.receive(docMetaBuilder.build());
121
					counter++;
122
					if (counter%10000==0) {
123
						log.debug("current progress: " + counter);
124
					}
125
				} catch (IOException e) {
126
					throw new SAXException(e);
127
				}
128
			} else {
129
				log.warn("omitting record with null/empty article id and title: " + this.title);
130
			}
131
			clearAllFields();
132
		}
133
//		resetting current value;
134
		this.currentValue = null;
135
	}
136

  
137
	private void clearAllFields() {
138
		this.articleId = null;
139
		this.title = null;
140
		this.references = null;		
141
		this.refSeqNo = null;
142
		this.refText = null;
143
	}
144
	
145
	boolean isWithinElement(String qName,
146
			String expectedElement, String expectedParent) {
147
		return qName.equals(expectedElement) && 
148
				(expectedParent==null || !this.parents.isEmpty() && expectedParent.equals(this.parents.peek()));
149
	}
150
	
151
	@Override
152
	public void endDocument() throws SAXException {
153
		parents.clear();
154
		parents = null;
155
		log.debug("total number of processed records: " + counter);
156
	}
157

  
158
	@Override
159
	public void characters(char[] ch, int start, int length)
160
			throws SAXException {
161
		if (this.currentValue!=null) {
162
			this.currentValue.append(ch, start, length);
163
		}
164
	}
165
	
166
}
0 167

  
modules/icm-iis-import/trunk/src/main/resources/eu/dnetlib/iis/importer/acm/oozie_app/workflow.xml
1
<workflow-app xmlns="uri:oozie:workflow:0.4" name="importer_acm">
	<!-- Workflow with a single java action running AcmXmlImporter through ProcessWrapper. -->
2
	
3
	<parameters>
		<!-- input_hdfs_location is handed to the importer as the import.acm.xmldump.path parameter,
			output is bound to its document_metadata port (see the arg elements of the action below). -->
4
		<property>
5
			<name>input_hdfs_location</name>
6
			<description>input ACM XML dump HDFS location</description>
7
		</property>
8
		<property>
9
			<name>output</name>
10
			<description>document metadata output directory</description>
11
		</property>
12
	</parameters>
13

  
14
	<start to="acm-importer" />
15
	
16
	<action name="acm-importer">
17
		<java>
18
			<job-tracker>${jobTracker}</job-tracker>
19
			<name-node>${nameNode}</name-node>
20
			<!-- The data generated by this node is deleted in this section -->
			<!-- NOTE(review): the import_dataset working directory is removed and recreated here,
				but none of the arguments passed below refers to it; confirm it is still needed. -->
21
			<prepare>
22
				<delete path="${nameNode}${workingDir}/import_dataset" />
23
				<delete path="${nameNode}${output}" />
24
				<mkdir path="${nameNode}${workingDir}/import_dataset" />
25
			</prepare>
26
			<configuration>
27
				<property>
28
					<name>mapred.job.queue.name</name>
29
					<value>${queueName}</value>
30
				</property>
31
			</configuration>
			<!-- The first argument is the process class name. The names used in the P and O arguments match
				PARAM_ACM_XML_DUMP_PATH and PORT_OUT_DOCUMENT_METADATA declared in AcmXmlImporter. -->
32
			<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class>
33
			<arg>eu.dnetlib.iis.importer.acm.AcmXmlImporter</arg>
34
			<arg>-SworkingDir=${workingDir}</arg>
35
			<arg>-Pimport.acm.xmldump.path=${input_hdfs_location}</arg>
36
			<arg>-Odocument_metadata=${output}</arg>
37
		</java>
38
		<ok to="end" />
39
		<error to="fail" />
40
	</action>
41
	
42
	<!-- Terminal node reached on action error, reporting the message of the last failed node. -->
	<kill name="fail">
43
		<message>Unfortunately, the process failed -- error message:
44
			[${wf:errorMessage(wf:lastErrorNode())}]</message>
45
	</kill>
46
	<end name="end" />
47
</workflow-app>
0 48

  
modules/icm-iis-import/trunk/src/main/resources/eu/dnetlib/iis/importer/acm/job.properties
1
# HDFS location of the ACM XML dump to be imported.
# The sample location is kept below as a commented-out alternative to the full dump.
#input_hdfs_location=/tmp/acm_citations/sample
2
input_hdfs_location=/tmp/acm_citations/full
3
# Output directory of the imported document metadata, located under the workflow working directory.
output=${workingDir}/out
4

  
0 5

  

Also available in: Unified diff