#!/usr/lib64/linuxfabrik-monitoring-plugins/venv/bin/python
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import sys

import lib.args
import lib.base
import lib.db_mysql
import lib.version
from lib.globals import STATE_OK, STATE_UNKNOWN

# Plugin metadata; both values are rendered by the `--version` action.
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026051102'

# Long description handed to argparse as the parser description.
DESCRIPTION = """Single-number health score for a MySQL/MariaDB server. Ports mysqltuner's
`calculate_health_score()` weighting (Performance 40 / Security 30 / Resilience 30) and a
useful subset of the supporting checks: anonymous accounts, empty passwords, wildcard hosts,
legacy `mysql_native_password` users on MySQL 8.0+, TLS/SSL configuration, replication lag,
redo-log sizing and InnoDB tables without a user-defined `PRIMARY KEY`. mysqltuner's broader
schema-modelling checks (mixed collations, naming conventions, foreign key hygiene) are not
ported, so on a heavily-flagged server our score can sit 5 to 15 points above mysqltuner's;
the four direct mysqltuner checks (security, perf hit rates, replication lag, log sizing)
do match. Useful as a top-level Icinga alert and Grafana KPI panel; the individual
`mysql-*` plugins still own the detailed findings and fix advice."""

# Defaults for the `--defaults-file` and `--defaults-group` options.
DEFAULT_DEFAULTS_FILE = '/var/spool/icinga2/.my.cnf'
DEFAULT_DEFAULTS_GROUP = 'client'
# mysqltuner colours the score green above 80, yellow above 50 and red
# otherwise. We map that to WARN below 70 (inside the yellow zone) and
# CRIT below 50 (red zone). The Nagios range form `N:` means "OK from N to
# infinity", so only values strictly below N raise an alert.
DEFAULT_WARN = '70:'
DEFAULT_CRIT = '50:'
# Default for `--timeout`, in seconds.
DEFAULT_TIMEOUT = 3

# Schema names excluded from the metadata check (PRIMARY KEY missing). System
# schemas legitimately have tables without user-defined primary keys.
SYSTEM_SCHEMAS = ('information_schema', 'mysql', 'performance_schema', 'sys')

# mysqltuner's cut-off between "log file size is too small" and "log file
# size is large enough"; see `check-plugins/mysql-innodb-buffer-pool-size`
# for the same constant. Used for the resilience log-safety component
# (256 MiB, expressed in bytes).
LOG_SAFETY_THRESHOLD_BYTES = 256 * 1024 * 1024


def parse_args():
    """Build the command line parser and return the parsed arguments.

    Arguments the parser does not know are tolerated and discarded
    (`parse_known_args()`), only the recognised namespace is returned.
    """
    cli = argparse.ArgumentParser(description=DESCRIPTION)

    # shared tail of the two threshold help texts
    range_hint = ' Supports Nagios ranges. Default: %(default)s'

    cli.add_argument(
        '-V',
        '--version',
        version=f'%(prog)s: v{__version__} by {__author__}',
        action='version',
    )

    cli.add_argument(
        '--always-ok',
        dest='ALWAYS_OK',
        default=False,
        action='store_true',
        help=lib.args.help('--always-ok'),
    )

    cli.add_argument(
        '-c',
        '--critical',
        dest='CRITICAL',
        default=DEFAULT_CRIT,
        help=lib.args.help('--critical') + range_hint,
    )

    cli.add_argument(
        '--defaults-file',
        dest='DEFAULTS_FILE',
        default=DEFAULT_DEFAULTS_FILE,
        help=(
            'MySQL/MariaDB cnf file to read user, host and password from. '
            'Example: `--defaults-file=/var/spool/icinga2/.my.cnf`. '
            'Default: %(default)s'
        ),
    )

    cli.add_argument(
        '--defaults-group',
        dest='DEFAULTS_GROUP',
        default=DEFAULT_DEFAULTS_GROUP,
        help=lib.args.help('--defaults-group') + ' Default: %(default)s',
    )

    cli.add_argument(
        '--timeout',
        dest='TIMEOUT',
        default=DEFAULT_TIMEOUT,
        type=int,
        help=lib.args.help('--timeout') + ' Default: %(default)s (seconds)',
    )

    cli.add_argument(
        '-w',
        '--warning',
        dest='WARNING',
        default=DEFAULT_WARN,
        help=lib.args.help('--warning') + range_hint,
    )

    known, _unknown = cli.parse_known_args()
    return known


# The functions below port the mysqltuner subroutines that populate
# `@secrec` via `push_recommendation('Security', ...)`. Each function
# returns the number of entries its subroutine contributes to `@secrec`.
# Only the entries in `@secrec` count towards mysqltuner's Security
# health-score component, NOT every security-related output line (e.g.
# wildcard-host findings go to `@generalrec` only and do not affect the
# score - we mirror that).


def security_recommendations(conn, has_role_column):
    """Count the `@secrec` entries of mysqltuner's security_recommendations().

    Contributions:

    - anonymous accounts: 1 in total as soon as at least one exists
      (mysqltuner pushes a single combined entry, not one per account)
    - accounts with an empty password: 1 per account

    Username-as-password and wildcard-host findings end up in
    `@generalrec` in mysqltuner, not in `@secrec`, so they are
    deliberately NOT counted here.

    If `has_role_column` is true, the anonymous-account query is limited
    to rows with `IS_ROLE = 'N'`. The result of the anonymous-account
    query is passed through `lib.base.coe()`; a failing empty-password
    lookup simply contributes 0.
    """
    role_filter = "IS_ROLE = 'N' and " if has_role_column else ''

    sql_anonymous = f"""
        select count(*) as cnt
        from mysql.user
        where {role_filter}(trim(user) = '' or user is null)
        ;
    """  # nosec B608
    anonymous_row = lib.base.coe(
        lib.db_mysql.select(conn, sql_anonymous, fetchone=True)
    )
    # any number of anonymous accounts is worth exactly one finding
    findings = 1 if int(anonymous_row['cnt']) > 0 else 0

    # Empty passwords: MariaDB 10.4+ keeps the password data as JSON in
    # mysql.global_priv, so that table is asked first ...
    ok, result = lib.db_mysql.select(
        conn,
        """
            select count(*) as cnt
            from mysql.global_priv
            where user != ''
                and user != 'mariadb.sys'
                and user != 'mysql.sys'
                and json_contains(priv, '"mysql_native_password"', '$.plugin')
                and json_contains(priv, '""', '$.authentication_string')
                and not json_contains(priv, '"true"', '$.account_locked')
            ;
        """,
        fetchone=True,
    )
    if not (ok and result is not None):
        # ... and servers without it fall back to the column on mysql.user.
        ok, result = lib.db_mysql.select(
            conn,
            """
                select count(*) as cnt
                from mysql.user
                where (authentication_string = '' or authentication_string is null)
                    and user != ''
                    and user != 'mariadb.sys'
                    and user != 'mysql.sys'
                ;
            """,
            fetchone=True,
        )
    if ok and result is not None:
        findings += int(result['cnt'])

    return findings


def check_auth_plugins(conn, has_role_column, myvar):
    """Count the `@secrec` entries of mysqltuner's check_auth_plugins().

    One finding per account that uses the legacy `mysql_native_password`
    plugin, plus - on MySQL 8.0+ only (never on MariaDB) - one per
    account on `sha256_password`, which mysqltuner also flags as
    DEPRECATED. The `mariadb.sys` and `mysql.sys` accounts are ignored,
    and roles are skipped when `has_role_column` is true. A failing
    query contributes 0.

    NOTE(review): the `mysql_native_password` count is not gated on the
    server flavour or version here, while the plugin description speaks
    of "legacy `mysql_native_password` users on MySQL 8.0+" - confirm
    against mysqltuner that this is intended.
    """
    server_version = (myvar.get('version') or '').lower()
    # the short-circuit keeps the version comparison away from MariaDB
    check_sha256 = 'mariadb' not in server_version and (
        lib.version.version(server_version) >= (8, 0, 0)
    )
    role_filter = "IS_ROLE = 'N' and " if has_role_column else ''

    def _count(sql):
        """Run a `count(*) as cnt` query; 0 if it fails or returns nothing."""
        ok, result = lib.db_mysql.select(conn, sql, fetchone=True)
        if not ok or result is None:
            return 0
        return int(result['cnt'])

    sql_legacy = f"""
        select count(*) as cnt
        from mysql.user
        where {role_filter}plugin = 'mysql_native_password'
            and user not in ('mariadb.sys', 'mysql.sys')
        ;
    """  # nosec B608
    total = _count(sql_legacy)

    if check_sha256:
        total += _count(
            f"""
                select count(*) as cnt
                from mysql.user
                where {role_filter}plugin = 'sha256_password'
                    and user not in ('mariadb.sys', 'mysql.sys')
                ;
            """,  # nosec B608
        )

    return total


def ssl_tls_recommendations(myvar):
    """Count the `@secrec` entries of mysqltuner's ssl_tls_recommendations().

    Each of the following conditions on the server variables in `myvar`
    is worth one finding:

    - `have_ssl` is `DISABLED`
    - `require_secure_transport` is `OFF` (also assumed when unset)
    - `tls_version` is set and lists TLSv1.0 or TLSv1.1
    - `tls_version` is set and lists neither TLSv1.2 nor TLSv1.3
    - `ssl_cert` and `ssl_key` are both empty or unset

    Comparisons on the first three variables are case-insensitive.
    """
    have_ssl = (myvar.get('have_ssl') or '').upper()
    secure_transport = (myvar.get('require_secure_transport') or 'OFF').upper()
    tls = (myvar.get('tls_version') or '').lower()
    cert = myvar.get('ssl_cert') or ''
    key = myvar.get('ssl_key') or ''

    # an unset `tls_version` triggers neither of the two TLS conditions
    tls_known = bool(tls)
    conditions = (
        have_ssl == 'DISABLED',
        secure_transport == 'OFF',
        tls_known and ('tlsv1.0' in tls or 'tlsv1.1' in tls),
        tls_known and not ('tlsv1.2' in tls or 'tlsv1.3' in tls),
        cert == '' and key == '',
    )
    # booleans add up as 0/1
    return sum(conditions)


def check_remote_user_ssl(conn, myvar):
    """Count the `@secrec` entry of mysqltuner's check_remote_user_ssl().

    Returns 1 if at least one account may connect from a host other than
    `localhost`, `127.0.0.1` or `::1` without `REQUIRE SSL`, else 0.
    mysqltuner pushes a single entry no matter how many accounts match.
    A failing query counts as 0.

    MariaDB 10.4+ is queried through `mysql.global_priv`, every other
    server through `mysql.user`.
    """
    server_version = (myvar.get('version') or '').lower()
    uses_global_priv = (
        'mariadb' in server_version
        and lib.version.version(server_version) >= (10, 4, 0)
    )
    if not uses_global_priv:
        sql = """
            select count(*) as cnt
            from mysql.user
            where host not in ('localhost', '127.0.0.1', '::1')
                and (ssl_type = 'NONE' or ssl_type = '')
            ;
        """
    else:
        sql = """
            select count(*) as cnt
            from mysql.global_priv
            where host not in ('localhost', '127.0.0.1', '::1')
                and json_value(priv, '$.ssl_type') = ''
            ;
        """
    ok, result = lib.db_mysql.select(conn, sql, fetchone=True)
    if ok and result is not None and int(result['cnt']) > 0:
        return 1
    return 0


def count_secrec_findings(conn, has_role_column, myvar):
    """Return the total `@secrec` count, i.e. the sum of what the four
    ported mysqltuner subroutines contribute.
    """
    total = security_recommendations(conn, has_role_column)
    total += check_auth_plugins(conn, has_role_column, myvar)
    total += ssl_tls_recommendations(myvar)
    total += check_remote_user_ssl(conn, myvar)
    return total


def count_tables_without_pk(conn):
    """Return the number of InnoDB base tables outside the system schemas
    that have no index named `PRIMARY`. This is the metadata-health input
    to the score and mirrors the check from `mysql-table-indexes`.

    The query result is passed through `lib.base.coe()`.
    """
    # quote each system schema name for the `not in (...)` list
    quoted = ["'{}'".format(schema) for schema in SYSTEM_SCHEMAS]
    excluded = ', '.join(quoted)
    sql = f"""
        select count(*) as cnt
        from information_schema.tables t
        where t.TABLE_SCHEMA not in ({excluded})
            and t.TABLE_TYPE = 'BASE TABLE'
            and t.ENGINE = 'InnoDB'
            and not exists (
                select 1
                from information_schema.statistics s
                where s.TABLE_SCHEMA = t.TABLE_SCHEMA
                    and s.TABLE_NAME = t.TABLE_NAME
                    and s.INDEX_NAME = 'PRIMARY'
            )
        ;
    """  # nosec B608
    result = lib.db_mysql.select(conn, sql, fetchone=True)
    return int(lib.base.coe(result)['cnt'])


def score_perf_bp(mystat):
    """Score the InnoDB buffer pool read efficiency (max. 10 points).

    Returns `(score, percentage)`: 10 above 99%, 5 above 95%, else 0.
    Without any read activity the result is `(5, None)`, as mysqltuner
    awards half the points when there is no data.
    """
    requests = int(mystat.get('Innodb_buffer_pool_read_requests') or 0)
    disk_reads = int(mystat.get('Innodb_buffer_pool_reads') or 0)
    total = requests + disk_reads
    if total == 0:
        return 5, None

    hit_rate = requests / total * 100
    if hit_rate > 99:
        points = 10
    elif hit_rate > 95:
        points = 5
    else:
        points = 0
    return points, hit_rate


def score_perf_temp(mystat):
    """Score the share of temporary tables created on disk (max. 10
    points, lower is better).

    Returns `(score, percentage)`: 10 below 10%, 5 below 25%, else 0.
    If no temporary table was created at all, the result is `(5, None)`.
    """
    total_tmp = int(mystat.get('Created_tmp_tables') or 0)
    disk_tmp = int(mystat.get('Created_tmp_disk_tables') or 0)
    if total_tmp == 0:
        return 5, None

    disk_share = disk_tmp / total_tmp * 100
    if disk_share < 10:
        points = 10
    elif disk_share < 25:
        points = 5
    else:
        points = 0
    return points, disk_share


def score_perf_thread(mystat):
    """Score the thread cache hit rate (max. 10 points, higher is better).

    Returns `(score, percentage)`: 10 above 90%, 5 above 50%, else 0.
    Without any recorded connection the result is `(5, None)`.
    """
    created = int(mystat.get('Threads_created') or 0)
    total_connections = int(mystat.get('Connections') or 0)
    if total_connections == 0:
        return 5, None

    # share of connections that did NOT need a newly created thread
    hit_rate = 100 - (created / total_connections * 100)
    if hit_rate > 90:
        points = 10
    elif hit_rate > 50:
        points = 5
    else:
        points = 0
    return points, hit_rate


def score_perf_conn(mystat, myvar):
    """Score the peak connection usage (max. 10 points, lower is better).

    Compares `Max_used_connections` with `max_connections`; the
    percentage is capped at 100. Returns `(score, percentage)`: 10 below
    80%, else 0. If `max_connections` is 0 or missing, the result is
    `(5, None)`.
    """
    peak = int(mystat.get('Max_used_connections') or 0)
    limit = int(myvar.get('max_connections') or 0)
    if limit == 0:
        return 5, None

    usage = min(peak / limit * 100, 100)
    points = 10 if usage < 80 else 0
    return points, usage


def score_sec(security_issues):
    """Score the security component: 30 points minus 5 per finding, never
    below 0. Returns `(score, security_issues)`.
    """
    remaining = 30 - 5 * security_issues
    if remaining < 0:
        remaining = 0
    return remaining, security_issues


def score_res_lag(replica_status):
    """Score the replication lag (max. 10 points).

    Returns `(score, lag_in_seconds)`. `replica_status = None` means the
    server is not a replica and earns the full 10 points. Otherwise the
    lag is read from `Seconds_Behind_Source`, or from
    `Seconds_Behind_Master` if that key is absent: 10 below 1 second,
    5 below 60 seconds, else 0. A lag of `None` is rated `(0, None)`.
    """
    if replica_status is None:
        return 10, None

    # the legacy key is only consulted when the modern one is absent
    seconds_behind = replica_status.get(
        'Seconds_Behind_Source',
        replica_status.get('Seconds_Behind_Master'),
    )
    if seconds_behind is None:
        # IO/SQL thread not running, or other reason; treat as worst.
        return 0, None

    seconds_behind = int(seconds_behind)
    if seconds_behind >= 60:
        return 0, seconds_behind
    if seconds_behind >= 1:
        return 5, seconds_behind
    return 10, seconds_behind


def score_res_logs(myvar):
    """Score the redo-log sizing (max. 10 points).

    Returns `(score, capacity_in_bytes)`. Only servers that expose
    `innodb_redo_log_capacity` (MySQL 8.0.30+) are rated: 5 points if
    the capacity is below mysqltuner's 256 MiB cut-off, 10 otherwise.

    Servers without that variable (MariaDB, MySQL < 8.0.30) get
    `(10, None)`. This matches the "redo-log sizing check skipped"
    behaviour of `mysql-innodb-buffer-pool-size` and avoids one plugin
    saying "this looks bad" while the other says "we do not check this".
    mysqltuner's old 25% log-size-ratio rule on `innodb_log_file_size`
    is intentionally not ported, same as in
    `mysql-innodb-buffer-pool-size`.
    """
    raw_capacity = myvar.get('innodb_redo_log_capacity')
    if raw_capacity is None:
        return 10, None

    capacity = int(raw_capacity)
    points = 5 if capacity < LOG_SAFETY_THRESHOLD_BYTES else 10
    return points, capacity


def score_res_meta(missing_pk_count):
    """Score the metadata health (max. 10 points): 10 for up to 5 tables
    without a PRIMARY KEY, 5 for more. Mirrors mysqltuner's
    `scalar(@modeling) > 5` cut-off. Returns `(score, missing_pk_count)`.
    """
    points = 10 if missing_pk_count <= 5 else 5
    return points, missing_pk_count


def format_pct(value):
    """Format `value` as a percentage with one decimal place; `None`
    (no data available) is rendered as `?`.
    """
    return '?' if value is None else f'{value:.1f}%'


def main():
    """Compute the aggregate health score and report it.

    Steps: parse the command line, connect to the server and collect the
    status counters, variables, replica status, security findings and
    the number of tables without a PRIMARY KEY; score the eight
    components; map the 0-100 total onto a plugin state using the
    warning/critical ranges; finally hand message, state and perfdata to
    `lib.base.oao()`.
    """

    # Score formula taken from mysqltuner.pl:calculate_health_score(),
    # verified in sync with MySQLTuner. Four performance buckets
    # (40 pts), security (30 pts), three resilience buckets (30 pts).
    # Individual fix advice lives in the per-theme mysql-* plugins; this
    # plugin produces only the aggregate score.

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        # argparse exits on its own (usage errors, --help, --version);
        # report that with the UNKNOWN exit code instead
        sys.exit(STATE_UNKNOWN)

    # fetch data
    mysql_connection = {
        'defaults_file': args.DEFAULTS_FILE,
        'defaults_group': args.DEFAULTS_GROUP,
        'timeout': args.TIMEOUT,
    }
    conn = lib.base.coe(lib.db_mysql.connect(mysql_connection))
    lib.base.coe(lib.db_mysql.check_privileges(conn, 'SELECT'))

    mystat = lib.db_mysql.get_all_status(conn)
    myvar = lib.db_mysql.get_all_variables(conn)
    replica_status = lib.db_mysql.get_replica_status(conn)
    has_role_column = lib.db_mysql.has_is_role_column(conn)
    security_issues = count_secrec_findings(conn, has_role_column, myvar)
    missing_pk = count_tables_without_pk(conn)

    # everything below works on the collected data only
    lib.db_mysql.close(conn)

    # init some vars
    perfdata = ''

    # analyze data: every scorer returns (points, measured value)
    perf_bp_score, perf_bp_val = score_perf_bp(mystat)
    perf_temp_score, perf_temp_val = score_perf_temp(mystat)
    perf_thread_score, perf_thread_val = score_perf_thread(mystat)
    perf_conn_score, perf_conn_val = score_perf_conn(mystat, myvar)
    sec_score, sec_count = score_sec(security_issues)
    res_lag_score, res_lag_val = score_res_lag(replica_status)
    res_logs_score, res_logs_val = score_res_logs(myvar)
    res_meta_score, res_meta_val = score_res_meta(missing_pk)

    performance_total = (
        perf_bp_score + perf_temp_score + perf_thread_score + perf_conn_score
    )
    resilience_total = res_lag_score + res_logs_score + res_meta_score
    total_score = performance_total + sec_score + resilience_total

    state = lib.base.get_state(
        total_score,
        args.WARNING,
        args.CRITICAL,
        _operator='range',
    )

    # build the message
    summary = (
        f'Health Score: {total_score}/100 '
        f'(Performance {performance_total}/40, '
        f'Security {sec_score}/30, '
        f'Resilience {resilience_total}/30)'
        f'{lib.base.state2str(state, prefix=" ")}'
    )

    if state == STATE_OK:
        sections = ['Everything is ok. ' + summary + '.']
    else:
        sections = [summary + '.']

    # a lag of None covers both "not a replica" and "lag unknown"
    lag_display = 'n/a' if res_lag_val is None else f'{res_lag_val}s'
    log_safety_line = (
        'check skipped (server does not expose `innodb_redo_log_capacity`)'
        if res_logs_val is None
        else f'redo cap {res_logs_val // (1024 * 1024)}MiB'
    )

    def _line(label, value_text, score, max_score, hint):
        """Append the fix hint only when the component is below its max."""
        base = f'* {label}: {value_text} ({score}/{max_score})'
        return f'{base} - {hint}' if score < max_score else base

    breakdown_lines = [
        'Component breakdown:',
        _line(
            'Buffer pool hit rate',
            format_pct(perf_bp_val),
            perf_bp_score,
            10,
            # The hint is shown whenever the score is below 10, and full
            # points need a hit rate above 99% (see score_perf_bp()), so
            # that is the target to quote - like the other hints do.
            'aim for > 99%; see `mysql-innodb-buffer-pool-size` for fixes',
        ),
        _line(
            'Temp tables on disk',
            format_pct(perf_temp_val),
            perf_temp_score,
            10,
            'aim for < 10%; see `mysql-temp-tables` for fixes',
        ),
        _line(
            'Thread cache hit rate',
            format_pct(perf_thread_val),
            perf_thread_score,
            10,
            'aim for > 90%; see `mysql-thread-cache` for fixes',
        ),
        _line(
            'Connection usage',
            format_pct(perf_conn_val),
            perf_conn_score,
            10,
            'aim for < 80%; see `mysql-connections` for fixes',
        ),
        _line(
            'Security issues',
            str(sec_count),
            sec_score,
            30,
            'see `mysql-user-security` for fixes',
        ),
        _line(
            'Replication lag',
            lag_display,
            res_lag_score,
            10,
            'see `mysql-replica-status` for fixes',
        ),
        _line(
            'Log safety',
            log_safety_line,
            res_logs_score,
            10,
            'see `mysql-innodb-buffer-pool-size` for fixes',
        ),
        _line(
            'Metadata',
            f'{res_meta_val} InnoDB tables without `PRIMARY KEY`',
            res_meta_score,
            10,
            'see `mysql-table-indexes` for fixes',
        ),
    ]
    sections.append('\n'.join(breakdown_lines))

    msg = '\n\n'.join(sections)

    # only the total carries thresholds; the three sub-scores are
    # informational
    perfdata += lib.base.get_perfdata(
        'mysql_health_score',
        total_score,
        warn=args.WARNING,
        crit=args.CRITICAL,
        _min=0,
        _max=100,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_health_performance',
        performance_total,
        _min=0,
        _max=40,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_health_security',
        sec_score,
        _min=0,
        _max=30,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_health_resilience',
        resilience_total,
        _min=0,
        _max=30,
    )

    # over and out
    lib.base.oao(msg, state, perfdata, always_ok=args.ALWAYS_OK)


# Script entry point: run the check only when executed directly.
if __name__ == '__main__':
    try:
        main()
    except Exception:
        # Top-level boundary: any uncaught exception is delegated to
        # `lib.base.cu()` (presumably reports it and exits with UNKNOWN -
        # confirm in lib.base).
        lib.base.cu()
