Spark Streaming 是 Spark Core API 的一个扩展,它支持可扩展、高吞吐、容错的实时数据流处理,其核心抽象是 DStream (Discretized Stream),它代表一个持续不断的数据流,可以看作是一个 RDD 序列。

下面我将从核心概念、开发步骤、完整代码示例到最佳实践,为你提供一个全面的指南。
核心概念
在开始编码前,理解几个核心概念至关重要:
1 DStream (离散化流)
DStream 是 Spark Streaming 提供的抽象,它代表一个持续不断的数据流,它内部是由一系列的 RDD 组成的,对 DStream 的任何操作,最终都会转化为对底层 RDD 的批处理操作。
2 批处理间隔
Spark Streaming 不会一条一条地处理数据,而是将数据流切分成一个个小的 批次,这个切分的时间间隔就是 批处理间隔,1 秒、5 秒、10 秒,这个值是你在创建 StreamingContext 时指定的,它直接影响应用的延迟和吞吐量。

3 输入源
数据可以从哪里来,Spark Streaming 支持多种输入源:
- 基础源: 套接字连接、文件、以及 HDFS 上的文件。
- 高级源: Kafka, Flume, Kinesis, Twitter 等,这些源需要额外的依赖库。
4 转换操作
和 RDD 类似,DStream 也支持两种类型的操作:
- 无状态转换: 对每个批次的数据独立进行操作,不依赖之前的数据。
map(),filter(),reduceByKey()等。 - 有状态转换: 需要跨批次维护状态,最典型的就是 窗口操作,如
reduceByKeyAndWindow()。
5 输出操作
将处理后的结果输出到外部系统,如控制台、数据库、HDFS 等,输出操作是 行动,会触发实际的计算。
开发环境准备
在开始之前,确保你的环境已经配置好。

1 安装 PySpark
如果你的 Spark 已经安装,PySpark 通常会自带,如果没有,可以通过 pip 安装:
pip install pyspark
2 添加额外依赖
如果你使用的是高级源(如 Kafka),你需要下载对应的 JAR 包,并在运行 spark-submit 时指定。
以 Kafka 为例:
你需要下载 spark-sql-kafka-0-10_2.12-x.x.x.x.jar,你可以在你的 Spark 安装目录的 jars 文件夹中找到它,或者从 Maven Central 下载。
Spark Streaming 开发基本步骤
一个标准的 Spark Streaming 应用程序遵循以下步骤:
- 初始化: 创建一个
StreamingContext,这是所有 Streaming 功能的入口。 - 创建输入 DStream: 连接到数据源,创建一个输入 DStream。
- 定义转换: 对 DStream 应用转换操作(如
map,filter,reduceByKey)来定义你的业务逻辑。 - 输出结果: 调用输出操作(如
print(),saveAsTextFiles())来启动计算并输出结果。 - 启动流: 调用
ssc.start()来启动接收和处理数据。 - 等待终止: 调用
ssc.awaitTermination()来等待计算被手动停止或发生错误。
完整代码示例
我们将通过两个最经典的例子来展示开发过程:套接字字数统计 和 基于 Kafka 的结构化流处理。
套接字字数统计 (Socket WordCount)
这个例子是入门的 "Hello, World",它会监听一个指定端口上的网络数据,并实时统计单词出现的频率。
代码 (socket_wordcount.py)
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# 1. 初始化 SparkContext 和 StreamingContext
# SparkContext 是 Spark 的基础
# StreamingContext 的批处理间隔设置为 5 秒
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 5) # 5秒的批处理间隔
# 2. 创建输入 DStream
# 连接到 localhost:9999 的数据源
# Spark Streaming 会持续不断地从该套接字读取数据
lines = ssc.socketTextStream("localhost", 9999)
# 3. 定义转换操作
# a. 将每一行文本按空格分割成单词
words = lines.flatMap(lambda line: line.split(" "))
# b. 将每个单词映射为 (word, 1) 的形式
pairs = words.map(lambda word: (word, 1))
# c. 以 "word" 为 key,进行 reduce 操作,计算每个批次内每个单词的总数
# 这是一个有状态的窗口操作吗?不,这是一个无状态的 reduceByKey。
# 它只对当前 RDD(即当前批次的数据)进行计算。
word_counts = pairs.reduceByKey(lambda x, y: x + y)
# 4. 输出结果
# print() 会将每个批次的结果打印到控制台
# 注意:这是一个输出操作,会触发实际的计算
word_counts.pprint() # pprint 是 pretty print 的缩写
# 5. 启动流
print("Starting the streaming context. Press Ctrl+C to stop.")
ssc.start()
# 6. 等待终止
ssc.awaitTermination() # 阻塞主线程,直到流被停止
如何运行这个示例?
-
启动一个 NetCat (nc) 服务器作为数据源: 在你的终端中运行以下命令,它会监听 9999 端口,并将你输入的任何内容发送出去。
# 在 macOS 上 nc -lk 9999 # 在 Linux 上 nc -l -p 9999
-
运行 Python 脚本: 在另一个终端中,运行你的 Python 脚本。
python socket_wordcount.py
-
观察结果: 回到 NetCat 终端,输入一些文本,
hello spark hello world spark is great你会在运行脚本的终端中看到每 5 秒输出一次的结果,格式如下:
Time: 2025-10-27 10:30:00 (hello, 2) (spark, 2) (world, 1) (is, 1) (great, 1)
基于 Kafka 的结构化流处理 (Structured Streaming)
从 Spark 2.0 开始,推荐使用 Structured Streaming API,它基于 DataFrame/Dataset,提供了更强大、更统一的流处理能力,它将流视为一个不断追加的表。
准备工作:
确保你有 Kafka 环境,并创建一个名为 wordcount_topic 的 topic。
# 假设 Kafka 安装在 /opt/kafka # 1. 启动 Zookeeper 和 Kafka Broker (如果未启动) # 2. 创建 topic bin/kafka-topics.sh --create --topic wordcount_topic --bootstrap-server localhost:9092 --partitions 1 --replication-factor 1
代码 (kafka_wordcount.py)
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split
from pyspark.sql.types import StructType, StructField, StringType
# 1. 初始化 SparkSession
# 注意:对于 Kafka,需要通过 .config 添加依赖的 JAR 包
spark = SparkSession \
.builder \
.appName("StructuredKafkaWordCount") \
.config("spark.jars", "/path/to/your/spark-sql-kafka-0-10_2.12-3.3.x.jar") # !!! 请替换为你的 JAR 包路径 !!! \
.getOrCreate()
# 2. 定义输入源:从 Kafka 读取数据
# schema 定义了从 Kafka 消息中读取的数据结构
schema = StructType([
StructField("value", StringType(), True)
])
# 读取 Kafka topic
lines = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "wordcount_topic") \
.option("startingOffsets", "latest") # 从最新的消息开始读取 \
.load() \
.selectExpr("CAST(value AS STRING)") # 将 Kafka 的 value (二进制) 转为字符串
# 3. 定义转换操作 (使用 DataFrame API)
# a. 将一行文本按空格分割成多个单词
words = lines.select(
explode(
split(lines.value, " ")
).alias("word")
)
# b. 按 word 分组,并计数
wordCounts = words.groupBy("word").count()
# 4. 输出结果
# 将结果输出到控制台
# outputMode("complete") 表示输出所有聚合后的结果
query = wordCounts.writeStream \
.outputMode("complete") \
.format("console") \
.start()
# 5. 等待终止
query.awaitTermination()
如何运行这个示例?
-
运行 Python 脚本:
python kafka_wordcount.py
-
向 Kafka topic 发送数据: 使用 Kafka 的命令行工具发送一些消息。
bin/kafka-console-producer.sh --topic wordcount_topic --bootstrap-server localhost:9092
然后输入文本,
structured streaming is powerful spark is the best -
观察结果: 你会在运行脚本的终端中看到类似下面的输出( 是 DataFrame 的格式):
------------------------------------------- Batch: 0 ------------------------------------------- +---------+-----+ | word|count| +---------+-----+ | spark| 1| | the| 1| | best| 1| |structured| 1| | streaming| 1| | is| 2| | powerful| 1| +---------+-----+
最佳实践与注意事项
-
配置与性能:
- 批处理间隔: 根据你的延迟需求和集群资源调整,间隔越小,延迟越低,但开销越大。
- 并行度: 为 DStream 操作(如
reduceByKey,join)设置合适的分区数,以避免数据倾斜和充分利用集群资源。 - 序列化: 使用 Kryo 序列化器 (
spark.serializer=org.apache.spark.serializer.KryoSerializer) 通常比默认的 Java 序列化更快、更紧凑。
-
容错与状态管理:
- 检查点: 对于有状态的操作(如
updateStateByKey或使用窗口),必须启用检查点,检查点会将 DStream 的元数据和 RDD 的数据保存到可靠的存储(如 HDFS, S3)中,以便在驱动程序故障后能恢复。# 在创建 StreamingContext 之前 sc = SparkContext(conf) ssc = StreamingContext(sc, 5) ssc.checkpoint("hdfs://your-checkpoint-directory") # 或本地目录 - Exactly-Once 语义: 对于端到端的精确一次语义,输出操作必须是幂等的(如 HDFS)或支持事务的(如 Kafka, MySQL),Structured Streaming 默认为 Kafka 提供端到端的精确一次语义。
- 检查点: 对于有状态的操作(如
-
监控与调试:
- Spark UI: Spark UI 是监控流作业的利器,你可以查看批次处理时间、数据延迟、任务执行情况等。
- 日志: 始终查看 Spark 驱动程序和执行器的日志,以便排查错误。
- 打印操作:
pprint()和printSchema()等操作非常适合开发和调试,但在生产环境中应谨慎使用,因为它们会增加开销。
-
停止 StreamingContext:
- 使用
ssc.stop()来停止流。 - 如果你想在停止前处理完当前批次的数据,可以传入
stopSparkContext=False,默认情况下,ssc.stop()也会停止底层的SparkContext。
- 使用
希望这份详细的指南能帮助你顺利开始 Python Spark Streaming 的开发之旅!
