数据、例子,修改自:《Hive with Python example》
首先来看一下数据:
hive> select * from test; OK 1 3 2 2 3 1
假设,我们要输出每一列的md5值。在目前的hive中是没有这个udf的。
我们看一下Python的代码:
#!/home/tops/bin/python import sys import hashlib for line in sys.stdin: line = line.strip() arr = line.split() md5_arr = [] for a in arr: md5_arr.append(hashlib.md5(a).hexdigest()) print "\t".join(md5_arr)
在Hive中,使用脚本,首先要将他们加入:
add file /xxxx/test.py
然后,在调用时,使用TRANSFORM语法。
SELECT TRANSFORM (col1, col2) USING './test.py' AS (new1, new2) FORM test;
这里,我们使用了AS,指定输出的若干个列,分别对应到哪个列名。如果省略这句,则Hive会将第1个tab前的结果作为key,后面其余作为value。
这里有一个小坑:有时候,我们结合INSERT OVERWRITE使用上述TRANSFORM,而目标表,其分割副可能不是\t。但是请牢记:TRANSFORM的分割符号,传入、传出脚本的,永远是\t。不要考虑外面其他的分割符号!
最后,解释一下MAP、REDUCE。
在有的Hive语句中,大家可能会看到SELECT MAP (...) USING 'xx.py'这样的语法。
然而,在Hive中,MAP、REDUCE只不过是TRANSFORM的别名,Hive不保证一定会在map/reduce中调用脚本。看看官方文档是怎么说的:
Formally, MAP ... and REDUCE ... are syntactic transformations of SELECT TRANSFORM ( ... ). In other words, they serve as comments or notes to the reader of the query. BEWARE: Use of these keywords may be dangerous as (e.g.) typing "REDUCE" does not force a reduce phase to occur and typing "MAP" does not force a new map phase!
所以、混用map reduce语法关键字,甚至会引起混淆,所以建议大家还是都用TRANSFORM吧。
友情提示:如果脚本不是Python,而是awk、sed等系统内置命令,可以直接使用,而不用add file。
2014.03.04更新:
如果表中有MAP,ARRAY等复杂类型,怎么用TRANSFORM生成?
例如:
CREATE TABLE features ( id BIGINT, norm_features MAP<STRING, FLOAT> );
答案是,要在脚本的输出中,对特殊字段按照HDFS文件中的格式输出即可。
例如,以上面的表结构为例,每行输出应为:
1^Ifeature1^C1.0^Bfeature2^C2.0
其中^I是tab键,这是TRANSFORM要求的分割符号。^B和^C是Hive存储时MAP类型的KV分割符。
另外,在Hive的TRANSFORM语句的时候,要注意AS中加上类型声明:
SELECT TRANSFORM(stuff) USING 'script' AS (thing1 INT, thing2 MAP<STRING, FLOAT>)
SELECT
TRANSFORM (col1, col2)
USING './test.py'
AS (new1, new2)
FORM
test;
FORM-->FROM
#!/usr/bin/python
import sys
import hashlib
for line in sys.stdin:
line = line.strip()
arr = line.split()
md5_arr = []
for a in arr:
md5_arr.append(a)
md5_arr.append(' ')
md5_arr.append(hashlib.md5(a).hexdigest())
print('\t'.join(md5_arr))
# hive>add file md5.py
# select transform(id,name) using './md5.py' from t_task;