杰瑞科技汇

Logstash Python插件如何高效处理数据?

Logstash 的 Python 插件主要分为两种,它们的工作方式和用途完全不同,理解它们的区别至关重要:

  1. logstash-filter-python: 一个输入过滤器,用于在 Logstash 管道中对事件数据进行处理。
  2. logstash-input-exec: 一个输入插件,通过执行外部 Python 脚本来生成新的事件。

下面我们分别对这两种进行详细说明,并提供完整的配置示例。


logstash-filter-python 过滤器插件

这是最常用的“Python 插件”,它允许你在 Logstash 管道中嵌入一段 Python 代码来处理事件数据,它的核心优势在于:

  • 灵活性:当 Logstash 内置的过滤器(如 grok, mutate, ruby)无法满足复杂的逻辑时,可以使用 Python。
  • 功能强大:可以轻松调用 Python 丰富的标准库和第三方库(如 pandas, requests, numpy 等)进行数据计算、网络请求、文件操作等。
  • 无缝集成:处理后的数据可以无缝地流回 Logstash 管道,供后续插件使用。

工作原理

logstash-filter-python 插件会启动一个嵌入式的 Python 解释器,当事件通过管道时,它会将事件数据(一个字典)传递给你的 Python 函数,你的函数对这个字典进行修改,然后将修改后的事件返回给 Logstash。

安装

# 在你的 Logstash 安装目录下运行
./bin/logstash-plugin install logstash-filter-python

配置示例

假设我们有一个日志文件,其中包含一个 JSON 格式的消息,我们想用 Python 来解析并提取其中的特定字段。

输入日志 (app.log):

{"timestamp": "2025-10-27T10:00:00Z", "level": "INFO", "message": "User 'alice' logged in from 192.168.1.100", "user_ip": "192.168.1.100"}

Logstash 配置文件 (python-filter.conf):

input {
  file {
    path => "/path/to/your/app.log"
    start_position => "beginning"
    sincedb_path => "/dev/null" # 仅用于测试,避免记录位置
  }
}
filter {
  # 1. 将 message 字段的内容解析为 JSON 对象
  #    这样,user_ip 字段就会变成事件的一个顶级字段
  json {
    source => "message"
    target => "parsed_message"
  }
  # 2. 使用 python 插件进行复杂处理
  python {
    # 指定要执行的 Python 代码
    code => "
      # 'event' 是 Logstash 传递给 Python 函数的事件对象,它是一个字典
      # 我们可以直接修改它
      # 检查是否存在 'parsed_message' 和 'user_ip' 字段
      if event.get('parsed_message') and event.get('parsed_message', {}).get('user_ip'):
        original_ip = event['parsed_message']['user_ip']
        # 示例:将 IP 地址转换为十六进制格式(只是一个演示)
        # 注意:这只是一个简单的示例,实际中你可能使用 ipaddress 库
        try:
          # 分割 IP 并转换为十进制,然后转换为十六进制
          octets = original_ip.split('.')
          hex_ip = hex(int(octets[0]))[2:] + hex(int(octets[1]))[2:] + hex(int(octets[2]))[2:] + hex(int(octets[3]))[2:]
          event['user_ip_hex'] = hex_ip.upper()
        except Exception:
          event['user_ip_hex'] = 'CONVERSION_ERROR'
      # 添加一个由 Python 生成的字段
      event['processed_by'] = 'python_filter'
      # 你也可以删除字段
      # if event.get('parsed_message'):
      #   event.remove('parsed_message')
    "
  }
  # (可选) 清理中间字段
  if [parsed_message] {
    mutate {
      remove_field => [ "parsed_message" ]
    }
  }
}
output {
  stdout { codec => rubydebug }
  # 也可以输出到 Elasticsearch
  # elasticsearch {
  #   hosts => ["http://localhost:9200"]
  #   index => "app-logs-python"
  # }
}

运行 Logstash:

./bin/logstash -f python-filter.conf

预期输出: 你会看到类似下面的输出,user_ip_hexprocessed_by 字段是由 Python 代码动态添加的。

{
       "message" => "{\"timestamp\": \"2025-10-27T10:00:00Z\", \"level\": \"INFO\", \"message\": \"User 'alice' logged in from 192.168.1.100\", \"user_ip\": \"192.168.1.100\"}",
      "host" => "your-hostname",
    "@timestamp" => 2025-10-27T10:00:00.000Z,
      "@version" => "1",
       "path" => "/path/to/your/app.log",
      "level" => "INFO",
  "user_ip_hex" => "C0A80164",  # 192.168.1.100 的十六进制表示
  "processed_by" => "python_filter",
    "timestamp" => "2025-10-27T10:00:00Z",
      "user_ip" => "192.168.1.100"
}

logstash-input-exec 输入插件

这个插件本身不是“Python 插件”,而是一个通用的“执行脚本”输入插件,它通过调用系统的命令行来执行一个 Python 脚本,然后将该脚本的标准输出 解析为 Logstash 事件。

工作原理

  1. Logstash 启动 logstash-input-exec 插件。
  2. 插件按照设定的间隔(interval)执行你指定的 Python 命令(如 python /path/to/your/script.py)。
  3. Python 脚本将需要生成的事件数据以JSON 格式打印到标准输出。
  4. exec 插件捕获这些 JSON 行,并将每一行转换成一个 Logstash 事件。
  5. 这个新生成的事件会进入 Logstash 管道的下一个阶段(filteroutput)。

安装

这个插件是 Logstash 核心的一部分,通常已经预装,无需额外安装。

配置示例

假设我们有一个 Python 脚本,它从某个 API 获取数据并生成 JSON。

Python 脚本 (data_fetcher.py):

#!/usr/bin/env python3
import json
import time
import random
# 模拟从 API 获取数据
def get_data():
    data = {
        "service": "user_metrics",
        "timestamp": int(time.time()),
        "active_users": random.randint(100, 500),
        "error_rate": round(random.uniform(0.0, 5.0), 2)
    }
    # 必须将字典序列化为 JSON 字符串并打印到 stdout
    print(json.dumps(data))
if __name__ == "__main__":
    get_data()

Logstash 配置文件 (exec-input.conf):

input {
  exec {
    # command: 要执行的命令
    # interval: 执行间隔(秒)
    # mode: "spawn" 表示在后台持续运行,这是默认值
    command => "python /path/to/your/data_fetcher.py"
    interval => 10
    mode => "spawn"
    # (可选) 如果脚本输出多行,可以这样处理
    # codec => "json_lines" 
  }
}
# 你可以在这里添加过滤器,但通常从 exec 输入的数据已经是最终格式
# filter {
#   # 添加一个标签
#   mutate {
#     add_tag => ["api_data"]
#   }
# }
output {
  stdout { codec => rubydebug }
  # 输出到 Elasticsearch
  # elasticsearch {
  #   hosts => ["http://localhost:9200"]
  #   index => "api-metrics"
  # }
}

运行 Logstash:

./bin/logstash -f exec-input.conf

预期输出: 每 10 秒,你会看到由 Python 脚本生成的新事件。

{
       "host" => "your-hostname",
    "@timestamp" => 2025-10-27T10:00:10.000Z,
      "@version" => "1",
       "path" => "/path/to/your/data_fetcher.py",
    "error_rate" => 1.23,
    "active_users" => 345,
      "service" => "user_metrics",
    "timestamp" => 1698384010
}

总结与对比

特性 logstash-filter-python logstash-input-exec
类型 过滤器 输入
用途 已有事件进行修改、增强、转换 外部源通过脚本生成新事件
数据流 事件进入 Python 函数 -> 修改后返回 Logstash Python 脚本输出 JSON -> exec 插件解析成新事件
集成度 ,直接在 Logstash 进程中运行,性能较好 ,通过进程间通信,有额外的启动和解析开销
适用场景 复杂的字段解析、调用 Python 库进行计算、条件性逻辑 定时任务、从非标准数据源拉取数据、作为数据源适配器
性能 通常比 exec 更快,因为它避免了创建新进程 较慢,因为每次执行都需要启动一个新 Python 进程
代码风格 code 字段内直接写 Python 函数体 写一个独立的、可执行的 Python 脚本,并使用 print(json.dumps(...)) 输出

如何选择?

  • 选择 logstash-filter-python:如果你已经有 Logstash 事件,只是需要用 Python 的强大功能来处理它,这是首选,解析一个复杂的自定义日志格式、调用一个 REST API 来验证数据、使用 pandas 进行数据分析等。
  • 选择 logstash-input-exec:如果你需要创建全新的 Logstash 事件,而这些事件需要通过一个独立的脚本来生成,每分钟从数据库查询一次状态、从一个 IoT 设备读取数据、或者作为其他系统数据流的适配器。

希望这个详细的解释能帮助你理解并正确使用 Logstash 的 Python 相关功能!

分享:
扫描分享到社交APP
上一篇
下一篇