定制Hadoop的MapReduce任务的FileOutputFormat

需求:让Reduce阶段以自定义的特殊格式输出结果
例如:将Reducer的输出结果写入Guava的BloomFilter,并把序列化后的BloomFilter作为输出文件

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.Charset;

/**
 * A {@link FileOutputFormat} that does not write the reducer's records as text;
 * instead every key is inserted into a Guava {@link BloomFilter}, and the
 * serialized filter is written as the task's single output file when the
 * writer is closed.
 *
 * <p>Each task produces its own filter file, so a job with several reducers
 * yields several independent filters.
 */
public class BloomFilterFileOutputFormat extends FileOutputFormat<Text, NullWritable> {

    /** Extension passed to {@code getDefaultWorkFile} for the task's output file. */
    private static final String FILE_NAME = ".model";

    /**
     * Creates the task's output file and returns a writer that collects keys
     * into a Bloom filter.
     *
     * @param job the context of the task attempt being executed
     * @return a writer that serializes its Bloom filter to the output file on close
     * @throws IOException if the output file cannot be created
     * @throws InterruptedException declared by the overridden method
     */
    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {
        Path file = getDefaultWorkFile(job, FILE_NAME);
        FileSystem fs = file.getFileSystem(job.getConfiguration());
        // overwrite = false: never silently replace an already existing file.
        FSDataOutputStream fileOut = fs.create(file, false);
        return new BloomFilterRecordWriter(fileOut);
    }

    /**
     * Record writer that accumulates all keys in an in-memory Bloom filter and
     * writes the filter to the wrapped stream when closed.
     */
    public static class BloomFilterRecordWriter extends RecordWriter<Text, NullWritable> {

        /**
         * Charset used to turn keys into the bytes that get hashed. It is fixed
         * to UTF-8 rather than the platform default so the produced filter does
         * not depend on the JVM settings of whichever node ran the task. Code
         * that reads the filter back must build its funnel with this charset.
         */
        public static final Charset CHARSET = Charset.forName("UTF-8");

        /** Expected number of insertions: 10 million keys. */
        private static final long ROWS = 1000_0000L;

        /** Target false-positive probability: 0.01%. */
        private static final double FPP = 0.0001;

        private final OutputStream out;

        private final BloomFilter<String> bf;

        /**
         * @param outputStream the stream the serialized filter is written to;
         *                     it is buffered here and closed by {@link #close}
         */
        public BloomFilterRecordWriter(OutputStream outputStream) {
            this.out = new BufferedOutputStream(outputStream);
            this.bf = BloomFilter.create(Funnels.stringFunnel(CHARSET), ROWS, FPP);
        }

        /**
         * Adds the key to the Bloom filter. Nothing is written to the stream yet.
         *
         * @param text         the key to insert
         * @param nullWritable ignored
         */
        @Override
        public void write(Text text, NullWritable nullWritable) throws IOException {
            bf.put(text.toString());
        }

        /**
         * Serializes the Bloom filter to the output stream and closes the stream.
         *
         * @param context the task attempt context (unused)
         * @throws IOException if writing or closing the stream fails
         */
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // try-with-resources: the stream is closed even when writeTo fails,
            // so a failed task does not leak the open file handle.
            try (OutputStream stream = out) {
                bf.writeTo(stream);
            }
        }
    }
}

 

 

 

 

 

Leave a Reply

Your email address will not be published. Required fields are marked *