InfluxDB 是一个专门为时间序列数据设计的数据库,Python 操作它也围绕着时间序列的核心概念:测量、标签、字段、时间戳。
我们将主要介绍官方推荐的 influxdb-client-python 库,因为它功能强大、支持最新的 InfluxDB 3.x 和 2.x API,并且是未来发展的方向。
安装 InfluxDB 客户端库
你需要安装官方的 Python 客户端库,打开你的终端或命令行,运行以下命令:
pip install influxdb-client
连接到 InfluxDB
在操作数据库之前,你需要建立一个连接,你需要提供 InfluxDB 服务器的 URL、你的组织名、访问令牌。
如何获取访问令牌? 在 InfluxDB UI 中:
- 进入 Data (数据) 标签页。
- 选择一个 Bucket (存储桶)。
- 点击右上角的 API Tokens (API 令牌)。
- 点击 Generate API Token (生成 API 令牌)。
- 给令牌一个名字,并设置适当的权限(读写权限),然后生成并复制令牌。
连接代码示例:
from influxdb_client import InfluxDBClient
# --- 配置连接信息 ---
# InfluxDB 服务器的 URL
url = "http://localhost:8086"
# 你的组织名
org = "my-org"
# 你的访问令牌
token = "your-secret-api-token"
# 存储桶的名称
bucket = "my-bucket"
# --- 创建客户端 ---
# 使用 'with' 语句可以确保连接在使用完毕后自动关闭
with InfluxDBClient(url=url, token=token, org=org) as client:
print("Connected to InfluxDB.")
# 在这里执行你的操作...
# client.close() # 'with' 语句会自动调用此方法
写入数据
写入数据是操作 InfluxDB 最核心的功能之一,你需要构造一个 Point 对象,然后使用 WriteApi 将其写入。
核心概念:
Point: 代表一个单独的数据点,你需要指定:measurement: 测量名称,相当于表名。tag: 标签,用于索引和过滤,通常是字符串类型,如location,device_id,标签是可选的。field: 字段,存储实际的数据值,必须是数值、布尔或字符串类型,每个数据点至少需要一个字段。time: 时间戳,如果不提供,客户端会自动使用当前服务器时间。
写入代码示例:
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS # 同步写入
# --- 配置连接信息 (同上) ---
url = "http://localhost:8086"
org = "my-org"
token = "your-secret-api-token"
bucket = "my-bucket"
# --- 写入数据 ---
with InfluxDBClient(url=url, token=token, org=org) as client:
# 创建一个 WriteApi 实例
write_api = client.write_api(write_options=SYNCHRONOUS)
# 创建一个数据点
# measurement: "weather"
# tag: "location" (值为 "New York")
# field: "temperature" (值为 72.5)
# field: "humidity" (值为 68)
point = Point("weather") \
.tag("location", "New York") \
.field("temperature", 72.5) \
.field("humidity", 68)
print(f"Writing point: {point}")
# 写入数据到指定的存储桶和组织
write_api.write(bucket=bucket, org=org, record=point)
print("Data written successfully.")
# 关闭 WriteApi
write_api.close()
批量写入: 为了提高性能,推荐批量写入数据。
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
# ... (连接信息同上)
with InfluxDBClient(url=url, token=token, org=org) as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
# 创建多个数据点
points = []
points.append(Point("weather").tag("location", "New York").field("temperature", 72.5))
points.append(Point("weather").tag("location", "London").field("temperature", 65.2))
points.append(Point("cpu_usage").tag("host", "server01").field("usage_percent", 45.8))
# 使用列表一次性写入
write_api.write(bucket=bucket, org=org, record=points)
print("Batch data written successfully.")
write_api.close()
查询数据
查询数据主要使用 QueryApi,InfluxDB 2.x 使用 Flux 查询语言,这是一种功能强大的数据脚本语言。
核心概念:
QueryApi: 用于执行 Flux 查询。- Flux 查询: 从
from()开始,通过管道符 连接不同的操作,如filter(),range(),mean()等。
查询代码示例:
from influxdb_client import InfluxDBClient
from influxdb_client.client.query_api import QueryOptions
# ... (连接信息同上)
# Flux 查询语句
# 查询 "my-bucket" 中 "weather" measurement 的 "location" 为 "New York" 的所有数据
# 时间范围是过去 24 小时
flux_query = f'''
from(bucket:"{bucket}")
|> range(start: -24h)
|> filter(fn: (r) => r._measurement == "weather")
|> filter(fn: (r) => r.location == "New York")
'''
with InfluxDBClient(url=url, token=token, org=org) as client:
# 创建 QueryApi 实例
# query_options=QueryOptions(language="flux") 明确指定使用 Flux 语言
query_api = client.query_api(query_options=QueryOptions(language="flux"))
# 执行查询
# query_api.query() 返回一个生成器,每次迭代产生一个表的结果
tables = query_api.query(flux_query, org=org)
# 遍历查询结果
for table in tables:
for record in table.records:
# record 包含了每一行的数据
print(f"Time: {record.get_time()}")
print(f"Measurement: {record.get_measurement()}")
print(f"Location: {record.get_value_by_key('location')}")
print(f"Temperature: {record.get_value_by_key('temperature')}")
print(f"Field: {record.get_field()}, Value: {record.get_value()}")
print("-" * 20)
删除数据
删除数据同样使用 Flux 查询,但需要通过 DeleteApi 来执行。
重要提示:
- InfluxDB 的删除操作是基于谓词的,即你只能通过
measurement,tag,field和时间范围来删除数据。 - 你不能直接删除单个数据点,除非你能构造一个唯一匹配该点的谓词。
- 删除操作通常是异步的。
删除代码示例:
from influxdb_client import InfluxDBClient
# ... (连接信息同上)
# 定义删除的谓词
# 删除 "my-bucket" 中 "weather" measurement,且 "location" 为 "London" 的所有数据
# 时间范围是过去 1 小时
start_time = "now() - 1h"
stop_time = "now()"
predicate = f'_measurement="weather" and location="London"'
with InfluxDBClient(url=url, token=token, org=org) as client:
# 创建 DeleteApi 实例
delete_api = client.delete_api()
print(f"Deleting data with predicate: {predicate}")
# 执行删除操作
delete_api.delete(start=start_time, stop=stop_time, predicate=predicate, bucket=bucket, org=org)
print("Delete request sent successfully.")
完整示例脚本
下面是一个将所有操作整合在一起的完整 Python 脚本。
# main.py
import time
from datetime import datetime, timedelta
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from influxdb_client.client.query_api import QueryOptions
# --- 1. 配置 ---
INFLUXDB_URL = "http://localhost:8086"
INFLUXDB_ORG = "my-org"
INFLUXDB_TOKEN = "your-secret-api-token" # 请替换成你的真实 Token
INFLUXDB_BUCKET = "my-bucket"
def main():
# --- 2. 写入数据 ---
print("--- Step 1: Writing Data ---")
with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
write_api = client.write_api(write_options=SYNCHRONOUS)
# 模拟写入 10 个数据点
for i in range(10):
point = Point("sensor_data") \
.tag("device_id", f"sensor-{i % 3}") \
.field("value", i * 10) \
.time(datetime.utcnow() - timedelta(minutes=i), write_precision="s") # 使用过去的时间戳
write_api.write(bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG, record=point)
print(f"Wrote: {point}")
write_api.close()
print("Data writing complete.\n")
# 等待 1 秒,确保数据被写入
time.sleep(1)
# --- 3. 查询数据 ---
print("--- Step 2: Querying Data ---")
flux_query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.device_id == "sensor-1")
|> sort(columns: ["_time"], desc: false)
'''
with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
query_api = client.query_api(query_options=QueryOptions(language="flux"))
tables = query_api.query(flux_query, org=INFLUXDB_ORG)
if not tables:
print("No data found matching the query.")
else:
for table in tables:
for record in table.records:
print(f"Time: {record.get_time()}, Device: {record.get_value_by_key('device_id')}, Value: {record.get_value()}")
print("Querying complete.\n")
# --- 4. 删除数据 ---
print("--- Step 3: Deleting Data ---")
# 删除 device_id 为 "sensor-2" 的所有数据
delete_predicate = '_measurement="sensor_data" and device_id="sensor-2"'
with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
delete_api = client.delete_api()
delete_api.delete(start="now() - 1h", stop="now()", predicate=delete_predicate, bucket=INFLUXDB_BUCKET, org=INFLUXDB_ORG)
print(f"Delete request sent for predicate: {delete_predicate}")
print("Deleting complete.\n")
# --- 5. 验证删除 ---
print("--- Step 4: Verifying Deletion ---")
flux_verify_query = f'''
from(bucket:"{INFLUXDB_BUCKET}")
|> range(start: -10m)
|> filter(fn: (r) => r._measurement == "sensor_data")
|> filter(fn: (r) => r.device_id == "sensor-2")
'''
with InfluxDBClient(url=INFLUXDB_URL, token=INFLUXDB_TOKEN, org=INFLUXDB_ORG) as client:
query_api = client.query_api(query_options=QueryOptions(language="flux"))
tables = query_api.query(flux_verify_query, org=INFLUXDB_ORG)
if not tables:
print("Verification successful: No data found for 'sensor-2'.")
else:
print("Verification failed: Data for 'sensor-2' still exists.")
for table in tables:
for record in table.records:
print(f"Found remaining data: Time: {record.get_time()}, Value: {record.get_value()}")
if __name__ == "__main__":
main()
总结与最佳实践
- 使用
with语句:确保客户端连接在使用完毕后能被正确关闭,防止资源泄漏。 - 批量写入:对于大量数据,始终使用批量写入(传入一个
Point列表),性能远高于逐条写入。 - 标签 vs. 字段:
- 用于过滤和分组,数据量小、基数低(不同值少)、通常是字符串。
location,user_id。 - 字段:存储实际的数据值,数据量大、基数高、通常是数值。
temperature,cpu_usage。 - 经验法则:如果你打算用这个字段进行
filter或group by,就把它设为tag。
- 用于过滤和分组,数据量小、基数低(不同值少)、通常是字符串。
- 异步写入:对于高性能应用,可以使用
write_options=ASYNCHRONOUS来获得更好的写入吞吐量,但要注意处理可能发生的错误。 - 错误处理:在实际生产环境中,应该用
try...except块来捕获和处理可能发生的网络错误或 API 错误。 - 环境变量:不要将 Token 和 URL 等敏感信息硬编码在代码中,使用环境变量(如
os.environ)或配置文件来管理它们。
希望这份详细的指南能帮助你顺利地使用 Python 操作 InfluxDB!
