Python InfluxDB 教程:从零开始掌握时序数据库操作
目录
- 前言:为什么选择 InfluxDB?
- 环境准备
- 安装 InfluxDB
- 安装 Python 客户端库
- 核心概念速览
- 数据库 & 组织
- 测量
- 字段
- 时间戳
- Python 操作 InfluxDB 完整指南
- 建立连接
- 创建数据库/组织
- 写入数据
- 查询数据
- 更新与删除数据
- 关闭连接
- 完整代码示例
- 进阶主题
- 使用异步客户端 (
influxdb-client) - 使用 Pandas 进行数据分析
- 数据保留策略
- 使用异步客户端 (
- 总结与最佳实践
前言:为什么选择 InfluxDB?
InfluxDB 是一个专门为时间序列数据而设计的开源数据库,它非常适合处理那些带有时间戳、需要快速写入和查询的场景,

- 物联网:传感器数据、设备状态监控。
- DevOps/监控:服务器性能指标(CPU、内存、磁盘IO)、应用程序指标。
- 实时分析:点击流数据、金融交易数据。
它的主要优势:
- 高性能:针对时序数据进行了深度优化,写入和查询速度极快。
- 高效存储:使用 TSM (Time-Structured Merge Tree) 存储引擎,数据压缩率高。
- 类 SQL 查询语言 (InfluxQL / Flux):学习成本低,易于上手。
- 内置数据可视化:通过 InfluxDB 2.x 的内置 UI 或集成第三方工具(如 Grafana)可以轻松实现数据可视化。
环境准备
1 安装 InfluxDB
最简单的方式是使用 Docker,如果你没有安装 Docker,请先安装它。
运行以下命令启动一个 InfluxDB 2.x 实例:
docker run -d --name influxdb -p 8086:8086 influxdb:latest
这个命令会:

-d: 在后台运行容器。--name influxdb: 将容器命名为influxdb。-p 8086:8086: 将主机的 8086 端口映射到容器的 8086 端口(InfluxDB 的默认 API 端口)。
启动后,访问 http://localhost:8086 即可看到 InfluxDB 的 Web UI。
首次设置:
- 创建一个管理员账户(用户名和密码)。
- 创建一个初始组织 和 Bucket。
- 组织: 类似于公司或团队,用于管理用户、任务和权限。
- Bucket: 类似于传统数据库中的数据库,用于存储数据,创建时可以设置数据保留策略(Retention Policy),即数据保留多久。
记住你创建的URL、Token、组织名和Bucket名,后续 Python 代码会用到。
2 安装 Python 客户端库
我们推荐使用官方的 influxdb-client 库,它同时支持同步和异步操作,功能强大且是未来的发展方向。

pip install influxdb-client
核心概念速览
在写代码前,理解 InfluxDB 的数据模型至关重要。
- Organization (组织): 顶层管理单元,包含你的所有资源(用户、bucket、仪表盘等)。
- Bucket (桶): 存储数据的地方,相当于数据库,每个 bucket 有自己的数据保留策略。
- Measurement (测量): 存储数据的表结构,
cpu_load,sensor_reading,通常是一个名词。 - Tag (标签): 用于索引和查询的键值对,标签数据是字符串,并且是索引的,查询速度极快。
host="server01",region="us-west"。 - Field (字段): 存储实际测量值的键值对,字段值可以是数字、布尔值、字符串等,但不是索引的,主要用于存储数据。
value=98.6,error_count=5。 - Timestamp (时间戳): 每条记录都有一个精确的时间戳,由 InfluxDB 在写入时自动添加,也可以由客户端提供。
一个数据点的结构示例:
measurement: sensor_temp
tags: location="room1", sensor_id="A1"
fields: temperature=23.5, humidity=45.2
timestamp: 2025-10-27T10:00:00Z
Python 操作 InfluxDB 完整指南
我们将使用 influxdb-client 库进行操作。
建立连接
你需要一个Token 来进行身份验证,在 InfluxDB UI 的 Load Data -> API Tokens 页面可以创建一个 Token,并赋予它对你的 Bucket 的读写权限。
from influxdb_client import InfluxDBClient from influxdb_client.client.write_api import SYNCHRONOUS # --- 配置信息 --- INFLUXDB_URL = "http://localhost:8086" INFLUXDB_TOKEN = "YOUR_INFLUXDB_TOKEN" # 替换成你的 Token INFLUXDB_ORG = "YOUR_ORG_NAME" # 替换成你的组织名 INFLUXDB_BUCKET = "YOUR_BUCKET_NAME" # 替换成你的 Bucket 名 # --- 创建客户端 --- client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) # 获取 Write API 和 Query API 的实例 write_api = client.write_api(write_options=SYNCHRONOUS) query_api = client.query_api()
创建数据库/组织
在 InfluxDB 2.x 中,Bucket 是数据存储的地方,你通常需要在 UI 中创建它,Python 客户端也可以通过 BucketsApi 来创建,但为了简单起见,我们假设 Bucket 已经存在。
写入数据
写入数据的核心是创建一个 Point 对象,然后使用 write_api 写入。
from influxdb_client import Point
from datetime import datetime
# 创建一个数据点
point = Point("my_measurement") \
.tag("location", "server_room") \
.tag("sensor_id", "temp_01") \
.field("value", 25.3) \
.time(datetime.utcnow()) # 可以指定时间,不指定则使用服务器时间
# 写入数据
# write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
# 批量写入数据更高效
points_to_write = []
for i in range(5):
p = Point("my_measurement") \
.tag("location", "server_room") \
.tag("sensor_id", f"temp_{i:02d}") \
.field("value", 20.0 + i) \
.time(datetime.utcnow())
points_to_write.append(p)
# 写入一个列表的数据点
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points_to_write)
print("数据写入成功!")
查询数据
查询使用 Flux 语言(InfluxDB 2.x 的查询语言)。
from pprint import pprint
# Flux 查询语句
# 从指定 bucket 中选择所有 location 为 "server_room" 的数据
# |> range(start: -10m) 表示查询最近10分钟的数据
# |> filter 添加过滤条件
query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r.location == "server_room")
'''
# 执行查询
tables = query_api.query(query, org=INFLUXDB_ORG)
# 解析和打印结果
for table in tables:
for record in table.records:
# record.values 是一个包含所有字段和标签的字典
print(f"时间: {record.get_time()}, 测量值: {record.get_value()}, 标签: {record.values.get('location')}")
# pprint(record.values) # 可以打印完整记录
更新与删除数据
重要提示:InfluxDB 是一个时序数据库,不支持原地更新,所谓的“更新”实际上是写入一条新的数据点,覆盖旧的数据点(前提是它们的 tag 和 time 完全相同)。
删除数据则必须通过 Flux 查询来指定要删除的数据范围。
# --- 更新数据 ---
# 写入一个具有相同 tag 和 time 的新点,就相当于更新
updated_point = Point("my_measurement") \
.tag("location", "server_room") \
.tag("sensor_id", "temp_01") \
.field("value", 99.9) \
.time(datetime.utcnow()) # 使用与旧点相同的时间戳
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=updated_point)
print("数据'更新'成功!")
# --- 删除数据 ---
# 删除操作需要 Flux 脚本
# 删除 "server_room" 在最近5分钟内的所有 "my_measurement" 数据
delete_api = client.delete_api()
# 构建删除查询
delete_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
delete_stop = datetime.utcnow()
delete_query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
|> range(start: {delete_start.isoformat()}Z, stop: {delete_stop.isoformat()}Z)
|> filter(fn: (r) => r._measurement == "my_measurement")
|> filter(fn: (r) => r.location == "server_room")
'''
# 执行删除
# delete_api.delete(delete_query, INFLUXDB_ORG, INFLUXDB_BUCKET)
print("删除查询已构建(实际删除操作已注释,请谨慎使用)")
关闭连接
完成所有操作后,记得关闭客户端以释放资源。
client.close()
print("客户端连接已关闭。")
完整代码示例
这是一个将以上步骤整合在一起的完整脚本。
import time
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 1. 配置 ---
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "YOUR_INFLUXDB_TOKEN" # <--- 替换为你的 Token
INFLUXDB_ORG = "YOUR_ORG_NAME" # <--- 替换为你的组织名
INFLUXDB_BUCKET = "YOUR_BUCKET_NAME" # <--- 替换为你的 Bucket 名
# --- 2. 创建客户端 ---
print("正在连接 InfluxDB...")
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
# --- 3. 写入数据 ---
print("正在写入数据...")
point1 = Point("cpu_usage") \
.tag("host", "server01") \
.tag("region", "us-west") \
.field("usage_percent", 75.5) \
.time(datetime.utcnow())
point2 = Point("cpu_usage") \
.tag("host", "server02") \
.tag("region", "us-west") \
.field("usage_percent", 82.1) \
.time(datetime.utcnow())
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=[point1, point2])
print("数据写入完成。")
# 等待一秒,确保数据被写入
time.sleep(1)
# --- 4. 查询数据 ---
print("\n正在查询数据...")
query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu_usage")
|> filter(fn: (r) => r.region == "us-west")
|> sort(columns: ["_time"], desc: true)
'''
tables = query_api.query(query, org=INFLUXDB_ORG)
print("查询结果:")
for table in tables:
for record in table.records:
print(f"主机: {record.values.get('host')}, CPU使用率: {record.get_value()}%, 时间: {record.get_time()}")
# --- 5. 关闭连接 ---
print("\n正在关闭客户端...")
client.close()
print("操作完成。")
进阶主题
1 使用异步客户端 (influxdb-client)
对于高并发场景(如处理大量 IoT 设备数据),异步操作能显著提高性能。
import asyncio
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import ASYNCHRONOUS
async def main():
async with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
write_api = client.write_api(write_options=ASYNCHRONOUS)
# 异步写入一个点
p = Point("async_data").tag("type", "async_test").field("value", 123)
await write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=p)
# 批量异步写入
points = [Point("async_data").tag("i", str(i)).field("value", i) for i in range(100)]
await write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points)
print("异步写入完成。")
# 运行异步主函数
asyncio.run(main())
2 使用 Pandas 进行数据分析
influxdb-client 可以将查询结果直接转换为 Pandas DataFrame,非常方便。
from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryOptions
import pandas as pd
# ... (连接代码同上) ...
# 使用 PANDAS 选项来获取 DataFrame
query_api = client.query_api(QueryOptions(dataframe_query=True))
query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
|> range(start: -1h)
|> filter(fn: (r) => r._measurement == "cpu_usage")
'''
# 查询结果直接是 Pandas DataFrame
df = query_api.query_data_frame(query, org=INFLUXDB_ORG)
print(df.head())
print("\n按主机分组计算平均CPU使用率:")
print(df.groupby(['host'])['usage_percent'].mean())
3 数据保留策略
在 InfluxDB 2.x 中,Bucket 创建时就需要设置保留期,创建一个保留 7 天数据的 Bucket:
- 在 UI 中创建 Bucket 时,设置
Retention period为7d。 - 这意味着 7 天前的数据将被自动删除,无需手动管理。
总结与最佳实践
- 使用官方客户端:优先选择
influxdb-client,它功能完善且持续更新。 - 理解数据模型:正确区分
Measurement,Tag,Field是高效使用 InfluxDB 的关键,多用Tag进行查询过滤。 - 批量写入:尽量使用批量写入 (
write_api.write(record=[p1, p2, ...])),而不是单点循环写入,这能极大提升写入性能。 - 查询优化:查询时,
filter条件中优先使用Tag,因为Tag是索引的,而Field不是。 - Token 管理:为不同的应用创建不同权限的 Token,遵循最小权限原则。
- 异步操作:对于 I/O 密集型或高并发的写入场景,务必使用异步客户端。
- 备份与监控:定期备份你的 Bucket,并监控 InfluxDB 本身的性能指标。
希望这份详细的教程能帮助你顺利地在 Python 项目中集成和使用 InfluxDB!
