需求:让 Reduce 阶段以自定义的特殊格式输出结果
例如:把 Reducer 的输出结果写入 Guava 的 BloomFilter 中
import com.google.common.hash.BloomFilter; import com.google.common.hash.Funnels; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import java.io.BufferedOutputStream; import java.io.IOException; import java.io.OutputStream; import java.nio.charset.Charset; public class BloomFilterFileOutputFormat extends FileOutputFormat<Text, NullWritable> { private static final String FILE_NAME = ".model"; @Override public RecordWriter<Text, NullWritable> getRecordWriter( TaskAttemptContext job) throws IOException, InterruptedException { Path file = getDefaultWorkFile(job, FILE_NAME); FileSystem fs = file.getFileSystem(job.getConfiguration()); FSDataOutputStream fileOut = fs.create(file, false); return new BloomFilterRecordWriter(fileOut); } // writer class public static class BloomFilterRecordWriter extends RecordWriter<Text, NullWritable> { // max rows 1000w private static final long ROWS = 1000_0000L; // fpp 0.01% private static final double FPP = 0.0001; private OutputStream out; private BloomFilter<String> bf; public BloomFilterRecordWriter(OutputStream outputStream) { this.out = new BufferedOutputStream(outputStream); this.bf = BloomFilter.create(Funnels.stringFunnel( Charset.defaultCharset()), ROWS, FPP); } @Override public void write(Text text, NullWritable nullWritable) throws IOException { bf.put(text.toString()); } @Override public void close(TaskAttemptContext context) throws IOException, InterruptedException { bf.writeTo(out); out.close(); } } }