Revision 29515
Added by Marek Horst almost 10 years ago
modules/icm-iis-import/trunk/src/main/java/eu/dnetlib/iis/importer/acm/AcmXmlImporter.java | ||
---|---|---|
1 |
package eu.dnetlib.iis.importer.acm; |
|
2 |
|
|
3 |
import java.io.InputStream; |
|
4 |
import java.security.InvalidParameterException; |
|
5 |
import java.util.Collections; |
|
6 |
import java.util.HashMap; |
|
7 |
import java.util.Map; |
|
8 |
|
|
9 |
import javax.xml.parsers.SAXParser; |
|
10 |
import javax.xml.parsers.SAXParserFactory; |
|
11 |
|
|
12 |
import org.apache.avro.file.DataFileWriter; |
|
13 |
import org.apache.hadoop.fs.FileStatus; |
|
14 |
import org.apache.hadoop.fs.FileSystem; |
|
15 |
import org.apache.hadoop.fs.Path; |
|
16 |
|
|
17 |
import eu.dnetlib.iis.citationmatching.schemas.DocumentMetadata; |
|
18 |
import eu.dnetlib.iis.core.java.HadoopContext; |
|
19 |
import eu.dnetlib.iis.core.java.PortBindings; |
|
20 |
import eu.dnetlib.iis.core.java.Process; |
|
21 |
import eu.dnetlib.iis.core.java.io.DataStore; |
|
22 |
import eu.dnetlib.iis.core.java.io.FileSystemPath; |
|
23 |
import eu.dnetlib.iis.core.java.porttype.AvroPortType; |
|
24 |
import eu.dnetlib.iis.core.java.porttype.PortType; |
|
25 |
import eu.dnetlib.iis.importer.dataset.DataFileRecordReceiver; |
|
26 |
|
|
27 |
/** |
|
28 |
* Process module importing ACM records from xml dump |
|
29 |
* and writing output to avro datastore. |
|
30 |
* @author mhorst |
|
31 |
* |
|
32 |
*/ |
|
33 |
public class AcmXmlImporter implements Process { |
|
34 |
|
|
35 |
private static final String PORT_OUT_DOCUMENT_METADATA = "document_metadata"; |
|
36 |
|
|
37 |
public static final String PARAM_ACM_XML_DUMP_PATH = "import.acm.xmldump.path"; |
|
38 |
|
|
39 |
private static final Map<String, PortType> outputPorts = new HashMap<String, PortType>(); |
|
40 |
|
|
41 |
{ |
|
42 |
outputPorts.put(PORT_OUT_DOCUMENT_METADATA, |
|
43 |
new AvroPortType(DocumentMetadata.SCHEMA$)); |
|
44 |
} |
|
45 |
|
|
46 |
@Override |
|
47 |
public Map<String, PortType> getInputPorts() { |
|
48 |
return Collections.emptyMap(); |
|
49 |
} |
|
50 |
|
|
51 |
@Override |
|
52 |
public Map<String, PortType> getOutputPorts() { |
|
53 |
return outputPorts; |
|
54 |
} |
|
55 |
|
|
56 |
@Override |
|
57 |
public void run(PortBindings portBindings, HadoopContext context, |
|
58 |
Map<String, String> parameters) throws Exception { |
|
59 |
FileSystem fs = FileSystem.get(context.getConfiguration()); |
|
60 |
if (parameters.containsKey(PARAM_ACM_XML_DUMP_PATH)) { |
|
61 |
DataFileWriter<DocumentMetadata> datasetRefWriter = null; |
|
62 |
try { |
|
63 |
datasetRefWriter = DataStore.create( |
|
64 |
new FileSystemPath(fs, portBindings.getOutput().get(PORT_OUT_DOCUMENT_METADATA)), |
|
65 |
DocumentMetadata.SCHEMA$); |
|
66 |
processNode(fs, |
|
67 |
new Path(parameters.get(PARAM_ACM_XML_DUMP_PATH)), |
|
68 |
datasetRefWriter); |
|
69 |
} finally { |
|
70 |
if (datasetRefWriter!=null) { |
|
71 |
datasetRefWriter.close(); |
|
72 |
} |
|
73 |
} |
|
74 |
} else { |
|
75 |
throw new InvalidParameterException("required parameter '" + |
|
76 |
PARAM_ACM_XML_DUMP_PATH + "' is missing!"); |
|
77 |
} |
|
78 |
} |
|
79 |
|
|
80 |
protected void processNode(FileSystem fs, Path currentPath, |
|
81 |
DataFileWriter<DocumentMetadata> datasetRefWriter) throws Exception { |
|
82 |
if (fs.isDirectory(currentPath)) { |
|
83 |
for (FileStatus fileStatus : fs.listStatus(currentPath)) { |
|
84 |
processNode(fs, fileStatus.getPath(), |
|
85 |
datasetRefWriter); |
|
86 |
} |
|
87 |
} else { |
|
88 |
InputStream inputStream = null; |
|
89 |
SAXParser saxParser = null; |
|
90 |
try { |
|
91 |
saxParser = SAXParserFactory.newInstance().newSAXParser(); |
|
92 |
saxParser.parse(inputStream = fs.open( |
|
93 |
currentPath), |
|
94 |
new AcmDumpXmlHandler( |
|
95 |
new DataFileRecordReceiver<DocumentMetadata>(datasetRefWriter))); |
|
96 |
} finally { |
|
97 |
if (inputStream!=null) { |
|
98 |
inputStream.close(); |
|
99 |
} |
|
100 |
} |
|
101 |
} |
|
102 |
} |
|
103 |
|
|
104 |
} |
|
0 | 105 |
modules/icm-iis-import/trunk/src/main/java/eu/dnetlib/iis/importer/acm/AcmDumpXmlHandler.java | ||
---|---|---|
1 |
package eu.dnetlib.iis.importer.acm; |
|
2 |
|
|
3 |
import java.io.IOException; |
|
4 |
import java.util.ArrayList; |
|
5 |
import java.util.Collections; |
|
6 |
import java.util.List; |
|
7 |
import java.util.Stack; |
|
8 |
|
|
9 |
import org.apache.log4j.Logger; |
|
10 |
import org.xml.sax.Attributes; |
|
11 |
import org.xml.sax.SAXException; |
|
12 |
import org.xml.sax.helpers.DefaultHandler; |
|
13 |
|
|
14 |
import eu.dnetlib.iis.citationmatching.schemas.BasicMetadata; |
|
15 |
import eu.dnetlib.iis.citationmatching.schemas.DocumentMetadata; |
|
16 |
import eu.dnetlib.iis.citationmatching.schemas.ReferenceMetadata; |
|
17 |
import eu.dnetlib.iis.importer.dataset.RecordReceiver; |
|
18 |
|
|
19 |
/** |
|
20 |
* ACM XML dump SAX handler. |
|
21 |
* Notice: writer is not being closed by handler. |
|
22 |
* Created outside, let it be closed outside as well. |
|
23 |
* @author mhorst |
|
24 |
* |
|
25 |
*/ |
|
26 |
public class AcmDumpXmlHandler extends DefaultHandler { |
|
27 |
|
|
28 |
private static final String ELEM_ARTICLE_REC = "article_rec"; |
|
29 |
private static final String ELEM_ARTICLE_ID = "article_id"; |
|
30 |
private static final String ELEM_TITLE = "title"; |
|
31 |
|
|
32 |
private static final String ELEM_REFERENCES = "references"; |
|
33 |
private static final String ELEM_REF = "ref"; |
|
34 |
private static final String ELEM_REF_SEQ_NO = "ref_seq_no"; |
|
35 |
private static final String ELEM_REF_TEXT = "ref_text"; |
|
36 |
|
|
37 |
private final Logger log = Logger.getLogger(this.getClass()); |
|
38 |
|
|
39 |
private Stack<String> parents; |
|
40 |
|
|
41 |
private StringBuilder currentValue = new StringBuilder(); |
|
42 |
|
|
43 |
private String articleId = null; |
|
44 |
private String title = null; |
|
45 |
private List<ReferenceMetadata> references = null; |
|
46 |
private Integer refSeqNo = null; |
|
47 |
private String refText = null; |
|
48 |
|
|
49 |
private int counter = 0; |
|
50 |
|
|
51 |
private final RecordReceiver<DocumentMetadata> receiver; |
|
52 |
|
|
53 |
/** |
|
54 |
* Default constructor. |
|
55 |
* @param receiver |
|
56 |
*/ |
|
57 |
public AcmDumpXmlHandler(RecordReceiver<DocumentMetadata> receiver) { |
|
58 |
super(); |
|
59 |
this.receiver = receiver; |
|
60 |
} |
|
61 |
|
|
62 |
@Override |
|
63 |
public void startDocument() throws SAXException { |
|
64 |
this.parents = new Stack<String>(); |
|
65 |
this.counter = 0; |
|
66 |
clearAllFields(); |
|
67 |
} |
|
68 |
|
|
69 |
@Override |
|
70 |
public void startElement(String uri, String localName, String qName, |
|
71 |
Attributes attributes) throws SAXException { |
|
72 |
if (isWithinElement(qName, ELEM_ARTICLE_ID, ELEM_ARTICLE_REC)) { |
|
73 |
this.currentValue = new StringBuilder(); |
|
74 |
} else if (isWithinElement(qName, ELEM_TITLE, ELEM_ARTICLE_REC)) { |
|
75 |
this.currentValue = new StringBuilder(); |
|
76 |
} else if (isWithinElement(qName, ELEM_REFERENCES, ELEM_ARTICLE_REC)) { |
|
77 |
this.currentValue = new StringBuilder(); |
|
78 |
this.references = new ArrayList<ReferenceMetadata>(); |
|
79 |
} else if (isWithinElement(qName, ELEM_REF_SEQ_NO, ELEM_REF)) { |
|
80 |
this.currentValue = new StringBuilder(); |
|
81 |
} else if (isWithinElement(qName, ELEM_REF_TEXT, ELEM_REF)) { |
|
82 |
this.currentValue = new StringBuilder(); |
|
83 |
} |
|
84 |
this.parents.push(qName); |
|
85 |
} |
|
86 |
|
|
87 |
@Override |
|
88 |
public void endElement(String uri, String localName, String qName) |
|
89 |
throws SAXException { |
|
90 |
this.parents.pop(); |
|
91 |
if (isWithinElement(qName, ELEM_ARTICLE_ID, ELEM_ARTICLE_REC)) { |
|
92 |
this.articleId = this.currentValue.toString().trim(); |
|
93 |
} else if (isWithinElement(qName, ELEM_TITLE, ELEM_ARTICLE_REC)) { |
|
94 |
this.title = this.currentValue.toString().trim(); |
|
95 |
} else if (isWithinElement(qName, ELEM_REF_SEQ_NO, ELEM_REF)) { |
|
96 |
this.refSeqNo = Integer.parseInt(this.currentValue.toString().trim()); |
|
97 |
} else if (isWithinElement(qName, ELEM_REF_TEXT, ELEM_REF)) { |
|
98 |
this.refText = this.currentValue.toString().trim(); |
|
99 |
} else if (isWithinElement(qName, ELEM_REF, ELEM_REFERENCES)) { |
|
100 |
ReferenceMetadata.Builder refMetaBuilder = ReferenceMetadata.newBuilder(); |
|
101 |
BasicMetadata.Builder basicMetaBuilder = BasicMetadata.newBuilder(); |
|
102 |
basicMetaBuilder.setAuthors(Collections.<CharSequence>emptyList()); |
|
103 |
refMetaBuilder.setBasicMetadata(basicMetaBuilder.build()); |
|
104 |
refMetaBuilder.setPosition(this.refSeqNo); |
|
105 |
refMetaBuilder.setRawText(this.refText); |
|
106 |
this.references.add(refMetaBuilder.build()); |
|
107 |
} else if (isWithinElement(qName, ELEM_ARTICLE_REC, null)) { |
|
108 |
// writing whole record |
|
109 |
if (this.articleId!=null && !this.articleId.isEmpty()) { |
|
110 |
try { |
|
111 |
DocumentMetadata.Builder docMetaBuilder = DocumentMetadata.newBuilder(); |
|
112 |
docMetaBuilder.setId(this.articleId); |
|
113 |
BasicMetadata.Builder basicMetaBuilder = BasicMetadata.newBuilder(); |
|
114 |
basicMetaBuilder.setAuthors(Collections.<CharSequence>emptyList()); |
|
115 |
if (this.title!=null && !this.title.isEmpty()) { |
|
116 |
basicMetaBuilder.setTitle(this.title); |
|
117 |
} |
|
118 |
docMetaBuilder.setBasicMetadata(basicMetaBuilder.build()); |
|
119 |
docMetaBuilder.setReferences(this.references); |
|
120 |
receiver.receive(docMetaBuilder.build()); |
|
121 |
counter++; |
|
122 |
if (counter%10000==0) { |
|
123 |
log.debug("current progress: " + counter); |
|
124 |
} |
|
125 |
} catch (IOException e) { |
|
126 |
throw new SAXException(e); |
|
127 |
} |
|
128 |
} else { |
|
129 |
log.warn("omitting record with null/empty article id and title: " + this.title); |
|
130 |
} |
|
131 |
clearAllFields(); |
|
132 |
} |
|
133 |
// resetting current value; |
|
134 |
this.currentValue = null; |
|
135 |
} |
|
136 |
|
|
137 |
private void clearAllFields() { |
|
138 |
this.articleId = null; |
|
139 |
this.title = null; |
|
140 |
this.references = null; |
|
141 |
this.refSeqNo = null; |
|
142 |
this.refText = null; |
|
143 |
} |
|
144 |
|
|
145 |
boolean isWithinElement(String qName, |
|
146 |
String expectedElement, String expectedParent) { |
|
147 |
return qName.equals(expectedElement) && |
|
148 |
(expectedParent==null || !this.parents.isEmpty() && expectedParent.equals(this.parents.peek())); |
|
149 |
} |
|
150 |
|
|
151 |
@Override |
|
152 |
public void endDocument() throws SAXException { |
|
153 |
parents.clear(); |
|
154 |
parents = null; |
|
155 |
log.debug("total number of processed records: " + counter); |
|
156 |
} |
|
157 |
|
|
158 |
@Override |
|
159 |
public void characters(char[] ch, int start, int length) |
|
160 |
throws SAXException { |
|
161 |
if (this.currentValue!=null) { |
|
162 |
this.currentValue.append(ch, start, length); |
|
163 |
} |
|
164 |
} |
|
165 |
|
|
166 |
} |
|
0 | 167 |
modules/icm-iis-import/trunk/src/main/resources/eu/dnetlib/iis/importer/acm/oozie_app/workflow.xml | ||
---|---|---|
1 |
<workflow-app xmlns="uri:oozie:workflow:0.4" name="importer_acm"> |
<!-- Workflow with a single java action (acm-importer): on success it goes to "end", on error to the "fail" kill node. -->
<!-- NOTE(review): jobTracker, nameNode, queueName and workingDir are used below but not declared in the parameters section; presumably supplied by the submitting environment, confirm. -->

2 |
|

3 |
<parameters> |

4 |
<property> |

5 |
<name>input_hdfs_location</name> |

6 |
<description>input ACM XML dump HDFS location</description> |

7 |
</property> |

8 |
<property> |

9 |
<name>output</name> |

10 |
<description>document metadata output directory</description> |

11 |
</property> |

12 |
</parameters> |

13 |
|

14 |
<start to="acm-importer" /> |

15 |
|

16 |
<action name="acm-importer"> |

17 |
<java> |

18 |
<job-tracker>${jobTracker}</job-tracker> |

19 |
<name-node>${nameNode}</name-node> |

20 |
<!-- The data generated by this node is deleted in this section --> |
<!-- NOTE(review): import_dataset under workingDir is deleted and re-created here, but none of the arguments below refers to it; possibly a leftover from another importer workflow, confirm before relying on it. -->

21 |
<prepare> |

22 |
<delete path="${nameNode}${workingDir}/import_dataset" /> |

23 |
<delete path="${nameNode}${output}" /> |

24 |
<mkdir path="${nameNode}${workingDir}/import_dataset" /> |

25 |
</prepare> |

26 |
<configuration> |

27 |
<property> |

28 |
<name>mapred.job.queue.name</name> |

29 |
<value>${queueName}</value> |

30 |
</property> |

31 |
</configuration> |
<!-- Arguments handed to the main class: the process class name followed by -S, -P and -O options. -->
<!-- The -P key equals AcmXmlImporter.PARAM_ACM_XML_DUMP_PATH and the -O key equals its document_metadata output port name; the exact meaning of each option prefix is defined by ProcessWrapper, which is not part of this change. -->

32 |
<main-class>eu.dnetlib.iis.core.java.ProcessWrapper</main-class> |

33 |
<arg>eu.dnetlib.iis.importer.acm.AcmXmlImporter</arg> |

34 |
<arg>-SworkingDir=${workingDir}</arg> |

35 |
<arg>-Pimport.acm.xmldump.path=${input_hdfs_location}</arg> |

36 |
<arg>-Odocument_metadata=${output}</arg> |

37 |
</java> |

38 |
<ok to="end" /> |

39 |
<error to="fail" /> |

40 |
</action> |

41 |
|
<!-- Terminal failure node: reports the error message of the last failed node. -->

42 |
<kill name="fail"> |

43 |
<message>Unfortunately, the process failed -- error message: |

44 |
[${wf:errorMessage(wf:lastErrorNode())}]</message> |

45 |
</kill> |

46 |
<end name="end" /> |

47 |
</workflow-app> |
|
0 | 48 |
modules/icm-iis-import/trunk/src/main/resources/eu/dnetlib/iis/importer/acm/job.properties | ||
---|---|---|
1 |
#input_hdfs_location=/tmp/acm_citations/sample |
|
2 |
input_hdfs_location=/tmp/acm_citations/full |
|
3 |
output=${workingDir}/out |
|
4 |
|
|
0 | 5 |
Also available in: Unified diff
#527 introducing ACM XML dump importer module importing bibliographic references for further citation-matching analysis