Logstash 的 Python 插件主要分为两种,它们的工作方式和用途完全不同,理解它们的区别至关重要:
logstash-filter-python: 一个输入过滤器,用于在 Logstash 管道中对事件数据进行处理。logstash-input-exec: 一个输入插件,通过执行外部 Python 脚本来生成新的事件。
下面我们分别对这两种进行详细说明,并提供完整的配置示例。
logstash-filter-python 过滤器插件
这是最常用的“Python 插件”,它允许你在 Logstash 管道中嵌入一段 Python 代码来处理事件数据,它的核心优势在于:
- 灵活性:当 Logstash 内置的过滤器(如
grok,mutate,ruby)无法满足复杂的逻辑时,可以使用 Python。 - 功能强大:可以轻松调用 Python 丰富的标准库和第三方库(如
pandas,requests,numpy等)进行数据计算、网络请求、文件操作等。 - 无缝集成:处理后的数据可以无缝地流回 Logstash 管道,供后续插件使用。
工作原理
logstash-filter-python 插件会启动一个嵌入式的 Python 解释器,当事件通过管道时,它会将事件数据(一个字典)传递给你的 Python 函数,你的函数对这个字典进行修改,然后将修改后的事件返回给 Logstash。
安装
# 在你的 Logstash 安装目录下运行 ./bin/logstash-plugin install logstash-filter-python
配置示例
假设我们有一个日志文件,其中包含一个 JSON 格式的消息,我们想用 Python 来解析并提取其中的特定字段。
输入日志 (app.log):
{"timestamp": "2025-10-27T10:00:00Z", "level": "INFO", "message": "User 'alice' logged in from 192.168.1.100", "user_ip": "192.168.1.100"}
Logstash 配置文件 (python-filter.conf):
input {
file {
path => "/path/to/your/app.log"
start_position => "beginning"
sincedb_path => "/dev/null" # 仅用于测试,避免记录位置
}
}
filter {
# 1. 将 message 字段的内容解析为 JSON 对象
# 这样,user_ip 字段就会变成事件的一个顶级字段
json {
source => "message"
target => "parsed_message"
}
# 2. 使用 python 插件进行复杂处理
python {
# 指定要执行的 Python 代码
code => "
# 'event' 是 Logstash 传递给 Python 函数的事件对象,它是一个字典
# 我们可以直接修改它
# 检查是否存在 'parsed_message' 和 'user_ip' 字段
if event.get('parsed_message') and event.get('parsed_message', {}).get('user_ip'):
original_ip = event['parsed_message']['user_ip']
# 示例:将 IP 地址转换为十六进制格式(只是一个演示)
# 注意:这只是一个简单的示例,实际中你可能使用 ipaddress 库
try:
# 分割 IP 并转换为十进制,然后转换为十六进制
octets = original_ip.split('.')
hex_ip = hex(int(octets[0]))[2:] + hex(int(octets[1]))[2:] + hex(int(octets[2]))[2:] + hex(int(octets[3]))[2:]
event['user_ip_hex'] = hex_ip.upper()
except Exception:
event['user_ip_hex'] = 'CONVERSION_ERROR'
# 添加一个由 Python 生成的字段
event['processed_by'] = 'python_filter'
# 你也可以删除字段
# if event.get('parsed_message'):
# event.remove('parsed_message')
"
}
# (可选) 清理中间字段
if [parsed_message] {
mutate {
remove_field => [ "parsed_message" ]
}
}
}
output {
stdout { codec => rubydebug }
# 也可以输出到 Elasticsearch
# elasticsearch {
# hosts => ["http://localhost:9200"]
# index => "app-logs-python"
# }
}
运行 Logstash:
./bin/logstash -f python-filter.conf
预期输出:
你会看到类似下面的输出,user_ip_hex 和 processed_by 字段是由 Python 代码动态添加的。
{
"message" => "{\"timestamp\": \"2025-10-27T10:00:00Z\", \"level\": \"INFO\", \"message\": \"User 'alice' logged in from 192.168.1.100\", \"user_ip\": \"192.168.1.100\"}",
"host" => "your-hostname",
"@timestamp" => 2025-10-27T10:00:00.000Z,
"@version" => "1",
"path" => "/path/to/your/app.log",
"level" => "INFO",
"user_ip_hex" => "C0A80164", # 192.168.1.100 的十六进制表示
"processed_by" => "python_filter",
"timestamp" => "2025-10-27T10:00:00Z",
"user_ip" => "192.168.1.100"
}
logstash-input-exec 输入插件
这个插件本身不是“Python 插件”,而是一个通用的“执行脚本”输入插件,它通过调用系统的命令行来执行一个 Python 脚本,然后将该脚本的标准输出 解析为 Logstash 事件。
工作原理
- Logstash 启动
logstash-input-exec插件。 - 插件按照设定的间隔(
interval)执行你指定的 Python 命令(如python /path/to/your/script.py)。 - Python 脚本将需要生成的事件数据以JSON 格式打印到标准输出。
exec插件捕获这些 JSON 行,并将每一行转换成一个 Logstash 事件。- 这个新生成的事件会进入 Logstash 管道的下一个阶段(
filter和output)。
安装
这个插件是 Logstash 核心的一部分,通常已经预装,无需额外安装。
配置示例
假设我们有一个 Python 脚本,它从某个 API 获取数据并生成 JSON。
Python 脚本 (data_fetcher.py):
#!/usr/bin/env python3
import json
import time
import random
# 模拟从 API 获取数据
def get_data():
data = {
"service": "user_metrics",
"timestamp": int(time.time()),
"active_users": random.randint(100, 500),
"error_rate": round(random.uniform(0.0, 5.0), 2)
}
# 必须将字典序列化为 JSON 字符串并打印到 stdout
print(json.dumps(data))
if __name__ == "__main__":
get_data()
Logstash 配置文件 (exec-input.conf):
input {
exec {
# command: 要执行的命令
# interval: 执行间隔(秒)
# mode: "spawn" 表示在后台持续运行,这是默认值
command => "python /path/to/your/data_fetcher.py"
interval => 10
mode => "spawn"
# (可选) 如果脚本输出多行,可以这样处理
# codec => "json_lines"
}
}
# 你可以在这里添加过滤器,但通常从 exec 输入的数据已经是最终格式
# filter {
# # 添加一个标签
# mutate {
# add_tag => ["api_data"]
# }
# }
output {
stdout { codec => rubydebug }
# 输出到 Elasticsearch
# elasticsearch {
# hosts => ["http://localhost:9200"]
# index => "api-metrics"
# }
}
运行 Logstash:
./bin/logstash -f exec-input.conf
预期输出: 每 10 秒,你会看到由 Python 脚本生成的新事件。
{
"host" => "your-hostname",
"@timestamp" => 2025-10-27T10:00:10.000Z,
"@version" => "1",
"path" => "/path/to/your/data_fetcher.py",
"error_rate" => 1.23,
"active_users" => 345,
"service" => "user_metrics",
"timestamp" => 1698384010
}
总结与对比
| 特性 | logstash-filter-python |
logstash-input-exec |
|---|---|---|
| 类型 | 过滤器 | 输入 |
| 用途 | 对已有事件进行修改、增强、转换 | 从外部源通过脚本生成新事件 |
| 数据流 | 事件进入 Python 函数 -> 修改后返回 Logstash | Python 脚本输出 JSON -> exec 插件解析成新事件 |
| 集成度 | 高,直接在 Logstash 进程中运行,性能较好 | 低,通过进程间通信,有额外的启动和解析开销 |
| 适用场景 | 复杂的字段解析、调用 Python 库进行计算、条件性逻辑 | 定时任务、从非标准数据源拉取数据、作为数据源适配器 |
| 性能 | 通常比 exec 更快,因为它避免了创建新进程 |
较慢,因为每次执行都需要启动一个新 Python 进程 |
| 代码风格 | 在 code 字段内直接写 Python 函数体 |
写一个独立的、可执行的 Python 脚本,并使用 print(json.dumps(...)) 输出 |
如何选择?
- 选择
logstash-filter-python:如果你已经有 Logstash 事件,只是需要用 Python 的强大功能来处理它,这是首选,解析一个复杂的自定义日志格式、调用一个 REST API 来验证数据、使用pandas进行数据分析等。 - 选择
logstash-input-exec:如果你需要创建全新的 Logstash 事件,而这些事件需要通过一个独立的脚本来生成,每分钟从数据库查询一次状态、从一个 IoT 设备读取数据、或者作为其他系统数据流的适配器。
希望这个详细的解释能帮助你理解并正确使用 Logstash 的 Python 相关功能!
