Zabbix API 批量日志监控

基于 Zabbix API 和 Python 实现批量日志监控自动化,通过 Excel 导入监控配置,一键创建监控项和触发器,适用于服务器日志的大量监控场景。

概述

基于 Zabbix API 和 Python 实现批量日志监控自动化,通过 Excel 导入监控配置,一键创建监控项和触发器,适用于服务器日志的大量监控场景。

一、环境准备与基础方法

1. 环境准备

依赖项 版本要求

Python >= 3.6

Zabbix Server (本文基于 6.4 验证)

pip install requests pandas openpyxl

前置条件:Zabbix Server 已启用日志监控所需的 logrt / log 相关 Agent 主动/被动检查,目标主机上 zabbix_agent2.conf 已配置日志文件访问权限。

2. 登录认证

Zabbix API 采用 JSON-RPC 2.0 协议,操作需携带认证 Token。通过 user.login 方法获取:

import requests

import json

requests.packages.urllib3.disable_warnings()

class PerseusZAPIClient:

    def __init__(self, url: str, user: str, pwd: str):

        self.url = url + "/api_jsonrpc.php"

        self.user = user

        self.pwd = pwd

        self.token = None

        self.headers = {"Content-Type": "application/json"}

        self._login()

    def _login(self):

        payload = {"jsonrpc":"2.0","method":"user.login",

                   "params":{"username":self.user,"password":self.pwd},

                   "id":1,"auth":None}

        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

        self.token = res.json()["result"]

        print(f"✅ 认证成功,Token: {self.token[:10]}...")

verify=False:内网环境常使用自签名证书,生产环境建议配置 CA 证书路径。

3. 主机查询与监控项检查

批量创建前需要获取主机 ID,同时使用 check_item 检查监控项是否已存在以避免重复创建:

def get_host_id(self, host):

    """根据主机名获取主机 ID"""

    payload = {"jsonrpc":"2.0","method":"host.get",

               "params":{"filter":{"host":[host]},"output":["hostid"]},

               "id":2,"auth":self.token}

    res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

    return res.json()["result"][0]["hostid"]

def check_item(self, host_id, key):

    """检查监控项是否已存在,返回 (是否存在, item_id)"""

    payload = {"jsonrpc":"2.0","method":"item.get",

               "params":{"hostids":[host_id],"filter":{"key_":key},"output":["itemid"]},

               "id":3,"auth":self.token}

    res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

    res_json = res.json()

    return (True, res_json["result"][0]["itemid"]) if res_json["result"] else (False, "")

二、创建日志监控项

1. 日志监控项类型

Zabbix 原生支持以下日志类 Item 类型:

类型键值 说明 关键参数

log 监控固定路径日志文件 file, regexp

logrt 监控轮转日志(支持通配符/正则) file, regexp, output

Key 自动构建规则(调用脚本中根据"发现关键字次数"自动选择):

发现关键字次数 生成的 Key 说明

非空(如 0) logrt.count["{path}","{keywords}"] 计数模式,配合 last() 触发器

为空 logrt["{path}", "{keywords}",, skip] 匹配模式,配合 find() 触发器

2. 创建监控项实现

参数说明:

参数 类型 说明

host_id str 目标主机 ID

items str 监控项 Key(如 logrt[...])

delay str 采集间隔

name str 监控项名称

hostname str 主机显示名(用于日志输出)

item_type / value_type 说明:

创建时固定 type=7(Zabbix agent 主动式)、value_type=2(日志),同时自动添加 Application: 日志 标签用于分类筛选。

item_type 说明 value_type 说明

0 Zabbix agent 0 浮点数

2 Zabbix trapper 1 字符

7 Zabbix agent (主动式) 2 日志

19 HTTP agent 3 整数

21 脚本 4 文本

def create_item(self, host_id, items, delay, name, hostname):

    """创建单个日志监控项,已存在则抛出 ITEM_EXISTS 异常"""

    exists, item_id = self.check_item(host_id, items)

    if exists:

        raise Exception(f"ITEM_EXISTS:{item_id}")

    payload = {"jsonrpc":"2.0","method":"item.create",

               "params":{"hostid":host_id,"name":name,"key_":items,

                         "type":7,"value_type":2,"interfaceid":0,"delay":delay,

                         "tags":[{"tag":"Application","value":"日志"}]},

               "id":4,"auth":self.token}

    res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

    res_json = res.json()

    if "error" in res_json:

        raise Exception(f"项创建失败: {res_json['error']['message']} - {res_json['error'].get('data','')}")

    item_id = res_json["result"]["itemids"][0]

    print(f"✅ 成功创建监控项 {hostname} (ID: {item_id})")

    return item_id

已存在的监控项会抛出 ITEM_EXISTS:{item_id} 异常,上层 batch_create 会捕获并复用已有 ID,保证幂等性。

3. Key 语法参考

log[file,<regexp>,<encoding>,<maxlines>,<mode>,<output>,<maxdelay>,<options>]

logrt[file_regexp,<regexp>,<encoding>,<maxlines>,<mode>,<output>,<maxdelay>,<options>]

# 匹配模式 - 关键字告警(skip 跳过旧数据)

logrt["/var/log/nginx/error.log.*", "5\d{2}",, skip]

# 计数模式 - 次数超阈值告警

logrt.count["/var/log/nginx/error.log.*","5\d{2}"]

三、创建日志触发器

1. 触发器优先级

脚本使用中文等级映射:

值 等级 值 等级

0 未分类 1 信息

2 警告 3 次要

4 严重 5 紧急

severity_reverse = {"信息": 1, "警告": 2, "次要": 3, "严重": 4, "紧急": 5}

2. 创建触发器实现

参数说明:

参数 类型 说明

host str Zabbix 主机名(用于构建表达式路径)

items str 监控项 Key

trigger_name str 触发器名称

trigger_level str 严重等级(信息/警告/次要/严重/紧急)

trigger_count str 阈值,非空时使用 last() 表达式

keywords str 匹配关键字,使用 find() + regexp 表达式

表达式自动构建逻辑:

条件 生成的表达式 说明

trigger_count 非空 last(/host/key) > {trigger_count} 值超阈值触发

trigger_count 为空 find(/host/key,,"regexp","{keywords}") = 1 匹配关键字触发

def create_trigger(self, host, items, trigger_name, trigger_level, trigger_count, keywords):

    """创建单个日志触发器,自动构建表达式"""

    severity_reverse = {"信息": 1, "警告": 2, "次要": 3, "严重": 4, "紧急": 5}

    priority = severity_reverse.get(trigger_level, 4)

    expr = f'find(/{host}/{items},,"regexp","{keywords}")=1'

    if trigger_count != "":

        expr = f'last(/{host}/{items})>{trigger_count}'

    payload = {"jsonrpc":"2.0","method":"trigger.create",

               "params":{"description":trigger_name,"expression":expr,

                         "manual_close":1,"priority":priority,

                         "tags":[{"tag":"Application","value":"日志"}]},

               "id":5,"auth":self.token}

    res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

    res_json = res.json()

    if "error" in res_json:

        raise Exception(f"API错误: {res_json['error']['message']} - {res_json['error'].get('data','')}")

    trigger_id = res_json["result"]["triggerids"][0]

    print(f"✅ 成功创建触发器 {trigger_name} (ID: {trigger_id})")

    return trigger_id

manual_close: 1 允许手动关闭告警,Application: 日志 标签与监控项保持一致。

3. 触发器表达式参考

# 匹配到关键字即告警(find 模式 - 脚本默认)

find(/Web-Server-01/logrt["/var/log/nginx/error.log.*","500"],,"regexp","500")=1

# 值超过阈值触发(last 模式 - trigger_count 非空时)

last(/App-Server-01/logrt.count["/var/log/app/error.log","CRITICAL"])>0

四、完整代码

1. 核心类 items_creants.py

#!/usr/bin/env python3

# -*- coding: utf-8 -*-

import requests

import json

requests.packages.urllib3.disable_warnings()

class PerseusZAPIClient:

    def __init__(self, url: str, user: str, pwd: str):

        self.url = url + "/api_jsonrpc.php"

        self.user = user

        self.pwd = pwd

        self.token = None

        self.headers = {"Content-Type": "application/json"}

        self._login()

    def _login(self):

        payload = {"jsonrpc":"2.0","method":"user.login",

                   "params":{"username":self.user,"password":self.pwd},

                   "id":1,"auth":None}

        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

        self.token = res.json()["result"]

        print(f"✅ 认证成功,Token: {self.token[:10]}...")

    def get_host_id(self, host):

        payload = {"jsonrpc":"2.0","method":"host.get",

                   "params":{"filter":{"host":[host]},"output":["hostid"]},

                   "id":2,"auth":self.token}

        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

        return res.json()["result"][0]["hostid"]

    def check_item(self, host_id, key):

        payload = {"jsonrpc":"2.0","method":"item.get",

                   "params":{"hostids":[host_id],"filter":{"key_":key},"output":["itemid"]},

                   "id":3,"auth":self.token}

        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

        res_json = res.json()

        return (True, res_json["result"][0]["itemid"]) if res_json["result"] else (False, "")

    def create_item(self, host_id, items, delay, name, hostname):

        exists, item_id = self.check_item(host_id, items)

        if exists: raise Exception(f"ITEM_EXISTS:{item_id}")

        payload = {"jsonrpc":"2.0","method":"item.create",

                   "params":{"hostid":host_id,"name":name,"key_":items,

                             "type":7,"value_type":2,"interfaceid":0,"delay":delay,

                             "tags":[{"tag":"Application","value":"日志"}]},

                   "id":4,"auth":self.token}

        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

        res_json = res.json()

        if "error" in res_json: raise Exception(f"项创建失败: {res_json['error']['message']} - {res_json['error'].get('data','')}")

        item_id = res_json["result"]["itemids"][0]

        print(f"✅ 成功创建监控项 {hostname} (ID: {item_id})")

        return item_id

    def create_trigger(self, host, items, trigger_name, trigger_level, trigger_count, keywords):

        severity_reverse = {"信息": 1, "警告": 2, "次要": 3, "严重": 4, "紧急": 5}

        priority = severity_reverse.get(trigger_level, 4)

        expr = f'find(/{host}/{items},,"regexp","{keywords}")=1'

        if trigger_count != "":

            expr = f'last(/{host}/{items})>{trigger_count}'

        payload = {"jsonrpc":"2.0","method":"trigger.create",

                   "params":{"description":trigger_name,"expression":expr,

                             "manual_close":1,"priority":priority,

                             "tags":[{"tag":"Application","value":"日志"}]},

                   "id":5,"auth":self.token}

        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)

        res_json = res.json()

        if "error" in res_json: raise Exception(f"API错误: {res_json['error']['message']} - {res_json['error'].get('data','')}")

        trigger_id = res_json["result"]["triggerids"][0]

        print(f"✅ 成功创建触发器 {trigger_name} (ID: {trigger_id})")

        return trigger_id

    def batch_create(self, configs):

        """批量创建监控项和触发器,返回分类统计结果"""

        res = {"新建项":[],"已创建":[],"成功触发器":[],"失败触发器":[],"整体失败":[]}

        for config in configs:

            hostname, name, regex_path, keywords, trigger_name, trigger_count, trigger_level, trigger_time, trigger_decs = config['row']

            host = hostname

            line_num = config["line_num"]

            items = config["log_keys"]

            if not host or not hostname:

                res["整体失败"].append(f"第{line_num}行主机配置空: {host}")

                continue

            try:

                host_id = self.get_host_id(host)

                print(f"\n

0 条评论

请先 登录 后评论
王一哲
王一哲

6 篇文章

作家榜 »

  1. 乐维君 515 文章
  2. YOHOHO 14 文章
  3. 细雨闲花 13 文章
  4. 机灵小和尚 13 文章
  5. 我是一只小菜鸡 12 文章
  6. 。。。 9 文章
  7. 御前侍卫张五哥 9 文章
  8. 小黄人 8 文章