杰瑞科技汇

Python开发Spark Streaming要注意什么?

Spark Streaming 是 Spark Core API 的一个扩展,它支持可扩展、高吞吐、容错的实时数据流处理,其核心抽象是 DStream (Discretized Stream),它代表一个持续不断的数据流,可以看作是一个 RDD 序列。

Python开发Spark Streaming要注意什么?-图1
(图片来源网络,侵删)

下面我将从核心概念、开发步骤、完整代码示例到最佳实践,为你提供一个全面的指南。


核心概念

在开始编码前,理解几个核心概念至关重要:

1 DStream (离散化流)

DStream 是 Spark Streaming 提供的抽象,它代表一个持续不断的数据流,它内部是由一系列的 RDD 组成的,对 DStream 的任何操作,最终都会转化为对底层 RDD 的批处理操作。

2 批处理间隔

Spark Streaming 不会一条一条地处理数据,而是将数据流切分成一个个小的 批次,这个切分的时间间隔就是 批处理间隔,1 秒、5 秒、10 秒,这个值是你在创建 StreamingContext 时指定的,它直接影响应用的延迟和吞吐量。

Python开发Spark Streaming要注意什么?-图2
(图片来源网络,侵删)

3 输入源

数据可以从哪里来,Spark Streaming 支持多种输入源:

  • 基础源: 套接字连接、文件、以及 HDFS 上的文件。
  • 高级源: Kafka, Flume, Kinesis, Twitter 等,这些源需要额外的依赖库。

4 转换操作

和 RDD 类似,DStream 也支持两种类型的操作:

  • 无状态转换: 对每个批次的数据独立进行操作,不依赖之前的数据。map(), filter(), reduceByKey() 等。
  • 有状态转换: 需要跨批次维护状态,最典型的就是 窗口操作,如 reduceByKeyAndWindow()

5 输出操作

将处理后的结果输出到外部系统,如控制台、数据库、HDFS 等,输出操作是 行动,会触发实际的计算。


开发环境准备

在开始之前,确保你的环境已经配置好。

Python开发Spark Streaming要注意什么?-图3
(图片来源网络,侵删)

1 安装 PySpark

如果你的 Spark 已经安装,PySpark 通常会自带,如果没有,可以通过 pip 安装:

pip install pyspark

2 添加额外依赖

如果你使用的是高级源(如 Kafka),你需要下载对应的 JAR 包,并在运行 spark-submit 时指定。

以 Kafka 为例: 你需要下载 spark-sql-kafka-0-10_2.12-x.x.x.x.jar,你可以在你的 Spark 安装目录的 jars 文件夹中找到它,或者从 Maven Central 下载。


Spark Streaming 开发基本步骤

一个标准的 Spark Streaming 应用程序遵循以下步骤:

  1. 初始化: 创建一个 StreamingContext,这是所有 Streaming 功能的入口。
  2. 创建输入 DStream: 连接到数据源,创建一个输入 DStream。
  3. 定义转换: 对 DStream 应用转换操作(如 map, filter, reduceByKey)来定义你的业务逻辑。
  4. 输出结果: 调用输出操作(如 print(), saveAsTextFiles())来启动计算并输出结果。
  5. 启动流: 调用 ssc.start() 来启动接收和处理数据。
  6. 等待终止: 调用 ssc.awaitTermination() 来等待计算被手动停止或发生错误。

完整代码示例

我们将通过两个最经典的例子来展示开发过程:套接字字数统计基于 Kafka 的结构化流处理

套接字字数统计 (Socket WordCount)

这个例子是入门的 "Hello, World",它会监听一个指定端口上的网络数据,并实时统计单词出现的频率。

代码 (socket_wordcount.py)

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 1. 初始化 SparkContext 和 StreamingContext
# SparkContext 是 Spark 的基础
# StreamingContext 的批处理间隔设置为 5 秒
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5) # 5秒的批处理间隔
# 2. 创建输入 DStream
# 连接到 localhost:9999 的数据源
# Spark Streaming 会持续不断地从该套接字读取数据
lines = ssc.socketTextStream("localhost", 9999)
# 3. 定义转换操作
# a. 将每一行文本按空格分割成单词
words = lines.flatMap(lambda line: line.split(" "))
# b. 将每个单词映射为 (word, 1) 的形式
pairs = words.map(lambda word: (word, 1))
# c. 以 "word" 为 key,进行 reduce 操作,计算每个批次内每个单词的总数
#    这是一个有状态的窗口操作吗?不,这是一个无状态的 reduceByKey。
#    它只对当前 RDD(即当前批次的数据)进行计算。
word_counts = pairs.reduceByKey(lambda x, y: x + y)
# 4. 输出结果
# print() 会将每个批次的结果打印到控制台
# 注意:这是一个输出操作,会触发实际的计算
word_counts.pprint() # pprint 是 pretty print 的缩写
# 5. 启动流
print("Starting the streaming context. Press Ctrl+C to stop.")
ssc.start()
# 6. 等待终止
ssc.awaitTermination() # 阻塞主线程,直到流被停止

如何运行这个示例?

  1. 启动一个 NetCat (nc) 服务器作为数据源: 在你的终端中运行以下命令,它会监听 9999 端口,并将你输入的任何内容发送出去。

    # 在 macOS 上
    nc -lk 9999
    # 在 Linux 上
    nc -l -p 9999
  2. 运行 Python 脚本: 在另一个终端中,运行你的 Python 脚本。

    python socket_wordcount.py
  3. 观察结果: 回到 NetCat 终端,输入一些文本,

    hello spark
    hello world
    spark is great

    你会在运行脚本的终端中看到每 5 秒输出一次的结果,格式如下:

    Time: 2025-10-27 10:30:00
    (hello, 2)
    (spark, 2)
    (world, 1)
    (is, 1)
    (great, 1)

基于 Kafka 的结构化流处理 (Structured Streaming)

从 Spark 2.0 开始,推荐使用 Structured Streaming API,它基于 DataFrame/Dataset,提供了更强大、更统一的流处理能力,它将流视为一个不断追加的表。

准备工作: 确保你有 Kafka 环境,并创建一个名为 wordcount_topic 的 topic。

# 假设 Kafka 安装在 /opt/kafka
# 1. 启动 Zookeeper 和 Kafka Broker (如果未启动)
# 2. 创建 topic
bin/kafka-topics.sh --create --topic wordcount_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1

代码 (kafka_wordcount.py)

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType
# 1. 初始化 SparkSession
# 注意:对于 Kafka,需要通过 .config 添加依赖的 JAR 包
spark = SparkSession \
    .builder \
    .appName("StructuredKafkaWordCount") \
    .config("spark.jars", "/path/to/your/spark-sql-kafka-0-10_2.12-3.3.x.jar") # !!! 请替换为你的 JAR 包路径 !!! \
    .getOrCreate()
# 2. 定义输入源:从 Kafka 读取数据
# schema 定义了从 Kafka 消息中读取的数据结构
schema = StructType([
    StructField("value", StringType(), True)
])
# 读取 Kafka topic
lines = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "wordcount_topic") \
    .option("startingOffsets", "latest") # 从最新的消息开始读取 \
    .load() \
    .selectExpr("CAST(value AS STRING)") # 将 Kafka 的 value (二进制) 转为字符串
# 3. 定义转换操作 (使用 DataFrame API)
# a. 将一行文本按空格分割成多个单词
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)
# b. 按 word 分组,并计数
wordCounts = words.groupBy("word").count()
# 4. 输出结果
# 将结果输出到控制台
# outputMode("complete") 表示输出所有聚合后的结果
query = wordCounts.writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
# 5. 等待终止
query.awaitTermination()

如何运行这个示例?

  1. 运行 Python 脚本

    python kafka_wordcount.py
  2. 向 Kafka topic 发送数据: 使用 Kafka 的命令行工具发送一些消息。

    bin/kafka-console-producer.sh --topic wordcount_topic --bootstrap-server localhost:9092

    然后输入文本,

    structured streaming is powerful
    spark is the best
  3. 观察结果: 你会在运行脚本的终端中看到类似下面的输出( 是 DataFrame 的格式):

    -------------------------------------------
    Batch: 0
    -------------------------------------------
    +---------+-----+
    |     word|count|
    +---------+-----+
    |   spark|    1|
    |     the|    1|
    |    best|    1|
    |structured|    1|
    | streaming|    1|
    |   is|    2|
    | powerful|    1|
    +---------+-----+

最佳实践与注意事项

  1. 配置与性能:

    • 批处理间隔: 根据你的延迟需求和集群资源调整,间隔越小,延迟越低,但开销越大。
    • 并行度: 为 DStream 操作(如 reduceByKey, join)设置合适的分区数,以避免数据倾斜和充分利用集群资源。
    • 序列化: 使用 Kryo 序列化器 (spark.serializer=org.apache.spark.serializer.KryoSerializer) 通常比默认的 Java 序列化更快、更紧凑。
  2. 容错与状态管理:

    • 检查点: 对于有状态的操作(如 updateStateByKey 或使用窗口),必须启用检查点,检查点会将 DStream 的元数据和 RDD 的数据保存到可靠的存储(如 HDFS, S3)中,以便在驱动程序故障后能恢复。
      # 在创建 StreamingContext 之前
      sc = SparkContext(conf)
      ssc = StreamingContext(sc, 5)
      ssc.checkpoint("hdfs://your-checkpoint-directory") # 或本地目录
    • Exactly-Once 语义: 对于端到端的精确一次语义,输出操作必须是幂等的(如 HDFS)或支持事务的(如 Kafka, MySQL),Structured Streaming 默认为 Kafka 提供端到端的精确一次语义。
  3. 监控与调试:

    • Spark UI: Spark UI 是监控流作业的利器,你可以查看批次处理时间、数据延迟、任务执行情况等。
    • 日志: 始终查看 Spark 驱动程序和执行器的日志,以便排查错误。
    • 打印操作: pprint()printSchema() 等操作非常适合开发和调试,但在生产环境中应谨慎使用,因为它们会增加开销。
  4. 停止 StreamingContext:

    • 使用 ssc.stop() 来停止流。
    • 如果你想在停止前处理完当前批次的数据,可以传入 stopSparkContext=False,默认情况下,ssc.stop() 也会停止底层的 SparkContext

希望这份详细的指南能帮助你顺利开始 Python Spark Streaming 的开发之旅!

分享:
扫描分享到社交APP
上一篇
下一篇