Preface
By default, Hive uses TextInputFormat, where each line is one record. Within each record (a single line), fields are separated by the ^A character (Ctrl-A, i.e. \001) by default.
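As a quick illustration (a hypothetical table, not part of the original setup), a table declared without any ROW FORMAT clause uses exactly this default layout:

-- Hypothetical example: a table relying on Hive's default text layout.
-- This is equivalent to declaring ROW FORMAT DELIMITED FIELDS TERMINATED BY '\001'.
CREATE TABLE plain_demo (
  id   BIGINT,
  name STRING
);
-- A raw record in the backing HDFS file then looks like (^A shown literally):
-- 1^Aa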
Sometimes, however, we have to deal with multi-line, structured documents and need to import them into Hive for processing. In that case we have to write our own InputFormat, OutputFormat, and SerDe.
First, let's clarify how these three relate to each other, quoting the Hive documentation directly:
SerDe is a short name for "Serializer and Deserializer."
Hive uses SerDe (and FileFormat) to read and write table rows.
HDFS files --> InputFileFormat --> <key, value> --> Deserializer --> Row object
Row object --> Serializer --> <key, value> --> OutputFileFormat --> HDFS files
To summarize, when Hive reads a file on HDFS, it proceeds as follows:
(1) It calls the InputFormat to split the file into documents, where each document is one row (Row).
(2) It calls the SerDe's Deserializer to split a row into its fields.
When Hive executes an INSERT and writes rows to a file, it mainly calls the OutputFormat and the SerDe's Serializer, in the reverse order of the read path.
In this article we will write a custom InputFormat, OutputFormat, and SerDe so that Hive can work with the following custom document format:
<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>
As shown above, each document is delimited by <DOC> and </DOC>, and every line inside a document has the form key=value.
1. Custom InputFormat
Hive's InputFormat comes from the corresponding Hadoop class. Note that it uses the old mapred API.
package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;

public class DocFileInputFormat extends TextInputFormat implements JobConfigurable {

    @Override
    public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
            JobConf job, Reporter reporter) throws IOException {
        reporter.setStatus(split.toString());
        return new DocRecordReader(job, (FileSplit) split);
    }
}
In this implementation we skip details such as compression and decompression; if you need them, you can refer to Hadoop's own implementations.
The InputFormat above simply implements the interface; the business logic that splits the input into documents lives in DocRecordReader.
package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.LineRecordReader;
import org.apache.hadoop.mapred.RecordReader;

public class DocRecordReader implements RecordReader<LongWritable, Text> {

    // Underlying line reader
    private LineRecordReader reader;
    // The current line number and line, reused across calls
    private LongWritable lineKey = null;
    private Text lineValue = null;
    // Doc related
    private StringBuilder sb = new StringBuilder();
    private boolean inDoc = false;
    private final String DOC_START = "<DOC>";
    private final String DOC_END = "</DOC>";

    public DocRecordReader(JobConf job, FileSplit split) throws IOException {
        reader = new LineRecordReader(job, split);
        lineKey = reader.createKey();
        lineValue = reader.createValue();
    }

    @Override
    public void close() throws IOException {
        reader.close();
    }

    @Override
    public boolean next(LongWritable key, Text value) throws IOException {
        while (true) {
            // get current line
            if (!reader.next(lineKey, lineValue)) {
                break;
            }
            if (!inDoc) {
                // not in a doc yet: check for <DOC>
                if (lineValue.toString().startsWith(DOC_START)) {
                    // entering a doc
                    inDoc = true;
                    // clear the buffer
                    sb.delete(0, sb.length());
                }
            } else {
                // inside a doc: check for </DOC>
                if (lineValue.toString().startsWith(DOC_END)) {
                    // leaving the doc
                    inDoc = false;
                    // set key/value and return
                    key.set(key.get() + 1);
                    value.set(sb.toString());
                    return true;
                } else {
                    if (sb.length() != 0) {
                        sb.append("\n");
                    }
                    sb.append(lineValue.toString());
                }
            }
        }
        return false;
    }

    @Override
    public float getProgress() throws IOException {
        return reader.getProgress();
    }

    @Override
    public LongWritable createKey() {
        return new LongWritable(0);
    }

    @Override
    public Text createValue() {
        return new Text("");
    }

    @Override
    public long getPos() throws IOException {
        return reader.getPos();
    }
}
The code above uses a LineRecordReader to read the split line by line. To save memory, lineKey and lineValue are reused between calls.
2. Custom OutputFormat
The OutputFormat is responsible for writing. Note that here we can no longer just copy Hadoop's interface; we have to implement HiveOutputFormat.
package com.coder4.hive;

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.Progressable;

@SuppressWarnings({ "rawtypes" })
public class DocFileOutputFormat<K extends WritableComparable, V extends Writable>
        extends TextOutputFormat<K, V> implements HiveOutputFormat<K, V> {

    public RecordWriter getHiveRecordWriter(JobConf job, Path outPath,
            Class<? extends Writable> valueClass, boolean isCompressed,
            Properties tableProperties, Progressable progress)
            throws IOException {
        FileSystem fs = outPath.getFileSystem(job);
        FSDataOutputStream out = fs.create(outPath);
        return new DocRecordWriter(out);
    }
}
Similarly, the business logic lives in the following RecordWriter:
package com.coder4.hive;

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.io.Writable;

public class DocRecordWriter implements RecordWriter {

    private FSDataOutputStream out;
    private final String DOC_START = "<DOC>";
    private final String DOC_END = "</DOC>";

    public DocRecordWriter(FSDataOutputStream o) {
        this.out = o;
    }

    @Override
    public void close(boolean abort) throws IOException {
        out.flush();
        out.close();
    }

    @Override
    public void write(Writable wr) throws IOException {
        write(DOC_START);
        write("\n");
        write(wr.toString());
        write("\n");
        write(DOC_END);
        write("\n");
    }

    private void write(String str) throws IOException {
        // Use the byte array length, not the string length,
        // so multi-byte characters are not truncated.
        byte[] bytes = str.getBytes();
        out.write(bytes, 0, bytes.length);
    }
}
3. Custom SerDe or UDF?
After customizing the InputFormat and OutputFormat, we have already split each input split into multiple rows (documents).
Next, we need to split each row into fields. At this point we have two options:
(1) Write a UDF that splits a row into key-value pairs and returns them as a Map<K, V>. The table then only needs a single STRING column.
(2) Implement a SerDe that turns a row directly into the table's columns.
Let's look at the UDF approach first. It works well when the field names are not fixed (or change frequently), for example when parsing JSON.
package com.coder4.hive;

import java.util.Map;

import org.apache.hadoop.hive.ql.exec.UDF;

public class DocToMap extends UDF {
    public Map<String, String> evaluate(String s) {
        return Doc.deserialize(s);
    }
}
Here Doc.deserialize is just a helper method of our own; it does not need to override anything or implement any interface. A minimal sketch of such a helper is shown below.
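Since the original post never shows the Doc class, the following is only a hypothetical sketch, assuming each document body contains one key=value pair per line as described above:

package com.coder4.hive;

import java.util.HashMap;
import java.util.Map;

// Hypothetical helper class (not shown in the original post):
// splits a <DOC> body, one key=value pair per line, into a map.
public class Doc {
    public static Map<String, String> deserialize(String doc) {
        Map<String, String> kv = new HashMap<String, String>();
        if (doc == null) {
            return kv;
        }
        for (String line : doc.split("\n")) {
            String[] pair = line.split("=", 2);
            if (pair.length == 2) {
                kv.put(pair[0], pair[1]);
            }
        }
        return kv;
    }
}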
The table and query are then used as follows:
CREATE EXTERNAL TABLE IF NOT EXISTS test_table (
  doc STRING
)
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/';

add jar /xxxxxxxx/hive-test.jar;

CREATE TEMPORARY FUNCTION doc_to_map AS 'com.coder4.hive.DocToMap';

SELECT
  raw['id'],
  raw['name']
FROM (
  SELECT doc_to_map(doc) raw
  FROM test_table
) t;
4. Custom SerDe
If we choose to implement a SerDe ourselves, it takes a bit more work.
The main references here are a blog post and the official source code:
http://svn.apache.org/repos/asf/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroSerDe.java
http://blog.cloudera.com/blog/2012/12/how-to-use-a-serde-in-apache-hive/
package com.coder4.hive;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde.Constants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MySerDe extends AbstractSerDe {

    // params
    private List<String> columnNames = null;
    private List<TypeInfo> columnTypes = null;
    private ObjectInspector objectInspector = null;
    // separators
    private String nullString = null;
    private String lineSep = null;
    private String kvSep = null;

    @Override
    public void initialize(Configuration conf, Properties tbl)
            throws SerDeException {
        // Read separators
        lineSep = "\n";
        kvSep = "=";
        nullString = tbl.getProperty(Constants.SERIALIZATION_NULL_FORMAT, "");

        // Read column names
        String columnNameProp = tbl.getProperty(Constants.LIST_COLUMNS);
        if (columnNameProp != null && columnNameProp.length() > 0) {
            columnNames = Arrays.asList(columnNameProp.split(","));
        } else {
            columnNames = new ArrayList<String>();
        }

        // Read column types
        String columnTypeProp = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
        // default: all string
        if (columnTypeProp == null) {
            String[] types = new String[columnNames.size()];
            Arrays.fill(types, 0, types.length, Constants.STRING_TYPE_NAME);
            columnTypeProp = StringUtils.join(types, ":");
        }
        columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProp);

        // Check that names and types match up
        if (columnTypes.size() != columnNames.size()) {
            throw new SerDeException("len(columnNames) != len(columnTypes)");
        }

        // Create ObjectInspectors from the type information for each column
        List<ObjectInspector> columnOIs = new ArrayList<ObjectInspector>();
        ObjectInspector oi;
        for (int c = 0; c < columnNames.size(); c++) {
            oi = TypeInfoUtils
                    .getStandardJavaObjectInspectorFromTypeInfo(columnTypes
                            .get(c));
            columnOIs.add(oi);
        }
        objectInspector = ObjectInspectorFactory
                .getStandardStructObjectInspector(columnNames, columnOIs);
    }

    @Override
    public Object deserialize(Writable wr) throws SerDeException {
        // Split into key-value pairs
        if (wr == null)
            return null;
        Map<String, String> kvMap = new HashMap<String, String>();
        Text text = (Text) wr;
        for (String kv : text.toString().split(lineSep)) {
            String[] pair = kv.split(kvSep);
            if (pair.length == 2) {
                kvMap.put(pair[0], pair[1]);
            }
        }

        // Fill the row according to the column names and types
        ArrayList<Object> row = new ArrayList<Object>();
        String colName = null;
        TypeInfo type_info = null;
        Object obj = null;
        for (int i = 0; i < columnNames.size(); i++) {
            colName = columnNames.get(i);
            type_info = columnTypes.get(i);
            obj = null;
            if (type_info.getCategory() == ObjectInspector.Category.PRIMITIVE) {
                PrimitiveTypeInfo p_type_info = (PrimitiveTypeInfo) type_info;
                switch (p_type_info.getPrimitiveCategory()) {
                case STRING:
                    obj = StringUtils.defaultString(kvMap.get(colName), "");
                    break;
                case LONG:
                    try {
                        obj = Long.parseLong(kvMap.get(colName));
                    } catch (Exception e) {
                        // leave as null on parse failure
                    }
                    break;
                case INT:
                    try {
                        // an INT column expects an Integer, not a Long
                        obj = Integer.parseInt(kvMap.get(colName));
                    } catch (Exception e) {
                        // leave as null on parse failure
                    }
                    break;
                default:
                    break;
                }
            }
            row.add(obj);
        }
        return row;
    }

    @Override
    public ObjectInspector getObjectInspector() throws SerDeException {
        return objectInspector;
    }

    @Override
    public SerDeStats getSerDeStats() {
        // Not supported yet
        return null;
    }

    @Override
    public Class<? extends Writable> getSerializedClass() {
        return Text.class;
    }

    @Override
    public Writable serialize(Object arg0, ObjectInspector arg1)
            throws SerDeException {
        // Not supported yet
        return null;
    }
}
The final Hive table definition is:
add jar /xxxxxxxx/hive-test.jar;

CREATE EXTERNAL TABLE IF NOT EXISTS test_table (
  id BIGINT,
  name STRING
)
ROW FORMAT SERDE 'com.coder4.hive.MySerDe'
STORED AS
  INPUTFORMAT 'com.coder4.hive.DocFileInputFormat'
  OUTPUTFORMAT 'com.coder4.hive.DocFileOutputFormat'
LOCATION '/user/heyuan.lhy/doc/';
Our custom SerDe splits each <DOC> document into k=v pairs; when a key is named id or name, its value is placed into the corresponding column.
5. Testing and results
First, we placed a file under the HDFS directory /user/heyuan.lhy/doc/ with the following content:
<DOC>
id=1
name=a
</DOC>
<DOC>
id=2
name=b
</DOC>
<DOC>
id=3
name=c
</DOC>
<DOC>
id=4
name=d
</DOC>
With the table schema defined as in section 4, we run a SELECT:
SELECT * FROM test_table;
OK
1	a
2	b
3	c
4	d
As you can see, the id and name columns are parsed out correctly.
Because our SerDe does not implement the serialize method, it cannot be used for writing.
If writing is needed, you can fall back to the UDF + Map approach described earlier, or fill in serialize yourself; a rough sketch is given below.
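The original post leaves serialize unimplemented, so the following is only a minimal sketch of what it might look like, assuming the output row should mirror the key=value lines consumed by deserialize (null fields are skipped and values are converted with toString):

// Hypothetical sketch of a serialize() that could replace the stub in MySerDe.
// Requires two extra imports:
//   import org.apache.hadoop.hive.serde2.objectinspector.StructField;
//   import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
@Override
public Writable serialize(Object obj, ObjectInspector objInspector)
        throws SerDeException {
    if (objInspector.getCategory() != ObjectInspector.Category.STRUCT) {
        throw new SerDeException("Expected a STRUCT object inspector");
    }
    StructObjectInspector soi = (StructObjectInspector) objInspector;
    List<? extends StructField> fields = soi.getAllStructFieldRefs();

    StringBuilder sb = new StringBuilder();
    for (StructField field : fields) {
        Object fieldData = soi.getStructFieldData(obj, field);
        if (fieldData == null) {
            // skip null fields entirely
            continue;
        }
        if (sb.length() > 0) {
            sb.append(lineSep);
        }
        sb.append(field.getFieldName()).append(kvSep)
          .append(fieldData.toString());
    }
    // DocRecordWriter wraps this text in <DOC> ... </DOC> when writing.
    return new Text(sb.toString());
}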
Thanks for reading.