杰瑞科技汇

python 操作influxdb

InfluxDB 是一个专门为时间序列数据设计的数据库,Python 操作它也围绕着时间序列的核心概念:测量、标签、字段、时间戳

我们将主要介绍官方推荐的 influxdb-client-python 库,因为它功能强大、支持最新的 InfluxDB 3.x 和 2.x API,并且是未来发展的方向。


安装 InfluxDB 客户端库

你需要安装官方的 Python 客户端库,打开你的终端或命令行,运行以下命令:

pip install influxdb-client

连接到 InfluxDB

在操作数据库之前,你需要建立一个连接,你需要提供 InfluxDB 服务器的 URL、你的组织名、访问令牌。

如何获取访问令牌? 在 InfluxDB UI 中:

  1. 进入 Data (数据) 标签页。
  2. 选择一个 Bucket (存储桶)。
  3. 点击右上角的 API Tokens (API 令牌)。
  4. 点击 Generate API Token (生成 API 令牌)。
  5. 给令牌一个名字,并设置适当的权限(读写权限),然后生成并复制令牌。

连接代码示例:

from influxdb_client import InfluxDBClient
# --- 配置连接信息 ---
# InfluxDB 服务器的 URL
url = "http://localhost:8086"
# 你的组织名
org = "my-org"
# 你的访问令牌
token = "your-secret-api-token"
# 存储桶的名称
bucket = "my-bucket"
# --- 创建客户端 ---
# 使用 'with' 语句可以确保连接在使用完毕后自动关闭
with InfluxDBClient(url=url, token=token, org=org) as client:
    print("Connected to InfluxDB.")
    # 在这里执行你的操作...
    # client.close() # 'with' 语句会自动调用此方法

写入数据

写入数据是操作 InfluxDB 最核心的功能之一,你需要构造一个 Point 对象,然后使用 WriteApi 将其写入。

核心概念:

  • Point: 代表一个单独的数据点,你需要指定:
    • measurement: 测量名称,相当于表名。
    • tag: 标签,用于索引和过滤,通常是字符串类型,如 location, device_id,标签是可选的。
    • field: 字段,存储实际的数据值,必须是数值、布尔或字符串类型,每个数据点至少需要一个字段。
    • time: 时间戳,如果不提供,客户端会自动使用当前服务器时间。

写入代码示例:

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS # 同步写入
# --- 配置连接信息 (同上) ---
url = "http://localhost:8086"
org = "my-org"
token = "your-secret-api-token"
bucket = "my-bucket"
# --- 写入数据 ---
with InfluxDBClient(url=url, token=token, org=org) as client:
    # 创建一个 WriteApi 实例
    write_api = client.write_api(write_options=SYNCHRONOUS)
    # 创建一个数据点
    # measurement: "weather"
    # tag: "location" (值为 "New York")
    # field: "temperature" (值为 72.5)
    # field: "humidity" (值为 68)
    point = Point("weather") \
        .tag("location", "New York") \
        .field("temperature", 72.5) \
        .field("humidity", 68)
    print(f"Writing point: {point}")
    # 写入数据到指定的存储桶和组织
    write_api.write(bucket=bucket, org=org, record=point)
    print("Data written successfully.")
    # 关闭 WriteApi
    write_api.close()

批量写入: 为了提高性能,推荐批量写入数据。

from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# ... (连接信息同上)
with InfluxDBClient(url=url, token=token, org=org) as client:
    write_api = client.write_api(write_options=SYNCHRONOUS)
    # 创建多个数据点
    points = []
    points.append(Point("weather").tag("location", "New York").field("temperature", 72.5))
    points.append(Point("weather").tag("location", "London").field("temperature", 65.2))
    points.append(Point("cpu_usage").tag("host", "server01").field("usage_percent", 45.8))
    # 使用列表一次性写入
    write_api.write(bucket=bucket, org=org, record=points)
    print("Batch data written successfully.")
    write_api.close()

查询数据

查询数据主要使用 QueryApi,InfluxDB 2.x 使用 Flux 查询语言,这是一种功能强大的数据脚本语言。

核心概念:

  • QueryApi: 用于执行 Flux 查询。
  • Flux 查询: 从 from() 开始,通过管道符 连接不同的操作,如 filter(), range(), mean() 等。

查询代码示例:

from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryOptions
# ... (连接信息同上)
# Flux 查询语句
# 查询 "my-bucket" 中 "weather" measurement 的 "location" 为 "New York" 的所有数据
# 时间范围是过去 24 小时
flux_query = f'''
from(bucket:"{bucket}")
  |> range(start: -24h)
  |> filter(fn: (r) => r._measurement == "weather")
  |> filter(fn: (r) => r.location == "New York")
'''
with InfluxDBClient(url=url, token=token, org=org) as client:
    # 创建 QueryApi 实例
    # query_options=QueryOptions(language="flux") 明确指定使用 Flux 语言
    query_api = client.query_api(query_options=QueryOptions(language="flux"))
    # 执行查询
    # query_api.query() 返回一个生成器,每次迭代产生一个表的结果
    tables = query_api.query(flux_query, org=org)
    # 遍历查询结果
    for table in tables:
        for record in table.records:
            # record 包含了每一行的数据
            print(f"Time: {record.get_time()}")
            print(f"Measurement: {record.get_measurement()}")
            print(f"Location: {record.get_value_by_key('location')}")
            print(f"Temperature: {record.get_value_by_key('temperature')}")
            print(f"Field: {record.get_field()}, Value: {record.get_value()}")
            print("-" * 20)

删除数据

删除数据同样使用 Flux 查询,但需要通过 DeleteApi 来执行。

重要提示:

  • InfluxDB 的删除操作是基于谓词的,即你只能通过 measurement, tag, field 和时间范围来删除数据。
  • 你不能直接删除单个数据点,除非你能构造一个唯一匹配该点的谓词。
  • 删除操作通常是异步的。

删除代码示例:

from influxdb_client import InfluxDBClient
# ... (连接信息同上)
# 定义删除的谓词
# 删除 "my-bucket" 中 "weather" measurement,且 "location" 为 "London" 的所有数据
# 时间范围是过去 1 小时
start_time = "now() - 1h"
stop_time = "now()"
predicate = f'_measurement="weather" and location="London"'
with InfluxDBClient(url=url, token=token, org=org) as client:
    # 创建 DeleteApi 实例
    delete_api = client.delete_api()
    print(f"Deleting data with predicate: {predicate}")
    # 执行删除操作
    delete_api.delete(start=start_time, stop=stop_time, predicate=predicate, bucket=bucket, org=org)
    print("Delete request sent successfully.")

完整示例脚本

下面是一个将所有操作整合在一起的完整 Python 脚本。

# main.py
import time
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryOptions
# --- 1. 配置 ---
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_ORG = "my-org"
INFLUXDB_TOKEN = "your-secret-api-token" # 请替换成你的真实 Token
INFLUXDB_BUCKET = "my-bucket"
def main():
    # --- 2. 写入数据 ---
    print("--- Step 1: Writing Data ---")
    with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
        write_api = client.write_api(write_options=SYNCHRONOUS)
        # 模拟写入 10 个数据点
        for i in range(10):
            point = Point("sensor_data") \
                .tag("device_id", f"sensor-{i % 3}") \
                .field("value", i * 10) \
                .time(datetime.utcnow() - timedelta(minutes=i), write_precision="s") # 使用过去的时间戳
            write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
            print(f"Wrote: {point}")
        write_api.close()
    print("Data writing complete.\n")
    # 等待 1 秒,确保数据被写入
    time.sleep(1)
    # --- 3. 查询数据 ---
    print("--- Step 2: Querying Data ---")
    flux_query = f'''
    from(bucket:"{INFLUXDB_BUCKET}")
      |> range(start: -10m)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> filter(fn: (r) => r.device_id == "sensor-1")
      |> sort(columns: ["_time"], desc: false)
    '''
    with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
        query_api = client.query_api(query_options=QueryOptions(language="flux"))
        tables = query_api.query(flux_query, org=INFLUXDB_ORG)
        if not tables:
            print("No data found matching the query.")
        else:
            for table in tables:
                for record in table.records:
                    print(f"Time: {record.get_time()}, Device: {record.get_value_by_key('device_id')}, Value: {record.get_value()}")
    print("Querying complete.\n")
    # --- 4. 删除数据 ---
    print("--- Step 3: Deleting Data ---")
    # 删除 device_id 为 "sensor-2" 的所有数据
    delete_predicate = '_measurement="sensor_data" and device_id="sensor-2"'
    with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
        delete_api = client.delete_api()
        delete_api.delete(start="now() - 1h", stop="now()", predicate=delete_predicate, bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG)
        print(f"Delete request sent for predicate: {delete_predicate}")
    print("Deleting complete.\n")
    # --- 5. 验证删除 ---
    print("--- Step 4: Verifying Deletion ---")
    flux_verify_query = f'''
    from(bucket:"{INFLUXDB_BUCKET}")
      |> range(start: -10m)
      |> filter(fn: (r) => r._measurement == "sensor_data")
      |> filter(fn: (r) => r.device_id == "sensor-2")
    '''
    with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
        query_api = client.query_api(query_options=QueryOptions(language="flux"))
        tables = query_api.query(flux_verify_query, org=INFLUXDB_ORG)
        if not tables:
            print("Verification successful: No data found for 'sensor-2'.")
        else:
            print("Verification failed: Data for 'sensor-2' still exists.")
            for table in tables:
                for record in table.records:
                    print(f"Found remaining data: Time: {record.get_time()}, Value: {record.get_value()}")
if __name__ == "__main__":
    main()

总结与最佳实践

  1. 使用 with 语句:确保客户端连接在使用完毕后能被正确关闭,防止资源泄漏。
  2. 批量写入:对于大量数据,始终使用批量写入(传入一个 Point 列表),性能远高于逐条写入。
  3. 标签 vs. 字段
    • 用于过滤和分组,数据量小、基数低(不同值少)、通常是字符串。location, user_id
    • 字段:存储实际的数据值,数据量大、基数高、通常是数值。temperature, cpu_usage
    • 经验法则:如果你打算用这个字段进行 filtergroup by,就把它设为 tag
  4. 异步写入:对于高性能应用,可以使用 write_options=ASYNCHRONOUS 来获得更好的写入吞吐量,但要注意处理可能发生的错误。
  5. 错误处理:在实际生产环境中,应该用 try...except 块来捕获和处理可能发生的网络错误或 API 错误。
  6. 环境变量:不要将 Token 和 URL 等敏感信息硬编码在代码中,使用环境变量(如 os.environ)或配置文件来管理它们。

希望这份详细的指南能帮助你顺利地使用 Python 操作 InfluxDB!

分享:
扫描分享到社交APP
上一篇
下一篇