#!/usr/lib64/linuxfabrik-monitoring-plugins/venv/bin/python
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import os
import re
import shlex
import sys

import lib.args
import lib.base
import lib.cache
import lib.db_mysql
import lib.disk
import lib.human
import lib.shell
import lib.time
import lib.txt
from lib.globals import STATE_CRIT, STATE_OK, STATE_UNKNOWN, STATE_WARN

# Shown by `--version` (see parse_args()).
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026051201'

# Handed to argparse as the parser description, so this text is part of the
# `--help` output. Treat it as user-facing text, not as a comment.
DESCRIPTION = """Scans the MySQL/MariaDB error log for errors, warnings, startups and
shutdowns. On MySQL 8.0.22+ the plugin prefers the `performance_schema.error_log` table
(reachable over the network, no shell access to the log file needed). Otherwise it
reads the on-disk log file, or fetches recent log lines from a container
(`docker:`/`podman:`/`kubectl:`) or systemd unit (`systemd:`).
The on-disk file path is taken from MySQL/MariaDB's `log_error` variable, with
common fallback locations probed when that variable is empty. The discovered path
is cached so the check still works briefly when the database is down.
Severity is detected from the bracketed log tags (`[ERROR]`, `[Warning]`), which
matches MySQL/MariaDB output and avoids false positives on lines that merely
mention "error" or "warning". Recommendations are grouped under a single block at
the end of the output.
Reading the on-disk log file usually requires root/sudo (typical mysql logs are
owned by `mysql:mysql` mode `0640`). The `performance_schema.error_log` path needs
SELECT on that table but no filesystem access."""

# Defaults for the command-line options registered in parse_args().
DEFAULT_CACHE_EXPIRE = 5 * 24 * 60  # in minutes (= 5 days)
DEFAULT_DEFAULTS_FILE = '/var/spool/icinga2/.my.cnf'
DEFAULT_DEFAULTS_GROUP = 'client'
DEFAULT_HOSTNAME = '127.0.0.1'
# Kept as a string; `--port` is declared with `type=int`, and argparse runs
# string defaults through the `type` callable, so args.PORT is still an int.
DEFAULT_PORT = '3306'
DEFAULT_TIMEOUT = 3

# Size at which main() flags an on-disk log file as too large (WARN).
LOGFILE_BIG_THRESHOLD = 32 * 1024 * 1024  # 32 MiB - matches mysqltuner's cutoff
# Used as the row/line limit when querying performance_schema.error_log,
# `<runtime> logs --tail` and `journalctl --lines`.
MAXLINES = 30000  # maximum log lines to consider from the end of the source


def parse_args():
    """Build the argument parser for this check and return the parsed options.

    Every option is described once in `option_specs` as a pair of
    (flag names, `add_argument()` keyword arguments) and registered in that
    order, which is also the order in which `--help` lists them. Arguments
    the parser does not know are tolerated (`parse_known_args()`) and dropped;
    only the namespace with the known options is returned.
    """
    parser = argparse.ArgumentParser(description=DESCRIPTION)

    option_specs = (
        (
            ('-V', '--version'),
            {
                'action': 'version',
                'version': f'%(prog)s: v{__version__} by {__author__}',
            },
        ),
        (
            ('--always-ok',),
            {
                'help': lib.args.help('--always-ok'),
                'dest': 'ALWAYS_OK',
                'action': 'store_true',
                'default': False,
            },
        ),
        (
            ('--cache-expire',),
            {
                'help': lib.args.help('--cache-expire') + ' Default: %(default)s',
                'dest': 'CACHE_EXPIRE',
                'type': int,
                'default': DEFAULT_CACHE_EXPIRE,
            },
        ),
        (
            ('--defaults-file',),
            {
                'help': (
                    'MySQL/MariaDB cnf file to read user, host and password from. '
                    'Example: `--defaults-file=/var/spool/icinga2/.my.cnf`. '
                    'Default: %(default)s'
                ),
                'dest': 'DEFAULTS_FILE',
                'default': DEFAULT_DEFAULTS_FILE,
            },
        ),
        (
            ('--defaults-group',),
            {
                'help': lib.args.help('--defaults-group') + ' Default: %(default)s',
                'dest': 'DEFAULTS_GROUP',
                'default': DEFAULT_DEFAULTS_GROUP,
            },
        ),
        (
            ('-H', '--hostname'),
            {
                'help': 'MySQL/MariaDB hostname or IP address. Default: %(default)s',
                'dest': 'HOSTNAME',
                'default': DEFAULT_HOSTNAME,
            },
        ),
        (
            ('--ignore-pattern',),
            {
                'help': (
                    'Any line containing this pattern will be ignored. '
                    'Must be lowercase. '
                    'Can be specified multiple times.'
                ),
                'action': 'append',
                'default': None,
                'dest': 'IGNORE_PATTERN',
            },
        ),
        (
            ('--ignore-regex',),
            {
                'help': lib.args.help('--ignore-regex'),
                'action': 'append',
                'default': None,
                'dest': 'IGNORE_REGEX',
            },
        ),
        (
            ('--port',),
            {
                'help': 'MySQL/MariaDB port number. Default: %(default)s',
                'dest': 'PORT',
                'type': int,
                'default': DEFAULT_PORT,
            },
        ),
        (
            ('--server-log',),
            {
                'help': (
                    'Log source to read from. '
                    'Accepts a file path, `docker:CONTAINER`, `podman:CONTAINER`, '
                    '`kubectl:CONTAINER` or `systemd:UNITNAME`. '
                    'If omitted, the check first probes `performance_schema.error_log` '
                    '(MySQL 8.0.22+) and then falls back to the file from `log_error`.'
                ),
                'dest': 'SERVER_LOG',
            },
        ),
        (
            ('--timeout',),
            {
                'help': lib.args.help('--timeout') + ' Default: %(default)s (seconds)',
                'dest': 'TIMEOUT',
                'type': int,
                'default': DEFAULT_TIMEOUT,
            },
        ),
    )

    for flags, settings in option_specs:
        parser.add_argument(*flags, **settings)

    # parse_known_args() returns (namespace, leftovers); leftovers are ignored.
    return parser.parse_known_args()[0]


def get_vars(conn):
    """Query the three global server variables this check works with.

    Only `datadir`, `hostname` and `log_error` are requested; there is
    deliberately no generic "fetch all variables" helper. The GLOBAL modifier
    matters: a plain SHOW VARIABLES would report the values in effect for the
    current connection instead.

    The result of `lib.db_mysql.select()` is passed through `lib.base.coe()`
    and returned (presumably the result rows, with coe() aborting the check
    on a failed query - see lib.base).
    """
    query = """
        show global variables
        where variable_name like 'datadir'
            or variable_name like 'hostname'
            or variable_name like 'log_error';
          """
    outcome = lib.db_mysql.select(conn, query)
    return lib.base.coe(outcome)


def get_log_file_real_path(file, hostname, datadir):
    """Return the first existing error-log path, preferring `file` itself.

    If `file` is set and is an existing regular file, it wins. Otherwise a
    fixed list of conventional locations (relative to the working directory,
    inside `datadir`, and below `/var/log`) is probed in order and the first
    existing regular file is returned. If nothing matches, `file` is handed
    back unchanged so the caller can report on it.
    """
    if file and os.path.isfile(file):
        return file

    def fallback_locations():
        # Generated lazily: each path is only built right before it is probed.
        yield f'{hostname}.log'
        yield f'{hostname}.err'
        yield os.path.join(datadir, f'{hostname}.err')
        yield os.path.join(datadir, f'{hostname}.log')
        yield os.path.join(datadir, 'mysql_error.log')
        yield '/var/log/mysql.log'
        yield '/var/log/mysqld.log'
        yield f'/var/log/mysql/{hostname}.err'
        yield f'/var/log/mysql/{hostname}.log'
        yield '/var/log/mysql/mysql_error.log'

    for candidate in fallback_locations():
        if os.path.isfile(candidate):
            return candidate
    return file


def has_pfs_error_log(conn):
    """Tell whether `performance_schema.error_log` can be used on this server.

    The lookup goes through `information_schema.tables`, which only shows
    tables the connected user is allowed to see; finding a row therefore
    means the table exists and the user holds some privilege on it.

    Returns a falsy value if the query fails, otherwise True/False depending
    on whether a row came back.
    """
    probe = (
        "SELECT 1 FROM information_schema.tables"
        " WHERE table_schema = 'performance_schema'"
        " AND table_name = 'error_log' LIMIT 1;"
    )
    ok, found = lib.db_mysql.select(conn, probe)
    if not ok:
        return ok
    return bool(found)


def read_pfs_error_log(conn):
    """Fetch the newest `performance_schema.error_log` rows as plain log text.

    Up to MAXLINES rows are selected newest-first and then emitted
    oldest-first, one text line per row, shaped like a regular MySQL/MariaDB
    log entry (`<timestamp> 0 [<prio>] [<code>] [<subsystem>] <message>`).
    That way the caller can run the same bracket-tag analysis on it as on a
    log file, and the last line is the most recent event.

    Returns the newline-joined text, or None when the query failed (the
    caller then falls back to the file-based source).
    """
    query = (
        'SELECT LOGGED, PRIO, ERROR_CODE, SUBSYSTEM, DATA'
        ' FROM performance_schema.error_log'
        f' ORDER BY LOGGED DESC LIMIT {MAXLINES};'
    )
    ok, records = lib.db_mysql.select(conn, query)
    if not ok:
        return None

    def render(record):
        # Missing columns are rendered as empty strings.
        return '{} 0 [{}] [{}] [{}] {}'.format(
            record.get('LOGGED', ''),
            record.get('PRIO', ''),
            record.get('ERROR_CODE', ''),
            record.get('SUBSYSTEM', ''),
            record.get('DATA', ''),
        )

    # The query sorts newest-first; flip it so the text reads chronologically.
    return '\n'.join(render(record) for record in reversed(records or []))


def read_container_logs(prefix, target):
    """Run `<prefix> logs --tail=<MAXLINES> <target>` and return its output.

    Parameters:
        prefix: The container CLI to call (`docker`, `podman` or `kubectl`).
        target: The container name, passed to the CLI as a single,
            shell-quoted argument.

    Returns:
        The captured log text. Both output streams are included: `docker logs`
        and `podman logs` replay the container's stderr on their own stderr,
        which is where the MySQL/MariaDB container images usually write the
        error log. Returning stdout alone would silently hide those lines.
        When both streams carry data, stdout comes first, then stderr.

    Aborts the check via `lib.base.cu()` if the command exits non-zero.
    """
    # shlex.quote() instead of hand-written single quotes: a target that
    # itself contains a quote or other shell metacharacters stays one argument.
    cmd = f'{prefix} logs --tail={MAXLINES} {shlex.quote(target)}'
    stdout, stderr, retc = lib.base.coe(lib.shell.shell_exec(cmd))
    if retc != 0:
        lib.base.cu(f'`{cmd}` exited with error ({retc}, {stderr.strip()}).')
    return '\n'.join(stream for stream in (stdout, stderr) if stream)


def read_systemd_logs(unit):
    """Return the last MAXLINES journal lines of `unit` from the current boot.

    Runs `journalctl --lines <MAXLINES> --boot --unit <unit>`; the unit name
    is passed as a single, shell-quoted argument.

    Returns the command's stdout. Aborts the check via `lib.base.cu()` if
    journalctl exits non-zero.
    """
    # shlex.quote() instead of hand-written single quotes: a unit name that
    # itself contains a quote or other shell metacharacters stays one argument.
    cmd = f'journalctl --lines {MAXLINES} --boot --unit {shlex.quote(unit)}'
    stdout, stderr, retc = lib.base.coe(lib.shell.shell_exec(cmd))
    if retc != 0:
        lib.base.cu(f'`{cmd}` exited with error ({retc}, {stderr.strip()}).')
    return stdout


def shorten(items, head=5, tail=5):
    """Collapse a list to head + ellipsis + tail when it is longer than head+tail.

    Parameters:
        items: The list to shorten.
        head: Number of leading items to keep.
        tail: Number of trailing items to keep.

    Returns:
        `items` itself (not a copy) if it has at most `head + tail` entries,
        otherwise a new list made of the first `head` items, the string
        `'...'`, and the last `tail` items.
    """
    if len(items) > head + tail:
        # Slice from an explicit start index: `items[-tail:]` would return the
        # whole list for tail=0 because -0 == 0.
        return [*items[:head], '...', *items[len(items) - tail:]]
    return items


def main():
    """The main function. This is where the magic happens.

    Steps:
    1. Pick the log source: `--server-log` if given, otherwise
       `performance_schema.error_log` when it is available, otherwise the
       file named by the server's `log_error` variable, the path cached by a
       previous run, or one of the conventional fallback locations.
    2. Load the log text from that source.
    3. Classify the last MAXLINES lines by their bracketed severity tags and
       collect startups/shutdowns.
    4. Build the message, recommendations and perfdata and hand them to
       `lib.base.oao()`.
    """

    # logic taken from mysqltuner.pl:log_file_recommendations(),
    # verified in sync with MySQLTuner

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        sys.exit(STATE_UNKNOWN)

    if args.IGNORE_PATTERN is None:
        args.IGNORE_PATTERN = []
    if args.IGNORE_REGEX is None:
        args.IGNORE_REGEX = []

    # fetch data
    # `source_label` always describes what the admin sees ("Source: ..."),
    # `source_size` is set only for plain on-disk files (so we can emit the
    # 32 MiB cutoff check and a perfdata series).
    source_label = None
    source_size = None
    logfile = None
    log_error = None
    myvar = {}

    if args.SERVER_LOG:
        log_error = args.SERVER_LOG
    else:
        mysql_connection = {
            'defaults_file': args.DEFAULTS_FILE,
            'defaults_group': args.DEFAULTS_GROUP,
            'timeout': args.TIMEOUT,
        }
        # A failed connection is tolerated on purpose: the cached path from a
        # previous run may still let us read the log while the DB is down.
        success, conn = lib.db_mysql.connect(mysql_connection)
        if success:
            lib.base.coe(lib.db_mysql.check_privileges(conn))
            myvar = lib.db_mysql.lod2dict(get_vars(conn))
            # Prefer the structured PFS table on MySQL 8.0.22+ where it exists
            # and is visible to this user. Avoids needing shell access to the
            # error log file (works against remote DBs too).
            if has_pfs_error_log(conn):
                pfs_text = read_pfs_error_log(conn)
                if pfs_text is not None:
                    logfile = pfs_text
                    log_error = 'performance_schema.error_log'
                    source_label = '`performance_schema.error_log`'
            lib.db_mysql.close(conn)
        if log_error is None:
            # DB unreachable or PFS not available: use the file path from the
            # config, fall back to the cached value from a previous run.
            log_error = myvar.get('log_error')
            if not log_error:
                log_error = lib.cache.get(
                    f'{args.HOSTNAME}-{args.PORT}',
                    filename='linuxfabrik-monitoring-plugins-mysql-logfile.db',
                )
            # Probe the conventional locations not only when a configured or
            # cached path turns out to be wrong, but also when `log_error` is
            # empty and the server told us its hostname/datadir. Without
            # server variables and without any path there is nothing sensible
            # to probe.
            if log_error or myvar:
                log_error = get_log_file_real_path(
                    log_error,
                    myvar.get('hostname', ''),
                    myvar.get('datadir', ''),
                )
        if not log_error:
            lib.base.cu(
                "No log file set (set `log_error` in MySQL/MariaDB config or "
                "use the check's `--server-log` parameter)."
            )

    # Cache the discovered on-disk path so the next run still has it when the
    # DB is unreachable. PFS does not need caching (only works with a live conn).
    if log_error and log_error != 'performance_schema.error_log':
        lib.cache.set(
            f'{args.HOSTNAME}-{args.PORT}',
            log_error,
            lib.time.now() + args.CACHE_EXPIRE * 60,
            filename='linuxfabrik-monitoring-plugins-mysql-logfile.db',
        )

    # read log content (unless PFS already provided it)
    if logfile is None:
        if log_error == 'stderr':
            lib.base.cu(
                "log_error is set to STDERR, so this check can't read stderr."
            )
        container = re.match(r'^(docker|podman|kubectl):(.*)$', log_error)
        systemd_match = re.match(r'^systemd:(.*)$', log_error)
        if container:
            logfile = read_container_logs(
                container.group(1).strip(),
                container.group(2).strip(),
            )
            source_label = f'`{log_error}`'
        elif systemd_match:
            logfile = read_systemd_logs(systemd_match.group(1).strip())
            source_label = f'`{log_error}`'
        else:
            # plain on-disk file
            if not os.path.isabs(log_error):
                log_error = os.path.join(myvar.get('datadir', ''), log_error)
            if not os.path.isfile(log_error):
                # Honour --always-ok here as well, like the regular exit at
                # the end of this function does.
                lib.base.oao(
                    f'Logging seems to be configured, but `{log_error}` does'
                    f' not seem to be an existing regular file. Check the path'
                    f' and file permissions, or provide the `--server-log`'
                    f' parameter.',
                    STATE_WARN,
                    always_ok=args.ALWAYS_OK,
                )
            source_size = os.path.getsize(log_error)
            if source_size == 0:
                # An empty log file is a deterministic "no errors / warnings
                # observed" state, not an unknown one - typical right after
                # logrotate fires. Both the auto-detected and the explicit
                # --server-log code paths land here.
                lib.base.oao(
                    f'Log file `{log_error}` is empty. Assuming log-rotation.',
                    STATE_OK,
                )
            logfile = lib.base.coe(lib.disk.read_file(log_error))
            source_label = f'`{log_error}`'

    # init some vars
    state = STATE_OK
    sections = []
    facts = []
    # All recommendations from all WARN/CRIT paths land here and render once at
    # the end as a `Recommendations:\n* ...` bulleted block, regardless of
    # which combinations of paths fire.
    recommendations = []
    last_errs, last_warns, last_shutdowns, last_starts = [], [], [], []
    compiled_ignore_regex = [re.compile(item) for item in args.IGNORE_REGEX]

    # analyze data
    # Only the last MAXLINES lines are considered, whatever the source. The
    # PFS query, `logs --tail` and `journalctl --lines` are already limited to
    # MAXLINES; without the slice a plain log file would be the only source
    # analysed in full.
    for log_line in logfile.splitlines()[-MAXLINES:]:
        # Ignore patterns and ignore regexes are both applied to the
        # lowercased line.
        haystack = log_line.lower()
        if any(
            pattern.lower() in haystack for pattern in args.IGNORE_PATTERN
        ) or any(item.search(haystack) for item in compiled_ignore_regex):
            continue
        # MySQL/MariaDB tag the severity in brackets like `[ERROR]` / `[Warning]`.
        # Match the bracketed form so words like "errors" inside table names or
        # paths do not falsely trip the count (mysqltuner uses the same regex).
        if '[error]' in haystack:
            last_errs.append(log_line)
        if '[warning]' in haystack:
            last_warns.append(log_line)
        # InnoDB logs its own "shutdown complete" line; do not count that as
        # a server shutdown.
        if 'shutdown complete' in haystack and 'innodb' not in haystack:
            last_shutdowns.append(log_line)
        # Matched case-sensitively against the original line.
        if 'ready for connections' in log_line:
            last_starts.append(log_line)

    # build the message

    if source_size is not None:
        size_human = lib.human.bytes2human(source_size)
        threshold_human = lib.human.bytes2human(LOGFILE_BIG_THRESHOLD)
        if source_size >= LOGFILE_BIG_THRESHOLD:
            state = lib.base.get_worst(state, STATE_WARN)
            facts.append(
                f'Source: {source_label} (size: {size_human} >'
                f' {threshold_human})'
                f'{lib.base.state2str(STATE_WARN, prefix=" ")}'
            )
            recommendations.append(
                f'Log file is > {threshold_human}; analyse why or set up log'
                f' rotation (e.g. logrotate)'
            )
        else:
            facts.append(
                f'Source: {source_label} (size: {size_human} < {threshold_human})'
            )
    else:
        facts.append(f'Source: {source_label}')

    n_err = len(last_errs)
    if n_err:
        state = lib.base.get_worst(state, STATE_CRIT)
        facts.append(
            f'{n_err} {lib.txt.pluralize("error", n_err)} found'
            f'{lib.base.state2str(STATE_CRIT, prefix=" ")}'
            f' (last: {last_errs[-1]})'
        )
        recommendations.append(
            f'Check the {lib.txt.pluralize("error", n_err)} in {source_label}'
        )
    else:
        facts.append('No errors found')

    n_warn = len(last_warns)
    if n_warn:
        state = lib.base.get_worst(state, STATE_WARN)
        facts.append(
            f'{n_warn} {lib.txt.pluralize("warning", n_warn)} found'
            f'{lib.base.state2str(STATE_WARN, prefix=" ")}'
            f' (last: {last_warns[-1]})'
        )
        recommendations.append(
            f'Check the {lib.txt.pluralize("warning", n_warn)} in {source_label}'
        )
    else:
        facts.append('No warnings found')

    # Startups and shutdowns are informational only; they never change the state.
    n_start = len(last_starts)
    if n_start:
        facts.append(
            f'{n_start} {lib.txt.pluralize("startup", n_start)} detected'
            f' (last: {last_starts[-1]})'
        )
    else:
        facts.append('No startups detected')

    n_shut = len(last_shutdowns)
    if n_shut:
        facts.append(
            f'{n_shut} {lib.txt.pluralize("shutdown", n_shut)} detected'
            f' (last: {last_shutdowns[-1]})'
        )
    else:
        facts.append('No shutdowns detected')

    sections.append('. '.join(facts) + '.')

    if last_errs:
        sections.append(
            'Errors:\n' + '\n'.join(f'* {line}' for line in shorten(last_errs))
        )
    if last_warns:
        sections.append(
            'Warnings:\n' + '\n'.join(
                f'* {line}' for line in shorten(last_warns)
            )
        )
    if last_starts:
        sections.append(
            'Startups:\n' + '\n'.join(
                f'* {line}' for line in shorten(last_starts)
            )
        )
    if last_shutdowns:
        sections.append(
            'Shutdowns:\n' + '\n'.join(
                f'* {line}' for line in shorten(last_shutdowns)
            )
        )

    if recommendations:
        sections.append(
            'Recommendations:\n' + '\n'.join(f'* {r}' for r in recommendations)
        )

    msg = '\n\n'.join(sections)

    # perfdata
    perfdata = ''
    if source_size is not None:
        perfdata += lib.base.get_perfdata(
            'mysql_logfile_size',
            source_size,
            uom='B',
            _min=0,
        )
    perfdata += lib.base.get_perfdata('mysql_error_lines', n_err, _min=0)
    perfdata += lib.base.get_perfdata('mysql_warning_lines', n_warn, _min=0)
    perfdata += lib.base.get_perfdata('mysql_startups', n_start, _min=0)
    perfdata += lib.base.get_perfdata('mysql_shutdowns', n_shut, _min=0)

    # over and out
    lib.base.oao(msg, state, perfdata, always_ok=args.ALWAYS_OK)


# Script entry point: run the check only when this file is executed directly,
# not when it is imported.
if __name__ == '__main__':
    try:
        main()
    except Exception:
        # Top-level boundary: any exception that escapes main() is handed to
        # lib.base.cu() (presumably reports it and exits with UNKNOWN - see
        # lib.base).
        lib.base.cu()
