杰瑞科技汇

spark python udf 函数

UDF 允许用户使用自己编写的 Python 函数来处理 DataFrame 中的列数据,这在 Spark 提供的内置函数无法满足复杂业务逻辑时非常有用。

spark python udf 函数-图1
(图片来源网络,侵删)

下面我将从以下几个方面进行深入讲解:

  1. 什么是 UDF?
  2. 为什么以及何时使用 UDF?
  3. 如何创建和使用 UDF (完整示例)
  4. 重要注意事项与性能优化
  5. Pandas UDF (Vectorized UDF) - 更高效的替代方案

什么是 UDF?

UDF,即用户自定义函数,是 Spark 提供的一种机制,允许用户将 Python 函数应用于 DataFrame 的一列或多列数据。

你可以写一个普通的 Python 函数,然后通过 pyspark.sql.functions.udf 将其“注册”为一个 Spark UDF,之后,你就可以像使用 Spark 内置函数(如 col(), length())一样,在 select(), withColumn(), filter() 等操作中使用这个 UDF。

核心工作流程:

spark python udf 函数-图2
(图片来源网络,侵删)
  1. 定义 Python 函数:编写一个接受一个或多个参数并返回一个值的普通 Python 函数。
  2. 注册 UDF:使用 udf() 函数将你的 Python 函数包装成一个 Spark UDF,这个过程会返回一个 Column 对象。
  3. 应用 UDF:在 DataFrame 的转换操作(如 withColumn)中应用这个 UDF。

为什么以及何时使用 UDF?

使用场景:

当 Spark 内置函数(在 pyspark.sql.functions 中)无法满足你的数据处理需求时。

  • 复杂的字符串处理:如根据特定正则表达式提取并转换信息。
  • 自定义业务逻辑:如根据多个列的值计算一个复杂的业务评分。
  • 调用外部库:如使用 re, datetime, json 等标准库或第三方库进行计算。
  • 数据清洗和转换:如将非标准格式的日期字符串转换为标准格式。

为什么不总是使用 UDF?

UDDF 的主要缺点是 性能,因为 UDF 会在 Python 解释器中执行,而 Spark 是基于 JVM 的,这意味着 Spark 和 Python 之间会有大量的数据序列化和反序列化开销(将 JVM 的数据转换为 Python 对象,反之亦然),这被称为 "Python UDF 的性能瓶颈"

spark python udf 函数-图3
(图片来源网络,侵删)

应优先使用 Spark 内置函数,只有在万不得已时才使用 UDF。


如何创建和使用 UDF (完整示例)

让我们通过一个完整的例子来学习 UDF 的使用。

场景:我们有一个包含用户姓名的 DataFrame,我们想根据姓名的长度给用户打一个标签。

步骤 1: 创建 SparkSession 和 DataFrame

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
# 1. 创建 SparkSession
spark = SparkSession.builder \
    .appName("Python UDF Example") \
    .getOrCreate()
# 2. 创建示例数据
data = [("Alice",), ("Bob",), ("Charlie",), ("David",)]
df = spark.createDataFrame(data, ["name"])
df.show()
+--------+
|    name|
+--------+
|   Alice|
|     Bob|
| Charlie|
|   David|
+--------+

步骤 2: 定义 Python 函数

这是一个普通的 Python 函数,它接受一个字符串 name 作为输入,并返回一个字符串标签。

def get_name_label(name):
    """根据姓名长度返回标签"""
    if len(name) > 6:
        return "long_name"
    elif len(name) > 3:
        return "medium_name"
    else:
        return "short_name"

步骤 3: 注册 UDF

使用 pyspark.sql.functions.udf 来注册函数,这里我们指定了 UDF 的返回类型为 StringType(),这是一个好习惯,可以帮助 Spark 优化。

# 注册 Python 函数为 Spark UDF
# 第一个参数是 Python 函数
# 第二个参数是返回值的类型 (可选,但推荐)
name_label_udf = udf(get_name_label, StringType())

步骤 4: 在 DataFrame 中应用 UDF

我们可以像使用内置函数一样使用 name_label_udf,最常用的方式是在 withColumn 中创建一个新列。

# 使用 withColumn 应用 UDF 来创建新列
df_with_label = df.withColumn("name_label", name_label_udf(col("name")))
df_with_label.show()
+--------+----------+
|    name|name_label|
+--------+----------+
|   Alice|short_name|
|     Bob|short_name|
| Charlie| long_name|
|   David|medium_name|
+--------+----------+

代码总结

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
# 1. 初始化
spark = SparkSession.builder.appName("UDFExample").getOrCreate()
# 2. 创建DF
data = [("Alice",), ("Bob",), ("Charlie",), ("David",)]
df = spark.createDataFrame(data, ["name"])
# 3. 定义Python函数
def get_name_label(name):
    if len(name) > 6:
        return "long_name"
    elif len(name) > 3:
        return "medium_name"
    else:
        return "short_name"
# 4. 注册UDF
name_label_udf = udf(get_name_label, StringType())
# 5. 应用UDF
df_with_label = df.withColumn("name_label", name_label_udf(col("name")))
df_with_label.show(truncate=False)
spark.stop()

重要注意事项与性能优化

1 UDF 的性能问题

如前所述,Python UDF 性能不佳,为了理解原因,你需要了解 Arrow

  • Arrow:一个跨语言的内存格式,它像一座桥梁,让数据可以在 JVM (Spark) 和 Python 进程之间高效传递,而无需进行繁琐的序列化/反序列化。

2 如何启用 Arrow 来提升 UDF 性能?

从 Spark 2.3 开始,Spark 引入了 Pandas UDF (或称为 Vectorized UDF),它利用 Arrow 来批量处理数据,性能比传统 UDF 提升了几个数量级。

即使使用传统 UDF,启用 Arrow 也能带来一些性能提升。

启用 Arrow 的方法:

在创建 SparkSession 时设置 spark.sql.execution.arrow.pyspark.enabledTrue

spark = SparkSession.builder \
    .appName("Arrow Enabled UDF") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

注意:启用 Arrow 后,Arrow 不可用或数据格式不兼容,Spark 会自动回退到标准的序列化方式,不会导致任务失败。

3 缓存 UDF 结果

如果同一个 UDF 会被多次使用,可以将其结果缓存起来。

from pyspark.sql.functions import udf
from functools import lru_cache
@lru_cache(maxsize=None) # Python 内置的缓存装饰器
def cached_get_name_label(name):
    # ... 函数逻辑 ...
    pass
# 然后将这个被缓存的函数注册为 UDF
cached_udf = udf(cached_get_name_label, StringType())

这在函数计算成本高且输入值重复率高时非常有效。


Pandas UDF (Vectorized UDF) - 更高效的替代方案

对于追求极致性能的场景,强烈推荐使用 Pandas UDF

核心思想: Pandas UDF 一次处理一整列数据(一个 Pandas Series),而不是像传统 UDF 那样一次只处理一行,这种“向量化”操作极大地减少了 Python 解释器和 JVM 之间的通信开销,从而大幅提升性能。

如何定义 Pandas UDF?

你需要使用 pyspark.sql.functions.pandas_udf 装饰器,并且函数的输入和输出类型通常是 pandas.Series

示例: 使用 Pandas UDF 实现和上面完全相同的功能。

from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
# 使用 @pandas_udf 装饰器
# 指定函数签名: 输入类型 -> 输出类型
# type="string" 表示输入和输出都是字符串类型的 Pandas Series
@pandas_udf('string')
def pandas_get_name_label(names: pd.Series) -> pd.Series:
    """
    这是一个 Pandas UDF,它接收一个 Pandas Series,返回一个 Pandas Series。
    """
    def label_func(name):
        if len(name) > 6:
            return "long_name"
        elif len(name) > 3:
            return "medium_name"
        else:
            return "short_name"
    # Pandas 的 apply 方法会逐个元素应用函数
    return names.apply(label_func)
# 应用 Pandas UDF
df_with_pandas_label = df.withColumn("pandas_name_label", pandas_get_name_label(col("name")))
df_with_pandas_label.show()
+--------+------------------+
|    name|pandas_name_label|
+--------+------------------+
|   Alice|         short_name|
|     Bob|         short_name|
| Charlie|          long_name|
|   David|        medium_name|
+--------+------------------+

Pandas UDF vs. 传统 UDF

特性 传统 UDF Pandas UDF (Vectorized UDF)
处理单元 单行数据 整列数据 (Pandas Series)
性能 较慢 (Python 解释器开销大) 非常快 (利用 Arrow,批量处理)
Python 函数 普通函数,输入/输出为标量 使用 @pandas_udf 装饰器,输入/输出为 pandas.Series
依赖 仅需 Python 环境 需要 pandas 库和 pyarrow
适用场景 逻辑极其复杂,无法用 Pandas 实现时 绝大多数情况下的首选,尤其是性能敏感型任务

类型 适用场景 性能 推荐度
Spark 内置函数 简单、常见的操作 (如 col, sum, when) 最高 ⭐⭐⭐⭐⭐ 首选
Pandas UDF 复杂的 Python 逻辑,需要高性能 很高 (利用 Arrow) ⭐⭐⭐⭐⭐ 强烈推荐
传统 Python UDF 极其复杂的逻辑,且无法用 Pandas 实现 较低 ⭐⭐ 尽量避免

最佳实践建议:

  1. 优先使用 Spark 内置函数:始终先检查 pyspark.sql.functions 是否有你需要的功能。
  2. 其次考虑 Pandas UDF:如果内置函数不够用,并且你对性能有要求,请使用 Pandas UDF。
  3. 最后才考虑传统 UDF:仅在 Pandas UDF 无法实现(依赖某些无法在 Pandas Series 上操作的库)且性能要求不高的场景下,才使用传统 Python UDF,并务必启用 Arrow。
  4. 为 UDF 指定返回类型:在注册 UDF 时,明确指定返回类型(如 StringType(), IntegerType()),有助于 Spark 进行优化。
  5. 注意空值处理:UDF 中的 Python 函数需要能够处理 Spark 中的 None 值,否则可能会抛出异常,你可以在函数内部添加 if name is None: return None 这样的判断。
分享:
扫描分享到社交APP
上一篇
下一篇