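Overview
This article automates bulk log monitoring with the Zabbix API and Python: monitoring definitions are imported from an Excel sheet, and the corresponding items and triggers are created in a single run. It targets environments where a large number of server log files need to be monitored.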
I. Environment Setup and Basic Methods
1. Environment setup
Dependency      Version requirement
Python          >= 3.6
Zabbix Server   verified against 6.4 in this article
pip install requests pandas openpyxl
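Prerequisites: log and logrt items are only supported as active checks, so the agent on each target host must have ServerActive pointing at the Zabbix server (or proxy) and a Hostname that matches the host name configured in Zabbix. The user running the agent configured through zabbix_agent2.conf must also have read access to the monitored log files.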
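2. Login and authentication
The Zabbix API uses the JSON-RPC 2.0 protocol, and API calls must carry an authentication token. The token is obtained with the user.login method: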
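import json

import requests

# Suppress the InsecureRequestWarning triggered by verify=False below
requests.packages.urllib3.disable_warnings()


class PerseusZAPIClient:
    def __init__(self, url: str, user: str, pwd: str):
        self.url = url + "/api_jsonrpc.php"
        self.user = user
        self.pwd = pwd
        self.token = None
        self.headers = {"Content-Type": "application/json"}
        self._login()

    def _login(self):
        """Call user.login and store the returned session token."""
        payload = {"jsonrpc": "2.0", "method": "user.login",
                   "params": {"username": self.user, "password": self.pwd},
                   "id": 1, "auth": None}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        # A failed login returns an "error" object instead of "result"
        if "error" in res_json:
            raise Exception(f"Login failed: {res_json['error']['message']} - {res_json['error'].get('data', '')}")
        self.token = res_json["result"]
        print(f"✅ Authenticated, token: {self.token[:10]}...")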
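verify=False: internal deployments often use self-signed certificates, which is why certificate verification is disabled here. In production, pass the path of a CA certificate bundle to verify instead.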
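The request envelope shared by every API call can be factored into one helper. The following is a minimal sketch and not part of the original script: build_rpc_request is a hypothetical name, and it assumes Zabbix 6.4 or later, where the session token can be sent in an Authorization: Bearer header instead of the auth property.

```python
import json
from typing import Any, Dict, Optional, Tuple


def build_rpc_request(method: str, params: Dict[str, Any],
                      token: Optional[str] = None,
                      request_id: int = 1) -> Tuple[Dict[str, str], str]:
    """Return (headers, body) for one Zabbix JSON-RPC 2.0 call.

    Hypothetical helper: when a token is given it is sent as a Bearer
    header (assumes Zabbix >= 6.4) rather than in the "auth" property.
    """
    headers = {"Content-Type": "application/json"}
    if token is not None:
        headers["Authorization"] = f"Bearer {token}"
    body = json.dumps({"jsonrpc": "2.0", "method": method,
                       "params": params, "id": request_id})
    return headers, body
```

The returned pair can be passed straight to requests.post(url, data=body, headers=headers, verify=...), which keeps the per-method code down to the method name and its params.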
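3. Host lookup and item check
Before anything is created in bulk, the host ID has to be looked up, and check_item is used to test whether an item already exists so that it is not created twice: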
    def get_host_id(self, host):
        """Return the host ID for the given host name."""
        payload = {"jsonrpc": "2.0", "method": "host.get",
                   "params": {"filter": {"host": [host]}, "output": ["hostid"]},
                   "id": 2, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        return res.json()["result"][0]["hostid"]

    def check_item(self, host_id, key):
        """Check whether an item with this key exists; returns (exists, item_id)."""
        payload = {"jsonrpc": "2.0", "method": "item.get",
                   "params": {"hostids": [host_id], "filter": {"key_": key}, "output": ["itemid"]},
                   "id": 3, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        return (True, res_json["result"][0]["itemid"]) if res_json["result"] else (False, "")
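Both methods index directly into result, so a host that does not exist surfaces as an IndexError and an API error as a KeyError. A more defensive way to read the response might look like the sketch below; first_id and ZabbixAPIError are hypothetical names, and the input is the dict returned by res.json().

```python
from typing import Any, Dict, Optional


class ZabbixAPIError(Exception):
    """Raised when a JSON-RPC response carries an "error" object (hypothetical)."""


def first_id(response: Dict[str, Any], field: str) -> Optional[str]:
    """Return `field` of the first result entry, or None if the result is empty."""
    if "error" in response:
        err = response["error"]
        raise ZabbixAPIError(f"{err.get('message', '')} - {err.get('data', '')}")
    result = response.get("result") or []
    return result[0][field] if result else None
```

With a helper like this, a caller can tell "host not found" (None) apart from "the API rejected the request" (exception) and report each case per spreadsheet row.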
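II. Creating Log Items
1. Log item types
Zabbix natively supports the following log item keys:
Key     Description                                                                   Key parameters
log     Monitors a log file at a fixed path                                           file, regexp
logrt   Monitors rotated log files (the file name is matched by a regular expression) file_regexp, regexp, output
Key construction rule (the calling script picks the key automatically from the "keyword occurrence count" column):
Keyword occurrence count   Generated key                           Notes
Non-empty (e.g. 0)         logrt.count["{path}","{keywords}"]      Count mode, paired with a last() trigger
Empty                      logrt["{path}","{keywords}",,,skip]     Match mode (skip is the fifth parameter, mode), paired with a find() trigger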
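The calling script that applies this rule is not shown in this article. A sketch of how the key could be derived from one spreadsheet row follows; build_log_key is a hypothetical helper, an empty cell is assumed to arrive as None or an empty string, and skip is placed in the fifth position because that is where mode sits in the logrt key syntax.

```python
from typing import Optional


def build_log_key(path: str, keywords: str, count: Optional[str]) -> str:
    """Build a logrt / logrt.count key from one configuration row (hypothetical)."""
    has_count = count is not None and str(count).strip() != ""
    if has_count:
        # Count mode: the item returns the number of matching lines
        return f'logrt.count["{path}","{keywords}"]'
    # Match mode: encoding and maxlines are left empty, mode is skip
    return f'logrt["{path}","{keywords}",,,skip]'
```

Note that a count of 0 is still "non-empty" and therefore selects count mode, which matches the rule in the table.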
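2. Implementing item creation
Parameters:
Parameter   Type   Description
host_id     str    Target host ID
items       str    Item key (e.g. logrt[...])
delay       str    Collection interval
name        str    Item name
hostname    str    Host display name (used in log output)
item_type / value_type:
Items are created with a fixed type=7 (Zabbix agent (active)) and value_type=2 (log). The tag Application: 日志 ("log") is added automatically so that the items can be filtered by category.
item_type   Description             value_type   Description
0           Zabbix agent            0            Numeric (float)
2           Zabbix trapper          1            Character
7           Zabbix agent (active)   2            Log
19          HTTP agent              3            Numeric (unsigned)
21          Script                  4            Text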
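One caveat with a fixed value_type=2: logrt.count returns the number of matching lines, and a numeric comparison such as last()>N is normally paired with a numeric item. A possible refinement, offered here as an assumption and not as part of the original script, is to derive value_type from the key. value_type_for_key is a hypothetical helper.

```python
VALUE_TYPE_LOG = 2       # log
VALUE_TYPE_UNSIGNED = 3  # numeric (unsigned)


def value_type_for_key(key: str) -> int:
    """Pick a value_type from the item key (hypothetical helper)."""
    # The key name is everything before the first "["
    key_name = key.split("[", 1)[0].strip()
    if key_name in ("log.count", "logrt.count"):
        return VALUE_TYPE_UNSIGNED
    return VALUE_TYPE_LOG
```

The result could be passed as value_type in the item.create params in place of the constant 2.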
    def create_item(self, host_id, items, delay, name, hostname):
        """Create one log item; raises an ITEM_EXISTS exception if it already exists."""
        exists, item_id = self.check_item(host_id, items)
        if exists:
            raise Exception(f"ITEM_EXISTS:{item_id}")
        # type 7 = Zabbix agent (active), value_type 2 = log
        payload = {"jsonrpc": "2.0", "method": "item.create",
                   "params": {"hostid": host_id, "name": name, "key_": items,
                              "type": 7, "value_type": 2, "interfaceid": 0, "delay": delay,
                              "tags": [{"tag": "Application", "value": "日志"}]},
                   "id": 4, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        if "error" in res_json:
            raise Exception(f"Item creation failed: {res_json['error']['message']} - {res_json['error'].get('data', '')}")
        item_id = res_json["result"]["itemids"][0]
        print(f"✅ Created item {hostname} (ID: {item_id})")
        return item_id
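For an item that already exists, create_item raises an exception of the form ITEM_EXISTS:{item_id}. The calling batch_create catches it and reuses the existing ID, which keeps repeated runs idempotent.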
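The part of batch_create that does this is cut off in this article, so the following is only a sketch of the pattern, under the assumption that the exception message is exactly ITEM_EXISTS:{item_id}. create_or_reuse and its create_fn argument are hypothetical; create_fn stands in for a bound call to create_item.

```python
from typing import Callable, Tuple

PREFIX = "ITEM_EXISTS:"


def create_or_reuse(create_fn: Callable[[], str]) -> Tuple[str, bool]:
    """Return (item_id, created); reuse the ID carried by an ITEM_EXISTS error."""
    try:
        return create_fn(), True
    except Exception as exc:
        message = str(exc)
        if message.startswith(PREFIX):
            # The item is already there: recover its ID from the message
            return message[len(PREFIX):], False
        raise  # any other failure is a real error
```

A call such as create_or_reuse(lambda: client.create_item(host_id, key, "1m", name, hostname)), where client is an instance of the class above, would then return the same item ID on the first and on every later run.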
3. Key syntax reference
log[file,<regexp>,<encoding>,<maxlines>,<mode>,<output>,<maxdelay>,<options>]
logrt[file_regexp,<regexp>,<encoding>,<maxlines>,<mode>,<output>,<maxdelay>,<options>]
# Match mode: alert on a keyword (mode=skip ignores data that was already in the file)
logrt["/var/log/nginx/error.log.*","5\d{2}",,,skip]
# Count mode: alert when the number of matches exceeds a threshold
logrt.count["/var/log/nginx/error.log.*","5\d{2}"]
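Before a pattern is rolled out to many hosts, it can be checked locally against a few sample lines. The sketch below uses Python's re module as a stand-in: Zabbix evaluates the pattern with PCRE, so a simple pattern such as 5\d{2} should behave the same way, but this is an approximation and not a guarantee. matching_lines and the sample lines are made up for illustration.

```python
import re
from typing import Iterable, List


def matching_lines(pattern: str, lines: Iterable[str]) -> List[str]:
    """Return the lines in which `pattern` is found anywhere in the line."""
    compiled = re.compile(pattern)
    return [line for line in lines if compiled.search(line)]


# Made-up sample lines, only for trying the pattern out
sample = [
    "GET /index.html 200",
    "GET /api/orders 502",
    "POST /login 404",
]
hits = matching_lines(r"5\d{2}", sample)
```

Because the pattern is unanchored, 5\d{2} also matches inside longer numbers such as 1500, so a stricter pattern may be worth considering for noisy logs.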
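III. Creating Log Triggers
1. Trigger severity
The script maps Chinese severity labels to Zabbix severity values:
Value   Zabbix severity   Script label
0       Not classified    未分类
1       Information       信息
2       Warning           警告
3       Average           次要
4       High              严重
5       Disaster          紧急
severity_reverse = {"信息": 1, "警告": 2, "次要": 3, "严重": 4, "紧急": 5}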
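The create_trigger implementation in this section falls back to 4 when a label is not in the mapping, which can silently hide a typo in the spreadsheet. A stricter lookup could look like the sketch below; severity_for is a hypothetical helper, and trimming whitespace is an assumption about how cells arrive from Excel.

```python
SEVERITY = {"信息": 1, "警告": 2, "次要": 3, "严重": 4, "紧急": 5}


def severity_for(label: str) -> int:
    """Map a Chinese severity label to the Zabbix priority value."""
    key = str(label).strip()
    if key not in SEVERITY:
        raise ValueError(f"unknown severity label: {label!r}")
    return SEVERITY[key]
```

Raising here lets the batch run record the offending row as failed, so a misspelled label does not end up as a High trigger.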
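2. Implementing trigger creation
Parameters:
Parameter       Type   Description
host            str    Zabbix host name (used to build the expression path)
items           str    Item key
trigger_name    str    Trigger name
trigger_level   str    Severity label (信息/警告/次要/严重/紧急)
trigger_count   str    Threshold; when non-empty, a last() expression is used
keywords        str    Keyword to match; used in a find() expression with the regexp operator
Expression construction logic:
Condition                 Generated expression                          Notes
trigger_count non-empty   last(/host/key)>{trigger_count}               Fires when the value exceeds the threshold
trigger_count empty       find(/host/key,,"regexp","{keywords}")=1      Fires when the keyword matches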
    def create_trigger(self, host, items, trigger_name, trigger_level, trigger_count, keywords):
        """Create one log trigger; the expression is built automatically."""
        # Labels not present in the mapping fall back to 4 (High)
        severity_reverse = {"信息": 1, "警告": 2, "次要": 3, "严重": 4, "紧急": 5}
        priority = severity_reverse.get(trigger_level, 4)
        if trigger_count != "":
            # Count mode: compare the latest value with the threshold
            expr = f'last(/{host}/{items})>{trigger_count}'
        else:
            # Match mode: fire when the keyword is found
            expr = f'find(/{host}/{items},,"regexp","{keywords}")=1'
        payload = {"jsonrpc": "2.0", "method": "trigger.create",
                   "params": {"description": trigger_name, "expression": expr,
                              "manual_close": 1, "priority": priority,
                              "tags": [{"tag": "Application", "value": "日志"}]},
                   "id": 5, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        if "error" in res_json:
            raise Exception(f"API error: {res_json['error']['message']} - {res_json['error'].get('data', '')}")
        trigger_id = res_json["result"]["triggerids"][0]
        print(f"✅ Created trigger {trigger_name} (ID: {trigger_id})")
        return trigger_id
manual_close: 1 allows problems to be closed manually. The Application: 日志 tag matches the one set on the items.
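3. Trigger expression reference
# Alert as soon as the keyword matches (find mode, the script default)
find(/Web-Server-01/logrt["/var/log/nginx/error.log.*","500"],,"regexp","500")=1
# Alert when the value exceeds the threshold (last mode, used when trigger_count is non-empty)
last(/App-Server-01/logrt.count["/var/log/app/error.log","CRITICAL"])>0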
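The two forms can be produced by one pure function, which makes the rule easy to test without a Zabbix server. This is a sketch and not the author's code: build_expression is a hypothetical name, and treating None and whitespace-only cells as empty is an assumption (create_trigger itself only compares against an empty string).

```python
from typing import Optional


def build_expression(host: str, key: str,
                     trigger_count: Optional[str], keywords: str) -> str:
    """Return a last() expression when a threshold is given, else a find() one."""
    count = "" if trigger_count is None else str(trigger_count).strip()
    if count:
        # Count mode: latest value compared with the threshold
        return f"last(/{host}/{key})>{count}"
    # Match mode: regexp search for the keyword
    return f'find(/{host}/{key},,"regexp","{keywords}")=1'
```

The key passed in has to be identical to the key of the item that was created, otherwise Zabbix cannot resolve the reference in the expression.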
IV. Complete Code
1. Core class: items_creants.py
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import json

import requests

# Suppress the InsecureRequestWarning triggered by verify=False below
requests.packages.urllib3.disable_warnings()


class PerseusZAPIClient:
    def __init__(self, url: str, user: str, pwd: str):
        self.url = url + "/api_jsonrpc.php"
        self.user = user
        self.pwd = pwd
        self.token = None
        self.headers = {"Content-Type": "application/json"}
        self._login()

    def _login(self):
        """Call user.login and store the returned session token."""
        payload = {"jsonrpc": "2.0", "method": "user.login",
                   "params": {"username": self.user, "password": self.pwd},
                   "id": 1, "auth": None}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        # A failed login returns an "error" object instead of "result"
        if "error" in res_json:
            raise Exception(f"Login failed: {res_json['error']['message']} - {res_json['error'].get('data', '')}")
        self.token = res_json["result"]
        print(f"✅ Authenticated, token: {self.token[:10]}...")

    def get_host_id(self, host):
        """Return the host ID for the given host name."""
        payload = {"jsonrpc": "2.0", "method": "host.get",
                   "params": {"filter": {"host": [host]}, "output": ["hostid"]},
                   "id": 2, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        return res.json()["result"][0]["hostid"]

    def check_item(self, host_id, key):
        """Check whether an item with this key exists; returns (exists, item_id)."""
        payload = {"jsonrpc": "2.0", "method": "item.get",
                   "params": {"hostids": [host_id], "filter": {"key_": key}, "output": ["itemid"]},
                   "id": 3, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        return (True, res_json["result"][0]["itemid"]) if res_json["result"] else (False, "")

    def create_item(self, host_id, items, delay, name, hostname):
        """Create one log item; raises an ITEM_EXISTS exception if it already exists."""
        exists, item_id = self.check_item(host_id, items)
        if exists:
            raise Exception(f"ITEM_EXISTS:{item_id}")
        # type 7 = Zabbix agent (active), value_type 2 = log
        payload = {"jsonrpc": "2.0", "method": "item.create",
                   "params": {"hostid": host_id, "name": name, "key_": items,
                              "type": 7, "value_type": 2, "interfaceid": 0, "delay": delay,
                              "tags": [{"tag": "Application", "value": "日志"}]},
                   "id": 4, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        if "error" in res_json:
            raise Exception(f"Item creation failed: {res_json['error']['message']} - {res_json['error'].get('data', '')}")
        item_id = res_json["result"]["itemids"][0]
        print(f"✅ Created item {hostname} (ID: {item_id})")
        return item_id

    def create_trigger(self, host, items, trigger_name, trigger_level, trigger_count, keywords):
        """Create one log trigger; the expression is built automatically."""
        # Labels not present in the mapping fall back to 4 (High)
        severity_reverse = {"信息": 1, "警告": 2, "次要": 3, "严重": 4, "紧急": 5}
        priority = severity_reverse.get(trigger_level, 4)
        if trigger_count != "":
            expr = f'last(/{host}/{items})>{trigger_count}'
        else:
            expr = f'find(/{host}/{items},,"regexp","{keywords}")=1'
        payload = {"jsonrpc": "2.0", "method": "trigger.create",
                   "params": {"description": trigger_name, "expression": expr,
                              "manual_close": 1, "priority": priority,
                              "tags": [{"tag": "Application", "value": "日志"}]},
                   "id": 5, "auth": self.token}
        res = requests.post(self.url, data=json.dumps(payload), headers=self.headers, verify=False)
        res_json = res.json()
        if "error" in res_json:
            raise Exception(f"API error: {res_json['error']['message']} - {res_json['error'].get('data', '')}")
        trigger_id = res_json["result"]["triggerids"][0]
        print(f"✅ Created trigger {trigger_name} (ID: {trigger_id})")
        return trigger_id

    def batch_create(self, configs):
        """Create items and triggers in bulk and return the results grouped by outcome."""
        # Result keys: 新建项 = items created, 已创建 = items already present,
        # 成功触发器 = triggers created, 失败触发器 = triggers failed, 整体失败 = rows that failed entirely
        res = {"新建项": [], "已创建": [], "成功触发器": [], "失败触发器": [], "整体失败": []}
        for config in configs:
            hostname, name, regex_path, keywords, trigger_name, trigger_count, trigger_level, trigger_time, trigger_decs = config['row']
            host = hostname
            line_num = config["line_num"]
            items = config["log_keys"]
            if not host or not hostname:
                res["整体失败"].append(f"Row {line_num}: host is empty: {host}")
                continue
            try:
                host_id = self.get_host_id(host)
print(f"\n
Error message
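batch_create expects each entry of configs to carry row (nine fields), line_num and log_keys. How the original script builds that list from Excel is not shown here; one way it could be done is sketched below. rows_to_configs and its build_key argument are hypothetical, the rows are assumed to be 9-tuples in the column order that batch_create unpacks (for example from pandas.read_excel(...).itertuples(index=False, name=None) after empty cells have been replaced with empty strings), and starting line numbers at 2 assumes a single header row.

```python
from typing import Any, Callable, Dict, Iterable, List, Sequence


def rows_to_configs(rows: Iterable[Sequence[Any]],
                    build_key: Callable[[Any, Any, Any], str],
                    first_line: int = 2) -> List[Dict[str, Any]]:
    """Turn spreadsheet rows into the dicts consumed by batch_create (sketch)."""
    configs = []
    for offset, row in enumerate(rows):
        if len(row) != 9:
            raise ValueError(f"expected 9 columns, got {len(row)}")
        # Column order follows the tuple unpacking in batch_create
        regex_path, keywords, trigger_count = row[2], row[3], row[5]
        configs.append({
            "row": tuple(row),
            "line_num": first_line + offset,
            "log_keys": build_key(regex_path, keywords, trigger_count),
        })
    return configs
```

build_key is whatever function implements the key construction rule from section II; keeping it as a parameter means the row handling can be tested without touching the key syntax.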