Project

General

Profile

1
package eu.dnetlib.data.hadoop.action;
2

    
3
import java.io.IOException;
4

    
5
import org.apache.commons.lang.StringUtils;
6
import org.apache.commons.logging.Log;
7
import org.apache.commons.logging.LogFactory;
8
import org.apache.hadoop.conf.Configuration;
9
import org.apache.hadoop.fs.Path;
10
import org.apache.hadoop.io.SequenceFile;
11
import org.apache.hadoop.io.Text;
12
import org.springframework.beans.factory.annotation.Autowired;
13
import org.springframework.beans.factory.annotation.Required;
14

    
15
import eu.dnetlib.data.hadoop.config.ClusterName;
16
import eu.dnetlib.data.hadoop.config.ConfigurationEnumerator;
17
import eu.dnetlib.data.hadoop.hdfs.SequenceFileWriterFactory;
18
import eu.dnetlib.enabling.resultset.client.ResultSetClientFactory;
19

    
20
public class SequenceFileFeeder {
21

    
22
	private static final Log log = LogFactory.getLog(SequenceFileFeeder.class); // NOPMD by marko on 11/24/08 5:02 PM
23

    
24
	private ResultSetClientFactory resultSetClientFactory;
25

    
26
	@Autowired
27
	protected ConfigurationEnumerator configurationEnumerator;
28

    
29
	@Autowired
30
	protected SequenceFileWriterFactory sequenceFileWriterFactory;
31

    
32
	public int feed(final String epr, final ClusterName clusterName, final String path) throws IOException {
33
		return doWrite(epr, clusterName, path);
34
	}
35

    
36
	private int doWrite(final String epr, final ClusterName clusterName, final String path) throws IOException {
37
		final SequenceFile.Writer writer = sequenceFileWriterFactory.getSequenceFileWriter(Text.class, Text.class, getConf(clusterName), new Path(path));
38
		log.info("Opened sequence file writer: " + writer.toString());
39

    
40
		try {
41
			final Text idText = new Text();
42
			final Text bodyText = new Text();
43
			int count = 0;
44
			int nulls = 0;
45
			for (String record : getResultSetClientFactory().getClient(epr)) {
46
				if (StringUtils.isBlank(record)) {
47
					nulls++;
48
				} else {
49
					idText.set(String.valueOf(count++));
50
					bodyText.set(record);
51
					writer.append(idText, bodyText);
52
				}
53
			}
54
			log.info("written " + count + " records in sequence file: " + path);
55
			if (nulls > 0) {
56
				log.warn("found " + nulls + " records in epr!");
57
			}
58
			return count;
59
		} finally {
60
			writer.close();
61
		}
62
	}
63

    
64
	protected Configuration getConf(final ClusterName clusterName) {
65
		return configurationEnumerator.get(clusterName);
66
	}
67

    
68
	public ResultSetClientFactory getResultSetClientFactory() {
69
		return resultSetClientFactory;
70
	}
71

    
72
	@Required
73
	public void setResultSetClientFactory(final ResultSetClientFactory resultSetClientFactory) {
74
		this.resultSetClientFactory = resultSetClientFactory;
75
	}
76

    
77
}
(10-10/13)