UDF 允许用户使用自己编写的 Python 函数来处理 DataFrame 中的列数据,这在 Spark 提供的内置函数无法满足复杂业务逻辑时非常有用。

下面我将从以下几个方面进行深入讲解:
- 什么是 UDF?
- 为什么以及何时使用 UDF?
- 如何创建和使用 UDF (完整示例)
- 重要注意事项与性能优化
- Pandas UDF (Vectorized UDF) - 更高效的替代方案
什么是 UDF?
UDF,即用户自定义函数,是 Spark 提供的一种机制,允许用户将 Python 函数应用于 DataFrame 的一列或多列数据。
你可以写一个普通的 Python 函数,然后通过 pyspark.sql.functions.udf 将其“注册”为一个 Spark UDF,之后,你就可以像使用 Spark 内置函数(如 col(), length())一样,在 select(), withColumn(), filter() 等操作中使用这个 UDF。
核心工作流程:

- 定义 Python 函数:编写一个接受一个或多个参数并返回一个值的普通 Python 函数。
- 注册 UDF:使用
udf()函数将你的 Python 函数包装成一个 Spark UDF,这个过程会返回一个Column对象。 - 应用 UDF:在 DataFrame 的转换操作(如
withColumn)中应用这个 UDF。
为什么以及何时使用 UDF?
使用场景:
当 Spark 内置函数(在 pyspark.sql.functions 中)无法满足你的数据处理需求时。
- 复杂的字符串处理:如根据特定正则表达式提取并转换信息。
- 自定义业务逻辑:如根据多个列的值计算一个复杂的业务评分。
- 调用外部库:如使用
re,datetime,json等标准库或第三方库进行计算。 - 数据清洗和转换:如将非标准格式的日期字符串转换为标准格式。
为什么不总是使用 UDF?
UDDF 的主要缺点是 性能,因为 UDF 会在 Python 解释器中执行,而 Spark 是基于 JVM 的,这意味着 Spark 和 Python 之间会有大量的数据序列化和反序列化开销(将 JVM 的数据转换为 Python 对象,反之亦然),这被称为 "Python UDF 的性能瓶颈"。

应优先使用 Spark 内置函数,只有在万不得已时才使用 UDF。
如何创建和使用 UDF (完整示例)
让我们通过一个完整的例子来学习 UDF 的使用。
场景:我们有一个包含用户姓名的 DataFrame,我们想根据姓名的长度给用户打一个标签。
步骤 1: 创建 SparkSession 和 DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
# 1. 创建 SparkSession
spark = SparkSession.builder \
.appName("Python UDF Example") \
.getOrCreate()
# 2. 创建示例数据
data = [("Alice",), ("Bob",), ("Charlie",), ("David",)]
df = spark.createDataFrame(data, ["name"])
df.show()
+--------+
| name|
+--------+
| Alice|
| Bob|
| Charlie|
| David|
+--------+
步骤 2: 定义 Python 函数
这是一个普通的 Python 函数,它接受一个字符串 name 作为输入,并返回一个字符串标签。
def get_name_label(name):
"""根据姓名长度返回标签"""
if len(name) > 6:
return "long_name"
elif len(name) > 3:
return "medium_name"
else:
return "short_name"
步骤 3: 注册 UDF
使用 pyspark.sql.functions.udf 来注册函数,这里我们指定了 UDF 的返回类型为 StringType(),这是一个好习惯,可以帮助 Spark 优化。
# 注册 Python 函数为 Spark UDF # 第一个参数是 Python 函数 # 第二个参数是返回值的类型 (可选,但推荐) name_label_udf = udf(get_name_label, StringType())
步骤 4: 在 DataFrame 中应用 UDF
我们可以像使用内置函数一样使用 name_label_udf,最常用的方式是在 withColumn 中创建一个新列。
# 使用 withColumn 应用 UDF 来创建新列
df_with_label = df.withColumn("name_label", name_label_udf(col("name")))
df_with_label.show()
+--------+----------+
| name|name_label|
+--------+----------+
| Alice|short_name|
| Bob|short_name|
| Charlie| long_name|
| David|medium_name|
+--------+----------+
代码总结
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType
# 1. 初始化
spark = SparkSession.builder.appName("UDFExample").getOrCreate()
# 2. 创建DF
data = [("Alice",), ("Bob",), ("Charlie",), ("David",)]
df = spark.createDataFrame(data, ["name"])
# 3. 定义Python函数
def get_name_label(name):
if len(name) > 6:
return "long_name"
elif len(name) > 3:
return "medium_name"
else:
return "short_name"
# 4. 注册UDF
name_label_udf = udf(get_name_label, StringType())
# 5. 应用UDF
df_with_label = df.withColumn("name_label", name_label_udf(col("name")))
df_with_label.show(truncate=False)
spark.stop()
重要注意事项与性能优化
1 UDF 的性能问题
如前所述,Python UDF 性能不佳,为了理解原因,你需要了解 Arrow。
- Arrow:一个跨语言的内存格式,它像一座桥梁,让数据可以在 JVM (Spark) 和 Python 进程之间高效传递,而无需进行繁琐的序列化/反序列化。
2 如何启用 Arrow 来提升 UDF 性能?
从 Spark 2.3 开始,Spark 引入了 Pandas UDF (或称为 Vectorized UDF),它利用 Arrow 来批量处理数据,性能比传统 UDF 提升了几个数量级。
即使使用传统 UDF,启用 Arrow 也能带来一些性能提升。
启用 Arrow 的方法:
在创建 SparkSession 时设置 spark.sql.execution.arrow.pyspark.enabled 为 True。
spark = SparkSession.builder \
.appName("Arrow Enabled UDF") \
.config("spark.sql.execution.arrow.pyspark.enabled", "true") \
.getOrCreate()
注意:启用 Arrow 后,Arrow 不可用或数据格式不兼容,Spark 会自动回退到标准的序列化方式,不会导致任务失败。
3 缓存 UDF 结果
如果同一个 UDF 会被多次使用,可以将其结果缓存起来。
from pyspark.sql.functions import udf
from functools import lru_cache
@lru_cache(maxsize=None) # Python 内置的缓存装饰器
def cached_get_name_label(name):
# ... 函数逻辑 ...
pass
# 然后将这个被缓存的函数注册为 UDF
cached_udf = udf(cached_get_name_label, StringType())
这在函数计算成本高且输入值重复率高时非常有效。
Pandas UDF (Vectorized UDF) - 更高效的替代方案
对于追求极致性能的场景,强烈推荐使用 Pandas UDF。
核心思想: Pandas UDF 一次处理一整列数据(一个 Pandas Series),而不是像传统 UDF 那样一次只处理一行,这种“向量化”操作极大地减少了 Python 解释器和 JVM 之间的通信开销,从而大幅提升性能。
如何定义 Pandas UDF?
你需要使用 pyspark.sql.functions.pandas_udf 装饰器,并且函数的输入和输出类型通常是 pandas.Series。
示例: 使用 Pandas UDF 实现和上面完全相同的功能。
from pyspark.sql.functions import pandas_udf, PandasUDFType
import pandas as pd
# 使用 @pandas_udf 装饰器
# 指定函数签名: 输入类型 -> 输出类型
# type="string" 表示输入和输出都是字符串类型的 Pandas Series
@pandas_udf('string')
def pandas_get_name_label(names: pd.Series) -> pd.Series:
"""
这是一个 Pandas UDF,它接收一个 Pandas Series,返回一个 Pandas Series。
"""
def label_func(name):
if len(name) > 6:
return "long_name"
elif len(name) > 3:
return "medium_name"
else:
return "short_name"
# Pandas 的 apply 方法会逐个元素应用函数
return names.apply(label_func)
# 应用 Pandas UDF
df_with_pandas_label = df.withColumn("pandas_name_label", pandas_get_name_label(col("name")))
df_with_pandas_label.show()
+--------+------------------+
| name|pandas_name_label|
+--------+------------------+
| Alice| short_name|
| Bob| short_name|
| Charlie| long_name|
| David| medium_name|
+--------+------------------+
Pandas UDF vs. 传统 UDF
| 特性 | 传统 UDF | Pandas UDF (Vectorized UDF) |
|---|---|---|
| 处理单元 | 单行数据 | 整列数据 (Pandas Series) |
| 性能 | 较慢 (Python 解释器开销大) | 非常快 (利用 Arrow,批量处理) |
| Python 函数 | 普通函数,输入/输出为标量 | 使用 @pandas_udf 装饰器,输入/输出为 pandas.Series |
| 依赖 | 仅需 Python 环境 | 需要 pandas 库和 pyarrow 库 |
| 适用场景 | 逻辑极其复杂,无法用 Pandas 实现时 | 绝大多数情况下的首选,尤其是性能敏感型任务 |
| 类型 | 适用场景 | 性能 | 推荐度 |
|---|---|---|---|
| Spark 内置函数 | 简单、常见的操作 (如 col, sum, when) |
最高 | ⭐⭐⭐⭐⭐ 首选 |
| Pandas UDF | 复杂的 Python 逻辑,需要高性能 | 很高 (利用 Arrow) | ⭐⭐⭐⭐⭐ 强烈推荐 |
| 传统 Python UDF | 极其复杂的逻辑,且无法用 Pandas 实现 | 较低 | ⭐⭐ 尽量避免 |
最佳实践建议:
- 优先使用 Spark 内置函数:始终先检查
pyspark.sql.functions是否有你需要的功能。 - 其次考虑 Pandas UDF:如果内置函数不够用,并且你对性能有要求,请使用 Pandas UDF。
- 最后才考虑传统 UDF:仅在 Pandas UDF 无法实现(依赖某些无法在 Pandas Series 上操作的库)且性能要求不高的场景下,才使用传统 Python UDF,并务必启用 Arrow。
- 为 UDF 指定返回类型:在注册 UDF 时,明确指定返回类型(如
StringType(),IntegerType()),有助于 Spark 进行优化。 - 注意空值处理:UDF 中的 Python 函数需要能够处理 Spark 中的
None值,否则可能会抛出异常,你可以在函数内部添加if name is None: return None这样的判断。
