杰瑞科技汇

Python如何连接InfluxDB?数据写入查询全解析

Python InfluxDB 教程:从零开始掌握时序数据库操作

目录

  1. 前言:为什么选择 InfluxDB?
  2. 环境准备
    • 安装 InfluxDB
    • 安装 Python 客户端库
  3. 核心概念速览
    • 数据库 & 组织
    • 测量
    • 字段
    • 时间戳
  4. Python 操作 InfluxDB 完整指南
    • 建立连接
    • 创建数据库/组织
    • 写入数据
    • 查询数据
    • 更新与删除数据
    • 关闭连接
  5. 完整代码示例
  6. 进阶主题
    • 使用异步客户端 (influxdb-client)
    • 使用 Pandas 进行数据分析
    • 数据保留策略
  7. 总结与最佳实践

前言:为什么选择 InfluxDB?

InfluxDB 是一个专门为时间序列数据而设计的开源数据库,它非常适合处理那些带有时间戳、需要快速写入和查询的场景,

Python如何连接InfluxDB?数据写入查询全解析-图1
(图片来源网络,侵删)
  • 物联网:传感器数据、设备状态监控。
  • DevOps/监控:服务器性能指标(CPU、内存、磁盘IO)、应用程序指标。
  • 实时分析:点击流数据、金融交易数据。

它的主要优势:

  • 高性能:针对时序数据进行了深度优化,写入和查询速度极快。
  • 高效存储:使用 TSM (Time-Structured Merge Tree) 存储引擎,数据压缩率高。
  • 类 SQL 查询语言 (InfluxQL / Flux):学习成本低,易于上手。
  • 内置数据可视化:通过 InfluxDB 2.x 的内置 UI 或集成第三方工具(如 Grafana)可以轻松实现数据可视化。

环境准备

1 安装 InfluxDB

最简单的方式是使用 Docker,如果你没有安装 Docker,请先安装它。

运行以下命令启动一个 InfluxDB 2.x 实例:

docker run -d --name influxdb -p 8086:8086 influxdb:latest

这个命令会:

Python如何连接InfluxDB?数据写入查询全解析-图2
(图片来源网络,侵删)
  • -d: 在后台运行容器。
  • --name influxdb: 将容器命名为 influxdb
  • -p 8086:8086: 将主机的 8086 端口映射到容器的 8086 端口(InfluxDB 的默认 API 端口)。

启动后,访问 http://localhost:8086 即可看到 InfluxDB 的 Web UI。

首次设置

  1. 创建一个管理员账户(用户名和密码)。
  2. 创建一个初始组织Bucket
    • 组织: 类似于公司或团队,用于管理用户、任务和权限。
    • Bucket: 类似于传统数据库中的数据库,用于存储数据,创建时可以设置数据保留策略(Retention Policy),即数据保留多久。

记住你创建的URLToken组织名Bucket名,后续 Python 代码会用到。

2 安装 Python 客户端库

我们推荐使用官方的 influxdb-client 库,它同时支持同步和异步操作,功能强大且是未来的发展方向。

Python如何连接InfluxDB?数据写入查询全解析-图3
(图片来源网络,侵删)
pip install influxdb-client

核心概念速览

在写代码前,理解 InfluxDB 的数据模型至关重要。

  • Organization (组织): 顶层管理单元,包含你的所有资源(用户、bucket、仪表盘等)。
  • Bucket (桶): 存储数据的地方,相当于数据库,每个 bucket 有自己的数据保留策略。
  • Measurement (测量): 存储数据的表结构,cpu_load, sensor_reading,通常是一个名词。
  • Tag (标签): 用于索引查询的键值对,标签数据是字符串,并且是索引的,查询速度极快。host="server01", region="us-west"
  • Field (字段): 存储实际测量值的键值对,字段值可以是数字、布尔值、字符串等,但不是索引的,主要用于存储数据。value=98.6, error_count=5
  • Timestamp (时间戳): 每条记录都有一个精确的时间戳,由 InfluxDB 在写入时自动添加,也可以由客户端提供。

一个数据点的结构示例

measurement: sensor_temp
tags: location="room1", sensor_id="A1"
fields: temperature=23.5, humidity=45.2
timestamp: 2025-10-27T10:00:00Z

Python 操作 InfluxDB 完整指南

我们将使用 influxdb-client 库进行操作。

建立连接

你需要一个Token 来进行身份验证,在 InfluxDB UI 的 Load Data -> API Tokens 页面可以创建一个 Token,并赋予它对你的 Bucket 的读写权限。

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 配置信息 ---
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "YOUR_INFLUXDB_TOKEN"  # 替换成你的 Token
INFLUXDB_ORG = "YOUR_ORG_NAME"        # 替换成你的组织名
INFLUXDB_BUCKET = "YOUR_BUCKET_NAME"   # 替换成你的 Bucket 名
# --- 创建客户端 ---
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
# 获取 Write API 和 Query API 的实例
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

创建数据库/组织

在 InfluxDB 2.x 中,Bucket 是数据存储的地方,你通常需要在 UI 中创建它,Python 客户端也可以通过 BucketsApi 来创建,但为了简单起见,我们假设 Bucket 已经存在。

写入数据

写入数据的核心是创建一个 Point 对象,然后使用 write_api 写入。

from influxdb_client import Point
from datetime import datetime
# 创建一个数据点
point = Point("my_measurement") \
    .tag("location", "server_room") \
    .tag("sensor_id", "temp_01") \
    .field("value", 25.3) \
    .time(datetime.utcnow()) # 可以指定时间,不指定则使用服务器时间
# 写入数据
# write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
# 批量写入数据更高效
points_to_write = []
for i in range(5):
    p = Point("my_measurement") \
        .tag("location", "server_room") \
        .tag("sensor_id", f"temp_{i:02d}") \
        .field("value", 20.0 + i) \
        .time(datetime.utcnow())
    points_to_write.append(p)
# 写入一个列表的数据点
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points_to_write)
print("数据写入成功!")

查询数据

查询使用 Flux 语言(InfluxDB 2.x 的查询语言)。

from pprint import pprint
# Flux 查询语句
# 从指定 bucket 中选择所有 location 为 "server_room" 的数据
# |> range(start: -10m) 表示查询最近10分钟的数据
# |> filter 添加过滤条件
query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
  |> range(start: -10m)
  |> filter(fn: (r) => r._measurement == "my_measurement")
  |> filter(fn: (r) => r.location == "server_room")
'''
# 执行查询
tables = query_api.query(query, org=INFLUXDB_ORG)
# 解析和打印结果
for table in tables:
    for record in table.records:
        # record.values 是一个包含所有字段和标签的字典
        print(f"时间: {record.get_time()}, 测量值: {record.get_value()}, 标签: {record.values.get('location')}")
        # pprint(record.values) # 可以打印完整记录

更新与删除数据

重要提示:InfluxDB 是一个时序数据库,不支持原地更新,所谓的“更新”实际上是写入一条新的数据点,覆盖旧的数据点(前提是它们的 tag 和 time 完全相同)。

删除数据则必须通过 Flux 查询来指定要删除的数据范围。

# --- 更新数据 ---
# 写入一个具有相同 tag 和 time 的新点,就相当于更新
updated_point = Point("my_measurement") \
    .tag("location", "server_room") \
    .tag("sensor_id", "temp_01") \
    .field("value", 99.9) \
    .time(datetime.utcnow()) # 使用与旧点相同的时间戳
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=updated_point)
print("数据'更新'成功!")
# --- 删除数据 ---
# 删除操作需要 Flux 脚本
# 删除 "server_room" 在最近5分钟内的所有 "my_measurement" 数据
delete_api = client.delete_api()
# 构建删除查询
delete_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
delete_stop = datetime.utcnow()
delete_query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
  |> range(start: {delete_start.isoformat()}Z, stop: {delete_stop.isoformat()}Z)
  |> filter(fn: (r) => r._measurement == "my_measurement")
  |> filter(fn: (r) => r.location == "server_room")
'''
# 执行删除
# delete_api.delete(delete_query, INFLUXDB_ORG, INFLUXDB_BUCKET)
print("删除查询已构建(实际删除操作已注释,请谨慎使用)")

关闭连接

完成所有操作后,记得关闭客户端以释放资源。

client.close()
print("客户端连接已关闭。")

完整代码示例

这是一个将以上步骤整合在一起的完整脚本。

import time
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 1. 配置 ---
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "YOUR_INFLUXDB_TOKEN"  # <--- 替换为你的 Token
INFLUXDB_ORG = "YOUR_ORG_NAME"        # <--- 替换为你的组织名
INFLUXDB_BUCKET = "YOUR_BUCKET_NAME"   # <--- 替换为你的 Bucket 名
# --- 2. 创建客户端 ---
print("正在连接 InfluxDB...")
client = InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()
# --- 3. 写入数据 ---
print("正在写入数据...")
point1 = Point("cpu_usage") \
    .tag("host", "server01") \
    .tag("region", "us-west") \
    .field("usage_percent", 75.5) \
    .time(datetime.utcnow())
point2 = Point("cpu_usage") \
    .tag("host", "server02") \
    .tag("region", "us-west") \
    .field("usage_percent", 82.1) \
    .time(datetime.utcnow())
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=[point1, point2])
print("数据写入完成。")
# 等待一秒,确保数据被写入
time.sleep(1)
# --- 4. 查询数据 ---
print("\n正在查询数据...")
query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
  |> filter(fn: (r) => r.region == "us-west")
  |> sort(columns: ["_time"], desc: true)
'''
tables = query_api.query(query, org=INFLUXDB_ORG)
print("查询结果:")
for table in tables:
    for record in table.records:
        print(f"主机: {record.values.get('host')}, CPU使用率: {record.get_value()}%, 时间: {record.get_time()}")
# --- 5. 关闭连接 ---
print("\n正在关闭客户端...")
client.close()
print("操作完成。")

进阶主题

1 使用异步客户端 (influxdb-client)

对于高并发场景(如处理大量 IoT 设备数据),异步操作能显著提高性能。

import asyncio
from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import ASYNCHRONOUS
async def main():
    async with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
        write_api = client.write_api(write_options=ASYNCHRONOUS)
        # 异步写入一个点
        p = Point("async_data").tag("type", "async_test").field("value", 123)
        await write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=p)
        # 批量异步写入
        points = [Point("async_data").tag("i", str(i)).field("value", i) for i in range(100)]
        await write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=points)
        print("异步写入完成。")
# 运行异步主函数
asyncio.run(main())

2 使用 Pandas 进行数据分析

influxdb-client 可以将查询结果直接转换为 Pandas DataFrame,非常方便。

from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryOptions
import pandas as pd
# ... (连接代码同上) ...
# 使用 PANDAS 选项来获取 DataFrame
query_api = client.query_api(QueryOptions(dataframe_query=True))
query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "cpu_usage")
'''
# 查询结果直接是 Pandas DataFrame
df = query_api.query_data_frame(query, org=INFLUXDB_ORG)
print(df.head())
print("\n按主机分组计算平均CPU使用率:")
print(df.groupby(['host'])['usage_percent'].mean())

3 数据保留策略

在 InfluxDB 2.x 中,Bucket 创建时就需要设置保留期,创建一个保留 7 天数据的 Bucket:

  • 在 UI 中创建 Bucket 时,设置 Retention period7d
  • 这意味着 7 天前的数据将被自动删除,无需手动管理。

总结与最佳实践

  • 使用官方客户端:优先选择 influxdb-client,它功能完善且持续更新。
  • 理解数据模型:正确区分 Measurement, Tag, Field 是高效使用 InfluxDB 的关键,多用 Tag 进行查询过滤。
  • 批量写入:尽量使用批量写入 (write_api.write(record=[p1, p2, ...])),而不是单点循环写入,这能极大提升写入性能。
  • 查询优化:查询时,filter 条件中优先使用 Tag,因为 Tag 是索引的,而 Field 不是。
  • Token 管理:为不同的应用创建不同权限的 Token,遵循最小权限原则。
  • 异步操作:对于 I/O 密集型或高并发的写入场景,务必使用异步客户端。
  • 备份与监控:定期备份你的 Bucket,并监控 InfluxDB 本身的性能指标。

希望这份详细的教程能帮助你顺利地在 Python 项目中集成和使用 InfluxDB!

分享:
扫描分享到社交APP
上一篇
下一篇