码迷,mamicode.com
首页 > 其他好文 > 详细

Spark开发_Spark的UDF开发

时间:2020-11-26 15:06:55      阅读:6      评论:0      收藏:0      [点我收藏+]

标签:两种   art   版本   imp   原理   use   nal   default   class   

Spark中的UDF

Spark1.6只能创建临时UDF,不支持创建持久化的UDF。
    从Spark-2.0开始,SparkSQL支持持久化的UDF,目前看来是支持UDAF

Spark中的UDF 过程

(1)自定义UDF类,实现UDF1/2/3....22中的接口之一,其中UDF后跟的数字,
 比如UDF1、UDF2;表示输入参数的个数,1表示有一个入参,2表示有两个入参,
  最多可传入22个输入参数 
  实现 call()方法
  两种方式 : 过匿名函数 和  通过实名函数
(2)注册UDF函数:  SparkSQL UDF 两种方式:udf() 和 register()   	    
  01.
	Spark1.x:  sqlContext.udf.register
	Spark2.x:       spark.udf.register
	   org.apache.spark.sql  SparkSession
	     def udf: UDFRegistration = sessionState.udfRegistration
  02. DataFrame的udf方法在 org.apache.spark.sql.functions 里
  org.apache.spark.sql
        functions.{col, lit, udf}
     spark.sql.function.udf() 方法 此时注册的方法,对外部可见
	   def udf(f: AnyRef, dataType: DataType): UserDefinedFunction = { UserDefinedFunction(f, dataType, None)}
 (3) 目前使用UDAF可以在SQL中,而Spark UDF使用,在Spark 任务中可以执行,在SparkSQL任务中执行报错

示例代码

 import org.apache.spark.sql.api.java.UDF1;
 public class StringLenUDF implements UDF1<String , Integer> {
 @Override
 public Integer call(String  value) throws Exception {
         Integer data = value.length();
         return data;
         }
 }
(3)在Spark中使用
        spark.udf().register("trans_len",new StringLenUDF(),DataTypes.IntegerType);
        String sqlTex = "select date,trans_len(date ) mem_distinct_cnt " +
                "from mem_data_table ";
        spark.sql(sqlTex).show();

(3) SQL 中使用UDF 
打包:
  ADD  jar sparkudf.jar;
 CREATE  TEMPORARY FUNCTION  trans_len AS  ‘com.test.txt.udf.StringLenUDF‘;
 select t1.data,trans_len(t1.data) as uid_bitmap_byte
 from (
 select 100 as user_id,  ‘2020‘ as data
 union all
 select 200  as user_id, ‘2019‘ as data)  t1
注释:
 情景: Spark SQL 在SQL中使用 Spark的UDF报错
 报错: Error in query: No handler for UDF/UDAF/UDTF ‘com.test.txt.udf.LenCountUDF‘; line 3 pos 15
 原因:
      SessionCatalog calls registerFunction to add a function to function registry. 
	  However, makeFunctionExpression supports only UserDefinedAggregateFunction.
 参考:
    Default SessionCatalog should support UDFs https://issues.apache.org/jira/browse/SPARK-25334
   Hive中Binary类型于0.8版本以上开始支持。
   https://stackoverflow.com/questions/52164488/spark-hive-udf-no-handler-for-udaf-analysis-exception

底层原理

报错的 org.apache.spark.sql.hive.HiveSessionCatalog
    // Construct a [[FunctionBuilder]] based on the provided class that represents a function.
   private def makeFunctionBuilder
    throw new AnalysisException(s"No handler for Hive UDF ‘${clazz.getCanonicalName}‘")
org.apache.spark.sql.catalyst.catalog.SessionCatalog
    SessionCatalog calls registerFunction to add a function to function registry.
  
    //Registers a temporary or permanent function into a session-specific [[FunctionRegistry]]
      def registerFunction(
    //Drop a temporary function.
       def dropTempFunction
介绍:
  Spark SQL 中的 Catalog 体系实现以 SessionCatalog 为主体,通过 SparkSession (Spark 程序入口)提供给外部调用。
  一般一个 SparkSession 对应一个 SessionCatalog。 
  本质上, Session Catalog 起到了一个代理的作用,对底层的元数据信息、临时表信息、视图信息和函数 信息进行了封装。
  HiveSessionCatalog 继承了Spark的默认SessionCatalo
 /*** A catalog for looking up user defined functions, used by an [[Analyzer]].*/
   trait FunctionRegistry {

参考:

 SparkSession解析SessionCatalog、SharedState和SessionState https://blog.csdn.net/qq_41775852/article/details/105157879

Spark开发_Spark的UDF开发

标签:两种   art   版本   imp   原理   use   nal   default   class   

原文地址:https://www.cnblogs.com/ytwang/p/14023589.html

(0)
(0)
   
举报
评论 一句话评论(0
登录后才能评论!
© 2014 mamicode.com 版权所有  联系我们:gaon5@hotmail.com
迷上了代码!