杰瑞科技汇

Python如何访问InfluxDB?

安装 InfluxDB Python 客户端

你需要安装官方的 Python 客户端库,打开你的终端或命令行,运行以下命令:

Python如何访问InfluxDB?-图1
(图片来源网络,侵删)
pip install influxdb-client

准备工作:InfluxDB Server 和 Bucket

在编写 Python 代码之前,请确保你有一个可用的 InfluxDB 实例。

  • InfluxDB 2.x (推荐):

    1. 启动你的 InfluxDB 服务。
    2. 登录 InfluxDB UI (通常是 http://localhost:8086)。
    3. 创建一个 Bucket (用来存储数据的“数据库”),例如命名为 python_demo
    4. 生成一个 API Token,点击 Load Data -> API Tokens -> Generate API Token,并给它足够的权限(All Access)。请务必保存好这个 Token
  • InfluxDB 1.x:

    1. 启动你的 InfluxDB 服务。
    2. 创建一个数据库,python_demo_db
    3. 创建一个用户并授予读写权限,或者使用默认的 admin 用户。

连接到 InfluxDB

连接到 InfluxDB 2.x 和 1.x 的方式略有不同。

Python如何访问InfluxDB?-图2
(图片来源网络,侵删)

连接到 InfluxDB 2.x

InfluxDB 2.x 使用 Token 进行认证,URL 格式为 http://host:port

from influxdb_client import InfluxDBClient
from influxdb_client.client.write_api import SYNCHRONOUS
# --- 配置信息 ---
# 替换成你的 InfluxDB 服务器地址和端口
url = "http://localhost:8086"
# 替换成你的 Token
token = "your-secret-api-token"
# 替换成你的组织名称
org = "your-org-name"
# 替换成你的 Bucket 名称
bucket = "python_demo"
# --- 创建客户端 ---
# 使用 `with` 语句可以确保连接在使用完毕后被正确关闭
with InfluxDBClient(url=url, token=token, org=org) as client:
    print("Connected to InfluxDB.")
    # ... 在这里进行后续操作 ...

连接到 InfluxDB 1.x

InfluxDB 1.x 使用用户名、密码和数据库名进行认证。

from influxdb_client import InfluxDBClient
# --- 配置信息 ---
# 替换成你的 InfluxDB 服务器地址和端口
url = "http://localhost:8086"
# 替换成你的用户名和密码
username = "admin"
password = "your-password"
# 替换成你的数据库名
database = "python_demo_db"
# --- 创建客户端 ---
with InfluxDBClient(url=url, username=username, password=password, database=database) as client:
    print("Connected to InfluxDB 1.x.")
    # ... 在这里进行后续操作 ...

核心操作:写入数据

在 InfluxDB 中,数据以“点”(Point)的形式写入,一个 Point 包含:

  • Measurement: 测量值名称,如 cpu_usage
  • Tag: 键值对,用于索引和过滤,如 host=server01
  • Field: 键值对,存储实际的数据值,如 value=95.5,Field 是数据的核心。
  • Timestamp: 时间戳,如果不提供,InfluxDB 会使用服务器时间。

写入数据到 InfluxDB 2.x

influxdb-client 提供了 Point 对象来构建数据,非常方便。

Python如何访问InfluxDB?-图3
(图片来源网络,侵删)
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
import time
# (复用上面的连接配置)
url = "http://localhost:8086"
token = "your-secret-api-token"
org = "your-org-name"
bucket = "python_demo"
# 创建客户端
with InfluxDBClient(url=url, token=token, org=org) as client:
    # 获取 Write API
    write_api = client.write_api(write_options=SYNCHRONOUS)
    # 创建一个数据点
    p = Point("mem_usage") \
        .tag("host", "server01") \
        .tag("region", "us-west") \
        .field("used_percent", 78.5) \
        .field("free_bytes", 12345678)
    # 写入数据
    write_api.write(bucket=bucket, record=p)
    print(f"Point written: {p}")
    # 也可以一次性写入多个点
    points_to_write = [
        Point("cpu_usage").tag("host", "server01").field("value", 25.3),
        Point("cpu_usage").tag("host", "server02").field("value", 45.8),
    ]
    write_api.write(bucket=bucket, record=points_to_write)
    print("Multiple points written.")

写入数据到 InfluxDB 1.x

对于 1.x,可以直接使用行协议字符串写入,或者使用 Point 对象(推荐,因为更通用)。

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# (复用上面的连接配置)
url = "http://localhost:8086"
username = "admin"
password = "your-password"
database = "python_demo_db"
with InfluxDBClient(url=url, username=username, password=password, database=database) as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    # 使用 Point 对象(推荐)
    p = Point("disk_usage") \
        .tag("mount_point", "/") \
        .field("used_percent", 92.1)
    write_api.write(bucket=database, record=p) # 在1.x中,bucket参数通常就是数据库名
    print(f"Point written to 1.x: {p}")

核心操作:查询数据

查询数据使用 Flux (InfluxDB 2.x) 或 InfluxQL (InfluxDB 1.x) 查询语言。

查询数据从 InfluxDB 2.x (使用 Flux)

from influxdb_client import InfluxDBClient, QueryOptions
from influxdb_client.client.query_api import QueryApi
# (复用上面的连接配置)
url = "http://localhost:8086"
token = "your-secret-api-token"
org = "your-org-name"
with InfluxDBClient(url=url, token=token, org=org) as client:
    # 获取 Query API
    query_api = client.query_api()
    # Flux 查询语句
    # 从 'python_demo' bucket 中查询 'mem_usage' measurement 的 'used_percent' field
    # 最近5分钟的数据
    query = f'''
    from(bucket:"{bucket}")
      |> range(start: -5m)
      |> filter(fn: (r) => r._measurement == "mem_usage")
      |> filter(fn: (r) => r._field == "used_percent")
    '''
    # 执行查询
    tables = query_api.query(query, org=org)
    # 处理查询结果
    for table in tables:
        for record in table.records:
            # record 对象包含了时间戳、measurement、tag、field 等所有信息
            print(f"Time: {record.get_time()}, "
                  f"Measurement: {record.get_measurement()}, "
                  f"Tag: host={record.get_value_by_key('host')}, "
                  f"Field: {record.get_field()}={record.get_value()}")

查询数据从 InfluxDB 1.x (使用 InfluxQL)

from influxdb_client import InfluxDBClient
# (复用上面的连接配置)
url = "http://localhost:8086"
username = "admin"
password = "your-password"
database = "python_demo_db"
with InfluxDBClient(url=url, username=username, password=password, database=database) as client:
    query_api = client.query_api()
    # InfluxQL 查询语句
    query = f'SELECT * FROM "disk_usage" WHERE time > now() - 5m'
    # 执行查询
    tables = query_api.query(query)
    # 处理查询结果
    for table in tables:
        for record in table.records:
            print(f"Time: {record.get_time()}, "
                  f"Measurement: {record.get_measurement()}, "
                  f"Tag: mount_point={record.get_value_by_key('mount_point')}, "
                  f"Field: {record.get_field()}={record.get_value()}")

完整示例:写入并查询

这是一个将所有操作整合在一起的完整 Python 脚本,适用于 InfluxDB 2.x。

# full_example_influxdb2.py
import time
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryApi
# --- 1. 配置 ---
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_TOKEN = "your-secret-api-token" # <--- 替换成你的 Token
INFLUXDB_ORG = "your-org-name"           # <--- 替换成你的 Org
INFLUXDB_BUCKET = "python_demo"          # <--- 替换成你的 Bucket
# --- 2. 写入数据 ---
print("--- Writing data to InfluxDB ---")
with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    # 写入一个带有特定时间戳的点
    point_time = datetime.utcnow() - timedelta(minutes=1)
    p = Point("sensor_data") \
        .tag("sensor_id", "sensor-001") \
        .field("temperature", 23.5) \
        .time(point_time)
    write_api.write(bucket=INFLUXDB_BUCKET, record=p)
    print(f"Successfully wrote: {p}")
    # 写入多个点(模拟数据流)
    write_api.write(bucket=INFLUXDB_BUCKET, record=[
        Point("sensor_data").tag("sensor_id", "sensor-002").field("temperature", 24.1),
        Point("sensor_data").tag("sensor_id", "sensor-003").field("humidity", 55.2),
    ])
    print("Successfully wrote multiple points.")
# 等待数据写入完成
time.sleep(1)
# --- 3. 查询数据 ---
print("\n--- Querying data from InfluxDB ---")
with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
    query_api = client.query_api()
    # Flux 查询:获取所有传感器最近10分钟的温度数据
    flux_query = f'''
    from(bucket:"{INFLUXDB_BUCKET}")
      |> range(start: -10m)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> filter(fn: (r) => r._field == "temperature")
      |> sort(columns: ["_time"], desc: true)
    '''
    tables = query_api.query(flux_query, org=INFLUXDB_ORG)
    print("\nQuery Results:")
    for table in tables:
        for record in table.records:
            # 使用 f-string 格式化输出,更清晰
            print(f"Time: {record.get_time()}, Sensor: {record.get_value_by_key('sensor_id')}, Temperature: {record.get_value()}°C")

高级用法:异步写入

对于高频数据写入(例如从物联网设备接收数据),同步写入可能会成为性能瓶颈。influxdb-client 支持异步写入,可以大大提高吞吐量。

你需要安装 asyncioaiohttp 库(influxdb-client 通常会自动处理依赖)。

import asyncio
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import ASYNC
# (复用上面的连接配置)
url = "http://localhost:8086"
token = "your-secret-api-token"
org = "your-org-name"
bucket = "python_demo"
async def main():
    # 使用异步上下文管理器
    async with InfluxDBClient(url=url, token=token, org=org) as client:
        # 获取异步的 Write API
        write_api = client.write_api(write_options=ASYNC)
        # 创建一个批量写入的任务
        points = [
            Point("async_sensor").tag("id", "async-1").field("value", i) for i in range(1000)
        ]
        # 使用 gather 并行执行写入任务
        await asyncio.gather(*[write_api.write(bucket=bucket, record=p) for p in points])
        # 确保所有写入任务都完成
        write_api.close()
        print("Async write finished.")
# 运行异步主函数
asyncio.run(main())

总结与建议

功能 InfluxDB 2.x (推荐) InfluxDB 1.x
认证 API Token 用户名/密码
数据存储 Bucket Database
查询语言 Flux InfluxQL
Python库 influxdb-client influxdb-clientpython-influxdb (旧版)
写入方式 Point 对象,行协议 Point 对象,行协议,直接SQL
推荐做法 使用 with 语句管理客户端生命周期。
2. 优先使用 Point 对象构建数据,可读性好。
3. 对于高频写入,使用异步 ASYNC API。
同上。
2. 如果项目必须使用1.x,influxdb-client 依然兼容。

对于新项目,强烈建议使用 InfluxDB 2.x 和官方的 influxdb-client 库,因为它功能更强大、社区更活跃、API 设计也更现代化。

分享:
扫描分享到社交APP
上一篇
下一篇