使用 Zabbix 和 Python 脚本监控 Oracle 数据库的实战指南

在企业应用中,Oracle数据库是许多关键业务系统的核心。为了确保数据库的稳定运行和性能优化,数据库管理员(DBA)需要借助有效的监控工具。Zabbix是一款强大的开源监控工具,通过自定义脚本,可以实现对Oracle数据库的全面监控。本文将详细介绍如何编写和配置一个用于监控Oracle数据库的Python脚本,并将其集成到Zabbix中。

引言

在企业应用中,Oracle 数据库是许多关键业务系统的核心。为了确保数据库的稳定运行和性能优化,数据库管理员(DBA)需要借助有效的监控工具。Zabbix 是一款强大的开源监控工具,通过自定义脚本,可以实现对 Oracle 数据库的全面监控。本文将详细介绍如何编写和配置一个用于监控 Oracle 数据库的 Python 脚本,并将其集成到 Zabbix 中。


一、环境准备

    安装 Zabbix 服务器

        下载并安装 Zabbix 服务器,详细步骤参考官方文档。

        确保 Zabbix 服务器可以访问 Oracle 数据库。

    安装 Zabbix Agent

        在 Oracle 数据库所在服务器上安装 Zabbix Agent,并配置好相应的参数。

    配置 Oracle 数据库

        确保 Oracle 数据库允许来自 Zabbix 服务器的连接。

        创建一个具有足够权限的监控用户。

二、编写监控脚本

以下是一个完整的 Python 脚本,用于监控 Oracle 数据库的多项指标。脚本利用 cx_Oracle 库连接到 Oracle 数据库,执行 SQL 查询并返回结果。

#!/usr/bin/env python
# -*- coding : utf-8 -*-
# @Time : 2024/3/21 10:02
# @Author : 我是Rain呀 -- (。♥ᴗ♥。) 
# @File : OracleMonitoring.py
# @Software : PyCharm
import argparse
import cx_Oracle
import json
from datetime import datetime
class ConnOracle(object):
    # 连接Oracle And 获取数据类
    def __init__(self, oracleUser: str, oraclePassword: str, oracleHost: str, oraclePort: int, oracleServiceName: str):
        self.username = oracleUser
        self.password = oraclePassword
        self.host = oracleHost
        self.port = oraclePort
        self.service_name = oracleServiceName
        self.connection = self.connect()
    def connect(self) -> object:
        try:
            dsn = cx_Oracle.makedsn(self.host, self.port, service_name=self.service_name)
            connection = cx_Oracle.connect(self.username, self.password, dsn)
            return connection
        except cx_Oracle.Error:
            return None
    def sqlExecute(self, query: str) -> list:
        cursor = self.connection.cursor()
        try:
            cursor.execute(query)
            rows = cursor.fetchall()
            return rows
        except cx_Oracle.Error:
            pass
        finally:
            # 释放游标
            cursor.close()
            self.disconnect()
    def disconnect(self):
        # 断开连接
        if self.connection:
            self.connection.close()
class OracleIndex(ConnOracle):
    # 自动发现规则
    def __init__(self, username, password, Host, Port, service_name):
        super().__init__(username, password, Host, Port, service_name)
    def tableSpaceIndex(self) -> json:
        # 获取所有表空间的名字
        if self.connection is not None:
            sql = "SELECT TABLESPACE_NAME FROM DBA_TABLESPACES"
            result = self.sqlExecute(sql)
            return json.dumps([{"{#INDEX}": index, "{#TABLESPACE_NAME}": row[0]}
                               for index, row in enumerate(result, start=1)])
        return json.dumps([])
    def userNameIndex(self) -> json:
        # 获取所有用户的名字
        if self.connection is not None:
            sql = "SELECT USERNAME, ACCOUNT_STATUS FROM DBA_USERS"
            result = self.sqlExecute(sql)
            return json.dumps([{"{#INDEX}": index, "{#USERNAME}": row[0], "{#ACCOUNT_STATUS}": row[1]}
                               for index, row in enumerate(result, start=1)])
        return json.dumps([])
    def userSegmentIndex(self) -> json:
        # 获取 Segments 下面所有的 Schema,为后续统计用户大小传递数据 Schema discovery
        if self.connection is not None:
            sql = "SELECT DISTINCT OWNER FROM DBA_SEGMENTS"
            result = self.sqlExecute(sql)
            return json.dumps([{"{#INDEX}": index, "{#USERNAME}": row[0]} for index, row in enumerate(result, start=1)])
        return json.dumps([])
    def asmDiskGroupIndex(self) -> json:
        # 获取所ASM磁盘组的名字
        if self.connection is not None:
            sql = "SELECT NAME FROM V$ASM_DISKGROUP"
            result = self.sqlExecute(sql)
            return json.dumps([{"{#INDEX}": index, "{#ASMNAME}": row[0]} for index, row in enumerate(result, start=1)])
        return json.dumps([])
class OracleNorm(ConnOracle):
    # 自动发现的监控项原型
    def __init__(self, username, password, Host, Port, service_name):
        super().__init__(username, password, Host, Port, service_name)
    def tablespaceUsed(self, *args) -> float:
        # 磁盘空间已使用空间 GB 为单位
        if self.connection is not None:
            sql = f"SELECT ROUND(USED_SPACE * 8 / 1024 / 1024, 2) USED_GB FROM DBA_TABLESPACE_USAGE_METRICS " \
                  f"WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            return float(result[0][0])
        return 0.0
    def tablespaceMax(self, *args) -> float:
        # 磁盘空间最大空间 GB 为单位
        if self.connection is not None:
            sql = f"SELECT ROUND(TABLESPACE_SIZE * 8 / 1024 / 1024, 2) MAX_GB FROM DBA_TABLESPACE_USAGE_METRICS " \
                  f"WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            return float(result[0][0])
        return 0.0
    def tablespaceFree(self, *args) -> float:
        # 磁盘空间剩余空间 GB 为单位
        if self.connection is not None:
            sql = f"SELECT ROUND((TABLESPACE_SIZE - USED_SPACE) * 8 / 1024 / 1024, 2) FREE_GB " \
                  f"FROM DBA_TABLESPACE_USAGE_METRICS WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            return float(result[0][0])
        return 0.0
    def tablespaceUsedPer(self, *args) -> float:
        # 磁盘空间已使用百分比 % 为单位
        if self.connection is not None:
            sql = f"SELECT ROUND(USED_PERCENT, 2) AS USED_PERCENT FROM DBA_TABLESPACE_USAGE_METRICS " \
                  f"WHERE TABLESPACE_NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            return float(result[0][0])
        return 0.0
    def userExpired(self, *args) -> int:
        # 检查用户还有多少天到期
        # 19999 - 表示无期限
        # 29999 - 表示没有这个用户或已删除
        # 39999 - 表示数据库连接失败
        if self.connection is not None:
            sql = f"SELECT EXPIRY_DATE FROM DBA_USERS WHERE USERNAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            if result:
                if result[0][0] is not None:
                    given_time = result[0][0]
                    current_time = datetime.now()
                    time_difference = given_time - current_time
                    days_difference = time_difference.days
                    return int(days_difference)
                return 19999
            return 29999
        return 39999
    def asmTotal(self, *args) -> int:
        # 查看ASM 指定名称的总空间大小 (MB)
        # 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
        # 299999999 - 表示连接失败,无法连接到数据库
        if self.connection is not None:
            sql = f"SELECT TOTAL_MB FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            if result:
                return int(result[0][0])
            return 199999999
        return 299999999
    def asmFree(self, *args) -> int:
        # 查看ASM 指定名称的 剩余大小 (MB)
        # 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
        # 299999999 - 表示连接失败,无法连接到数据库
        if self.connection is not None:
            sql = f"SELECT FREE_MB FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            if result:
                return int(result[0][0])
            return 199999999
        return 299999999
    def asmUsed(self, *args) -> int:
        # 查看ASM 指定名称的 已使用大小 (MB)
        # 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
        # 299999999 - 表示连接失败,无法连接到数据库
        if self.connection is not None:
            sql = f"SELECT COLD_USED_MB FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            if result:
                return int(result[0][0])
            return 199999999
        return 299999999
    def asmUsedPre(self, *args) -> float:
        # 查看ASM 指定名称的 已使用百分比 (%)
        # 199999999 - 表示没有该ASM名字的数据,或已经删除了该ASM组
        # 299999999 - 表示连接失败,无法连接到数据库
        # 触发器逻辑: xxx >= 85 and xxx <= 100
        if self.connection is not None:
            sql = f"SELECT ROUND(COLD_USED_MB / TOTAL_MB * 100, 2) FROM V$ASM_DISKGROUP NAME = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            if result:
                return float(result[0][0])
            return 199999999
        return 299999999
    def schemaSize(self, *args) -> int:
        # 统计Schema 总大小,单位为 B
        # 0 - 表示没有该用户,或该用户已删除
        # 1 - 表示连接失败,无法连接到数据库
        if self.connection is not None:
            sql = f"SELECT SUM(BYTES) FROM DBA_SEGMENTS OWNER = UPPER('{args[0]}')"
            result = self.sqlExecute(sql)
            if result:
                return int(result[0][0])
            return 0
        return 1
class OracleStatic(ConnOracle):
    # 单独指标项,非自动发现
    def __init__(self, username, password, Host, Port, service_name):
        super().__init__(username, password, Host, Port, service_name)
    def OracleInstanceStatus(self) -> str:
        # Oracle Instance 状态
        # 触发器逻辑: 非OPEN状态告警
        if self.connection is not None:
            sql = "SELECT STATUS FROM V$INSTANCE"
            result = self.sqlExecute(sql)
            return str(result[0][0])
        return "Connect Failed"
    def OracleInstanceSessionPer(self) -> float:
        # Session 连接占用百分比
        # 触发器逻辑: 连接百分比 >= 80
        if self.connection is not None:
            sql = "SELECT ROUND(B.VALUE/A.VALUE*100, 2) AS SESSION_USE FROM (SELECT VALUE FROM V$PARAMETER " \
                  "WHERE NAME='sessions') A,(SELECT COUNT(*) AS VALUE FROM V$SESSION) B"
            result = self.sqlExecute(sql)
            return float(result[0][0])
        return float(0.0)
    def OracleVersion(self) -> str:
        # Oracle数据库版本
        if self.connection is not None:
            sql = "SELECT * FROM V$VERSION WHERE BANNER LIKE 'Oracle%'"
            result = self.sqlExecute(sql)
            return str(result[0][0])
        return "Connect Failed"
    def OracleActiveSession(self) -> int:
        # 检查活跃Session数量
        if self.connection is not None:
            sql = "SELECT COUNT(*) FROM V$SESSION WHERE STATUS = 'ACTIVE'"
            result = self.sqlExecute(sql)
            return int(result[0][0])
        return 0
    def OracleInvalidObject(self) -> int:
        # 检查无效对象数量
        if self.connection is not None:
            sql = "SELECT COUNT(*) FROM DBA_OBJECTS WHERE STATUS = 'INVALID'"
            result = self.sqlExecute(sql)
            return int(result[0][0])
        return 0
    def OracleHitCacheRatio(self) -> float:
        # 高速缓存命中率
        # 缓存命中率越高越好,如果返回 0 则为数据库连接失败
        if self.connection is not None:
            sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Buffer Cache Hit Ratio' AND GROUP_ID=2"
            result = self.sqlExecute(sql)
            return round(float(result[0][0]), 2)
        return 0
    def OracleCursorCacheRatio(self) -> float:
        # 游标缓存命中率
        # 缓存命中率越高越好,如果返回 0 则为数据库连接失败
        if self.connection is not None:
            sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Cursor Cache Hit Ratio' AND GROUP_ID=2"
            result = self.sqlExecute(sql)
            return round(float(result[0][0]), 2)
        return 0
    def OraclePgaCacheHit(self) -> float:
        # PGA缓存命中率 %
        # 返回 0 可能是数据库连接失败
        if self.connection is not None:
            sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='PGA Cache Hit %' AND GROUP_ID=2"
            result = self.sqlExecute(sql)
            return round(float(result[0][0]), 2)
        return 0
    def OracleSoftParseHit(self) -> float:
        # SQL软解析比例 %
        # 返回 0 可能是数据库连接失败
        if self.connection is not None:
            sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Soft Parse Ratio' AND GROUP_ID=2"
            result = self.sqlExecute(sql)
            return round(float(result[0][0]), 2)
        return 0
    def OracleGlobalName(self) -> str:
        # 全局数据库名称
        if self.connection is not None:
            sql = "SELECT PROPERTY_VALUE FROM DATABASE_PROPERTIES WHERE PROPERTY_NAME='GLOBAL_DB_NAME'"
            result = self.sqlExecute(sql)
            return str(result[0][0])
        return "Connect Failed"
    def OracleSharePoolFree(self) -> float:
        # 共享池可用百分比 %
        if self.connection is not None:
            sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Shared Pool Free %' AND GROUP_ID=2"
            result = self.sqlExecute(sql)
            return round(float(result[0][0]), 2)
        return 0
    def OracleHostCpuUtilization(self) -> float:
        # 主机CPU使用率 %
        # 返回 0 可能是数据库连接失败
        if self.connection is not None:
            sql = "SELECT VALUE FROM V$SYSMETRIC WHERE METRIC_NAME='Host CPU Utilization (%)' AND GROUP_ID=2"
            result = self.sqlExecute(sql)
            return round(float(result[0][0]), 2)
        return 0
def main():
    def create_instance(class_name, *args):
        return class_name(*args)
    # 根据监控类型选择对应的类
    class_dict = {
        'index': OracleIndex,
        'space': OracleNorm,
        'statis': OracleStatic
    }
    # 定义参数信息的字典
    args_info = [
        {'name': '--host', 'type': str, 'help': 'Oracle database host/ipaddr', 'required': True},
        {'name': '--user', 'type': str, 'default': 'zabbix',
         'help': 'Oracle database username, The default value is zabbix'},
        {'name': '--passwd', 'type': str, 'default': 'zabbix',
         'help': 'Oracle database password, The default value is zabbix'},
        {'name': '--port', 'type': int, 'default': 1521, 'help': 'Oracle database port, The default value is 1521'},
        {'name': '--instance_name', 'type': str, 'help': 'Oracle database instance name', 'required': True},
        {'name': '--type', 'type': str, 'help': 'Monitoring type', 'choices': class_dict.keys(), 'required': True},
        {'name': 'function', 'type': str, 'help': 'Function to execute'},
        {'name': 'args', 'nargs': '*', 'help': 'Arguments for the function'}
    ]
    # 创建 ArgumentParser 对象
    parser = argparse.ArgumentParser()
    # 循环添加参数
    for arg_info in args_info:
        arg_name = arg_info.pop('name')
        parser.add_argument(arg_name, **arg_info)
    # 解析命令行参数
    arges = parser.parse_args()
    # 获取参数值
    host, user, passwd, port, instance_name, monitoring_type, function_to_execute, function_args = (
        arges.host, arges.user, arges.passwd, arges.port, arges.instance_name, arges.type, arges.function, arges.args
    )
    if monitoring_type in class_dict:
        # 创建相应的类实例
        instance = create_instance(class_dict[monitoring_type], user, passwd, host, port, instance_name)
        # 调用指定函数
        if hasattr(instance, function_to_execute):
            print(getattr(instance, function_to_execute)(*function_args))
        else:
            print("Invalid function name:", function_to_execute)
    else:
        print("Invalid monitoring type:", monitoring_type)
if __name__ == '__main__':
    main()



三、集成脚本到 Zabbix

    创建脚本文件

        将上述 Python 脚本保存为 OracleMonitoring.py。

        将脚本文件上传到 Zabbix 服务器或 Agent 上的指定目录。

    配置 Zabbix

        在 Zabbix 前端创建自定义的监控项和触发器。

        使用 Zabbix 的用户参数功能,调用 Python 脚本并获取监控数据。

四、测试与验证

    运行脚本

        手动运行脚本,验证其输出是否正确。

        确保脚本能够成功连接到 Oracle 数据库并返回预期结果。

    检查 Zabbix 前端

        确认 Zabbix 能够正确读取和显示脚本的输出数据。

        设置相应的告警规则,确保在数据库异常时能够及时通知管理员。

五、总结

     通过使用 Python 脚本和 Zabbix,DBA 可以实现对 Oracle 数据库的自动化监控。本文介绍的解决方案不仅能够帮助管理员及时发现和处理数据库问题,还可以提升整体运维效率。希望本文对广大 DBA 朋友有所帮助。




如果您在开发 Python/Shell 脚本或 Zabbix 模板过程中遇到问题,或者有开发模板或脚本的需求,欢迎随时添加 QQ:1030202304,与我一起沟通交流。

0 条评论

请先 登录 后评论
我是 Rain 呀
我是 Rain 呀

系统运维工程师/DBA 工程师

4 篇文章

作家榜 »

  1. 乐维君 410 文章
  2. YOHOHO 14 文章
  3. 机灵小和尚 13 文章
  4. 细雨闲花 12 文章
  5. 我是一只小菜鸡 12 文章
  6. 。。。 9 文章
  7. 御前侍卫张五哥 9 文章
  8. 小黄人 8 文章