Customizing the FileOutputFormat of a Hadoop MapReduce Job

Requirement: the Reduce stage should write its results in a special output format.
For example: insert the Reducer's output keys into a Guava BloomFilter and serialize the filter as the job's output file. The implementation below extends FileOutputFormat and supplies a custom RecordWriter:

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.BufferedOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

public class BloomFilterFileOutputFormat extends FileOutputFormat<Text, NullWritable> {

    // extension appended to the task's default work file, e.g. part-r-00000.model
    private static final String FILE_NAME = ".model";

    @Override
    public RecordWriter<Text, NullWritable> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {
        // resolve this task's work file under the job's output directory
        Path file = getDefaultWorkFile(job, FILE_NAME);
        FileSystem fs = file.getFileSystem(job.getConfiguration());
        // create the file, failing if it already exists (overwrite = false)
        FSDataOutputStream fileOut = fs.create(file, false);
        return new BloomFilterRecordWriter(fileOut);
    }

    // RecordWriter that accumulates keys in an in-memory BloomFilter and
    // serializes the filter to the output stream when the task finishes
    public static class BloomFilterRecordWriter extends RecordWriter<Text, NullWritable> {

        // expected insertions: 10 million rows
        private static final long ROWS = 10_000_000L;

        // desired false positive probability: 0.01%
        private static final double FPP = 0.0001;

        private final OutputStream out;

        private final BloomFilter<String> bf;

        public BloomFilterRecordWriter(OutputStream outputStream) {
            this.out = new BufferedOutputStream(outputStream);
            // use a fixed charset (rather than the platform default) so the
            // serialized filter hashes identically on whichever JVM reads it back
            this.bf = BloomFilter.create(Funnels.stringFunnel(
                    StandardCharsets.UTF_8), ROWS, FPP);
        }

        @Override
        public void write(Text text, NullWritable nullWritable) throws IOException {
            // keys are only collected here; nothing hits the stream until close()
            bf.put(text.toString());
        }

        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            // serialize the filter in Guava's binary format, then flush and close
            bf.writeTo(out);
            out.close();
        }
    }
}
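
To use the format, the job driver only needs to declare it via job.setOutputFormatClass. Below is a minimal driver sketch, assuming line-oriented text input; the LineMapper/DedupReducer classes and the driver name are hypothetical placeholders, not part of the original post:

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class BloomFilterJobDriver {

    // hypothetical mapper: emits every input line as a key
    public static class LineMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        @Override
        protected void map(LongWritable offset, Text line, Context context)
                throws IOException, InterruptedException {
            context.write(line, NullWritable.get());
        }
    }

    // hypothetical reducer: de-duplicates keys before they reach the filter
    public static class DedupReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Context context)
                throws IOException, InterruptedException {
            context.write(key, NullWritable.get());
        }
    }

    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "build-bloom-filter");
        job.setJarByClass(BloomFilterJobDriver.class);
        job.setMapperClass(LineMapper.class);
        job.setReducerClass(DedupReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // plug in the custom output format
        job.setOutputFormatClass(BloomFilterFileOutputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Each reduce task then commits one part-r-NNNNN.model file under the output directory, containing a serialized filter.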
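
On the consuming side, the model file can be deserialized with Guava's BloomFilter.readFrom, provided the same funnel (including the charset) used at write time is supplied. A minimal sketch; the path below is a hypothetical example:

import com.google.common.hash.BloomFilter;
import com.google.common.hash.Funnels;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class BloomFilterModelReader {
    public static void main(String[] args) throws Exception {
        // hypothetical output path of the job above
        Path model = new Path("/output/part-r-00000.model");
        FileSystem fs = model.getFileSystem(new Configuration());
        try (InputStream in = fs.open(model)) {
            // the funnel must match the one used when the filter was written
            BloomFilter<String> bf = BloomFilter.readFrom(
                    in, Funnels.stringFunnel(StandardCharsets.UTF_8));
            System.out.println(bf.mightContain("some-key"));
        }
    }
}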