Project

General

Profile

1
package eu.dnetlib.data.mapreduce.hbase.lodExport.build;
2

    
3

    
4
import org.apache.hadoop.fs.FSDataOutputStream;
5
import org.apache.hadoop.fs.FileSystem;
6
import org.apache.hadoop.fs.Path;
7
import org.apache.hadoop.io.NullWritable;
8
import org.apache.hadoop.io.Text;
9
import org.apache.hadoop.io.compress.CompressionCodec;
10
import org.apache.hadoop.io.compress.GzipCodec;
11
import org.apache.hadoop.mapred.FileOutputFormat;
12
import org.apache.hadoop.mapred.JobConf;
13
import org.apache.hadoop.mapred.RecordWriter;
14
import org.apache.hadoop.mapred.Reporter;
15
import org.apache.hadoop.mapred.TextOutputFormat;
16
import org.apache.hadoop.util.Progressable;
17
import org.apache.hadoop.util.ReflectionUtils;
18
import org.apache.log4j.Logger;
19

    
20
import java.io.DataOutputStream;
21
import java.io.IOException;
22

    
23
public class StreamingTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
24
    protected static class StreamingLineRecordWriter<K, V> implements RecordWriter<K, V> {
25
        private static final String newline = "\n";
26
        private Logger log = Logger.getLogger(this.getClass());
27

    
28
        protected DataOutputStream out;
29
        private final String keyValueSeparator = "\t";
30
        private final String valueDelimiter = ",";
31
        private boolean dataWritten = false;
32

    
33
        public StreamingLineRecordWriter(DataOutputStream out) {
34
            this.out = out;
35
        }
36

    
37

    
38
        private void writeObject(Object o) throws IOException {
39
            if (o instanceof Text) {
40
                Text to = (Text) o;
41
                //out.write(to.getBytes(), 0, to.getLength());
42
                //          out.write("AAAAAAAAAA".getBytes(), 0, to.getLength());
43
                out.writeUTF(to.toString().replace("\n", ""));
44
            } else {
45
                out.writeUTF(o.toString());
46
            }
47
        }
48

    
49
        public synchronized void write(K key, V value) throws IOException {
50

    
51
            boolean nullKey = key == null || key instanceof NullWritable;
52
            boolean nullValue = value == null || value instanceof NullWritable;
53
            if (nullKey && nullValue) {
54
                return;
55
            }
56

    
57
            if (!nullKey) {
58
                // if we've written data before, append a new line
59
                if (dataWritten) {
60
                    out.writeUTF(newline);
61
                }
62
                // write out the key and separator
63
                writeObject(key);
64
                out.writeUTF(keyValueSeparator);
65
            } else if (!nullValue) {
66
                // write out the value delimiter
67
                out.writeUTF(valueDelimiter);
68
                // write out the value
69
                writeObject(value);
70
            }
71

    
72
            // track that we've written some data
73
            dataWritten = true;
74
        }
75

    
76
        public synchronized void close(Reporter reporter) throws IOException {
77
            // if we've written out any data, append a closing newline
78
            if (dataWritten) {
79
                out.writeUTF(newline);
80
            }
81

    
82
            out.close();
83
        }
84
    }
85

    
86
    @Override
87
    public RecordWriter<K, V> getRecordWriter(FileSystem fileSystem, JobConf job, String name, Progressable progress) throws IOException {
88
        boolean isCompressed = getCompressOutput(job);
89
        if (!isCompressed) {
90
            Path file = FileOutputFormat.getTaskOutputPath(job, name);
91
            FileSystem fs = file.getFileSystem(job);
92
            FSDataOutputStream fileOut = fs.create(file, progress);
93
            return new StreamingLineRecordWriter<K, V>(fileOut);
94
        } else {
95
            Class<? extends CompressionCodec> codecClass = getOutputCompressorClass(
96
                    job, GzipCodec.class);
97
            // create the named codec
98
            CompressionCodec codec = ReflectionUtils.newInstance(codecClass,
99
                    job);
100
            // build the filename including the extension
101
            Path file = FileOutputFormat.getTaskOutputPath(job,
102
                    name + codec.getDefaultExtension());
103
            FileSystem fs = file.getFileSystem(job);
104
            FSDataOutputStream fileOut = fs.create(file, progress);
105
            return new StreamingLineRecordWriter<K, V>(new DataOutputStream(codec.createOutputStream(fileOut)));
106
        }
107
    }
108
}
(4-4/5)