如何使用JAVA在Spark DataFrame上调用UDF?

这里类似的问题,但没有足够的点来评论那里。

根据最新的Spark文档,可以以两种不同的方式使用,一种用于SQL,另一种用于DataFrame。我发现了多个如何使用与sql一起使用的例子,但一直找不到任何关于如何直接在DataFrame上使用的例子。udfudfudf

o.p.在上面链接的问题上提供的解决方案使用根据Spark Java API文档在Spark 2.0中已经并将被删除的解决方案。在那里,它说:__callUDF()___deprecated_

“因为它与 udf() 是冗余的”

所以这意味着我应该能够用它来计算我的,但我不知道该怎么做。我没有偶然发现任何阐明Java-Spark程序语法的东西。我错过了什么?__udf()__udf

import org.apache.spark.sql.api.java.UDF1;
.
.    
UDF1 mode = new UDF1<String[], String>() {
    public String call(final String[] types) throws Exception {
        return types[0];
    }
};

sqlContext.udf().register("mode", mode, DataTypes.StringType);
df.???????? how do I call my udf (mode) on a given column of my DataFrame df?

答案 1

火花>= 2.3

Scala风格可以直接调用:udf

import static org.apache.spark.sql.functions.*;
import org.apache.spark.sql.expressions.UserDefinedFunction;

UserDefinedFunction mode = udf(
  (Seq<String> ss) -> ss.headOption(), DataTypes.StringType
);

df.select(mode.apply(col("vs"))).show();

火花< 2.3

即使我们假设您的UDF是有用的,并且不能被简单的调用替换,它也具有不正确的签名。数组列是使用 Scala 而不是普通的 Java 数组公开的,因此您必须调整签名:getItemWrappedArray

UDF1 mode = new UDF1<Seq<String>, String>() {
  public String call(final Seq<String> types) throws Exception {
    return types.headOption();
  }
};

如果 UDF 已注册:

sqlContext.udf().register("mode", mode, DataTypes.StringType);

你可以简单地使用callUDF(这是1.5中引入的一个新功能)来调用它的名字:

df.select(callUDF("mode", col("vs"))).show();

您也可以在以下位置使用它:selectExprs

df.selectExpr("mode(vs)").show();

答案 2

推荐