#!/usr/lib64/linuxfabrik-monitoring-plugins/venv/bin/python
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import sys

import lib.args
import lib.base
import lib.db_mysql
import lib.txt
import lib.version
from lib.globals import STATE_OK, STATE_UNKNOWN

__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026051204'

DESCRIPTION = """Checks MySQL/MariaDB user security: anonymous accounts (empty user name),
accounts with empty passwords, accounts whose password matches the username (the classic
`root/root` weak-password pattern), accounts whose password matches a small dictionary of
common defaults (`password`, `admin`, `root`, `letmein`, ...), accounts that accept
connections from any host (`'%'` wildcard), and accounts still on the legacy SHA1-based
`mysql_native_password` (or `sha256_password` on MySQL 8.0+) authentication plugin. Each
finding maps to a copy-pasteable SQL recommendation."""

# Defaults for the command-line options registered in parse_args().
DEFAULT_DEFAULTS_FILE = '/var/spool/icinga2/.my.cnf'
DEFAULT_DEFAULTS_GROUP = 'client'
DEFAULT_SEVERITY = 'warn'  # one of `warn` / `crit`, see `--severity`
DEFAULT_TIMEOUT = 3  # seconds, see `--timeout`

# Placeholder password in suggested SQL statements. Self-evidently not a real
# value, so an admin who copies the SQL has to set their own password instead
# of accidentally going live with whatever the plugin printed.
PASSWORD_PLACEHOLDER = '<replace-with-strong-password>'

# Hostname placeholder for the `RENAME USER ... TO ...` advice.
HOST_PLACEHOLDER = 'LimitedIPRangeOrLocalhost'

# Possible password column names across MySQL / MariaDB versions, tried in
# order. The last entry handles the MySQL 5.6-era split where the column
# depends on the auth plugin.
# Each entry is interpolated verbatim into the SQL built by the find_*()
# helpers, which is why an entry may be a full SQL expression rather than a
# plain column name.
PASS_COLUMN_NAMES = [
    'password',
    'authentication_string',
    'IF(plugin="mysql_native_password", authentication_string, password)',
]

# Dictionary of common default / weak passwords. Merged from two sources
# and kept alphabetical:
# - SecLists xato-net-10-million-passwords-100 (real-world breach corpus,
#   https://github.com/danielmiessler/SecLists), filtered to drop empty
#   entries and any candidate containing an apostrophe.
# - A small set of MySQL/MariaDB-specific defaults that ship with
#   packaged stacks (`admin`, `changeme`, `default`, `mariadb`, `mysql`,
#   `root`, `welcome`, ...).
# Compared at server side via the `PASSWORD()` function, so the
# candidates never leave the database. `PASSWORD()` was removed in MySQL
# 8.0; the check returns [] silently there (caller does not need to
# special-case the version).
# The apostrophe filter is load-bearing: find_weak_password_users() embeds
# every entry unescaped inside a single-quoted SQL string literal, so any
# new entry must not contain a character that would terminate or escape
# that literal.
WEAK_PASSWORDS = (
    '000000',
    '1111',
    '111111',
    '11111111',
    '112233',
    '121212',
    '123123',
    '123321',
    '1234',
    '12345',
    '123456',
    '1234567',
    '12345678',
    '123456789',
    '1234567890',
    '123qwe',
    '131313',
    '159753',
    '1qaz2wsx',
    '2000',
    '555555',
    '654321',
    '666666',
    '6969',
    '696969',
    '777777',
    '7777777',
    '987654321',
    'aaaaaa',
    'abc123',
    'access',
    'admin',
    'amanda',
    'andrew',
    'asdfgh',
    'ashley',
    'asshole',
    'austin',
    'baseball',
    'batman',
    'biteme',
    'buster',
    'changeme',
    'charlie',
    'cheese',
    'chelsea',
    'computer',
    'dallas',
    'daniel',
    'default',
    'dragon',
    'football',
    'freedom',
    'fuck',
    'fuckme',
    'fuckyou',
    'george',
    'ginger',
    'guest',
    'harley',
    'hockey',
    'hunter',
    'iloveyou',
    'jennifer',
    'jessica',
    'jordan',
    'joshua',
    'killer',
    'klaster',
    'letmein',
    'love',
    'maggie',
    'mariadb',
    'master',
    'matthew',
    'michael',
    'michelle',
    'monkey',
    'mustang',
    'mysql',
    'nicole',
    'pass',
    'password',
    'pepper',
    'princess',
    'pussy',
    'qazwsx',
    'qwerty',
    'qwertyuiop',
    'qwertzuiop',
    'ranger',
    'robert',
    'root',
    'secret',
    'shadow',
    'soccer',
    'starwars',
    'summer',
    'sunshine',
    'superman',
    'taylor',
    'test',
    'thomas',
    'thunder',
    'tigger',
    'trustno1',
    'user',
    'welcome',
    'yankees',
    'zxcvbn',
    'zxcvbnm',
)


def parse_args():
    """Build the command-line interface and return the parsed options.

    Arguments the parser does not know are tolerated (`parse_known_args`)
    and discarded; only the recognised options are returned.
    """
    cli = argparse.ArgumentParser(description=DESCRIPTION)

    cli.add_argument(
        '-V',
        '--version',
        version=f'%(prog)s: v{__version__} by {__author__}',
        action='version',
    )

    cli.add_argument(
        '--always-ok',
        dest='ALWAYS_OK',
        default=False,
        action='store_true',
        help=lib.args.help('--always-ok'),
    )

    defaults_file_help = (
        'MySQL/MariaDB cnf file to read user, host and password from. '
        'Example: `--defaults-file=/var/spool/icinga2/.my.cnf`. '
        'Default: %(default)s'
    )
    cli.add_argument(
        '--defaults-file',
        dest='DEFAULTS_FILE',
        default=DEFAULT_DEFAULTS_FILE,
        help=defaults_file_help,
    )

    cli.add_argument(
        '--defaults-group',
        dest='DEFAULTS_GROUP',
        default=DEFAULT_DEFAULTS_GROUP,
        help=lib.args.help('--defaults-group') + ' Default: %(default)s',
    )

    severity_help = (
        'Severity for the threshold-less security findings (anonymous '
        'accounts, empty passwords, weak passwords, wildcard hosts). '
        'One of `warn` or `crit`. '
        'Default: %(default)s'
    )
    cli.add_argument(
        '--severity',
        dest='SEVERITY',
        choices=['warn', 'crit'],
        default=DEFAULT_SEVERITY,
        help=severity_help,
    )

    cli.add_argument(
        '--timeout',
        dest='TIMEOUT',
        type=int,
        default=DEFAULT_TIMEOUT,
        help=lib.args.help('--timeout') + ' Default: %(default)s (seconds)',
    )

    known, _unknown = cli.parse_known_args()
    return known


def validate_password_active(conn):
    """Tell whether the `validate_password` plugin is loaded and ACTIVE.

    With that plugin enabled on MySQL 5.7+, comparing a column against
    `PASSWORD(user)` errors out (Bug #80860). mysqltuner skips its
    username-as-password check in that situation and this plugin follows
    suit.

    Returns False when the lookup itself fails or yields no row.
    """
    query = """
        select count(*) as cnt
        from information_schema.plugins
        where PLUGIN_NAME = 'validate_password'
            and PLUGIN_STATUS = 'ACTIVE'
        ;
    """
    ok, record = lib.db_mysql.select(conn, query, fetchone=True)
    if ok and record is not None:
        return int(record['cnt']) > 0
    return False


def find_anonymous_users(conn, role_filter):
    """Return the accounts whose user name is empty, blank or NULL.

    `role_filter` is a (possibly empty) SQL fragment that is placed
    directly in front of the condition. The query result is handed to
    `lib.base.coe()` and whatever that returns is passed on.
    """
    query = f"""
        select concat(quote(user), '@', quote(host)) as user
        from mysql.user
        where {role_filter}(trim(user) = '' or user is null)
        ;
    """  # nosec B608
    outcome = lib.db_mysql.select(conn, query)
    return lib.base.coe(outcome)


def find_empty_password_users(conn):
    """Return non-anonymous, non-system accounts that have no password set.

    The `mysql.global_priv` query (MariaDB 10.4+) is tried first. If it
    fails, the known password column names in `PASS_COLUMN_NAMES` are
    tried against `mysql.user` in order.

    Returns the rows of the first query that succeeds (each row carries
    the account as `'name'@'host'` under the `user` key), or [] when
    every query failed.
    """
    # MariaDB 10.4+ stores priv data in `mysql.global_priv` as JSON. The
    # mysql.user view's password column is no longer authoritative there.
    # `account_locked` is stored as a JSON boolean, so the candidate has to
    # be the JSON literal `true`. The JSON string `"true"` never matches a
    # boolean and would let locked accounts slip into the result.
    sql = """
        select concat(quote(user), '@', quote(host)) as user
        from mysql.global_priv
        where user != ''
            and user != 'mariadb.sys'
            and user != 'mysql.sys'
            and json_contains(priv, '"mysql_native_password"', '$.plugin')
            and json_contains(priv, '""', '$.authentication_string')
            and not json_contains(priv, 'true', '$.account_locked')
        ;
    """
    success, users = lib.db_mysql.select(conn, sql)
    if success:
        return users
    # Older MySQL / MariaDB: fall through the known password column names.
    # A failing query just means "wrong column for this server", so the next
    # candidate is tried instead of aborting.
    for pass_column in PASS_COLUMN_NAMES:
        sql = f"""
            select concat(quote(user), '@', quote(host)) as user
            from mysql.user
            where ({pass_column} = '' or {pass_column} is null)
                and user != ''
                and user != 'mariadb.sys'
                and user != 'mysql.sys'
                /*!50501 and plugin not in ('auth_socket', 'unix_socket',
                                            'win_socket', 'auth_pam_compat') */
                /*!80000 and account_locked = 'N' and password_expired = 'N' */
            ;
        """  # nosec B608
        success, users = lib.db_mysql.select(conn, sql)
        if success:
            return users
    return []


def find_username_as_password_users(conn, role_filter):
    """Return accounts whose password equals their own user name.

    Three spellings of the name are checked: as stored, upper-cased, and
    with only the first letter upper-cased. The comparison relies on the
    server-side `PASSWORD()` function, which was removed in MySQL 8.0.

    Every entry of `PASS_COLUMN_NAMES` is tried in order and the rows of
    the first query the server accepts are returned. If no variant
    succeeds, the result is [].
    """
    for column in PASS_COLUMN_NAMES:
        # Same left-hand side for all three comparisons below.
        stored_hash = f'cast({column} as binary)'
        query = f"""
            select concat(quote(user), '@', quote(host)) as user
            from mysql.user
            where {role_filter}user != ''
                and user != 'mariadb.sys'
                and user != 'mysql.sys'
                and (
                    {stored_hash} = password(user)
                    or {stored_hash} = password(upper(user))
                    or {stored_hash} = password(
                        concat(upper(left(user, 1)),
                               substring(user, 2, length(user)))
                    )
                )
            ;
        """  # nosec B608
        ok, matches = lib.db_mysql.select(conn, query)
        if not ok:
            continue
        return matches
    return []


def find_weak_password_users(conn, role_filter):
    """Match each user's stored password hash against the WEAK_PASSWORDS
    dictionary. Comparison runs server-side via `cast(<col> as binary) =
    password('<candidate>')`, so candidates and hashes never leave the
    database and the existing pass-column detection ladder is reused.
    `PASSWORD()` was removed in MySQL 8.0, so on those servers every
    candidate query errors out and the function returns []. Returns a
    list of `{'user': '...', 'password': '<weak-value>'}` dicts.

    `role_filter` is a (possibly empty) SQL fragment placed directly in
    front of the WHERE conditions.
    """
    # Build the candidate set as a SQL UNION once per column attempt so
    # the join below stays a single round-trip per column.
    # The derived column is deliberately NOT called `password`: the ladder
    # entries in PASS_COLUMN_NAMES reference `password`,
    # `authentication_string` and `plugin` unqualified, and those names must
    # resolve to `mysql.user` only.
    candidates_union = ' union all '.join(
        f"select '{p}' as candidate"
        for p in WEAK_PASSWORDS
    )
    for pass_column in PASS_COLUMN_NAMES:
        # `pass_column` may be a full expression (`IF(...)`), so it must not
        # be prefixed with a table alias - `u.IF(...)` is not valid SQL.
        sql = f"""
            select concat(quote(u.user), '@', quote(u.host)) as user,
                w.candidate as password
            from mysql.user u
            join ({candidates_union}) w
                on cast({pass_column} as binary) = password(w.candidate)
            where {role_filter}u.user != ''
                and u.user != 'mariadb.sys'
                and u.user != 'mysql.sys'
            order by u.user, u.host, w.candidate
            ;
        """  # nosec B608
        success, rows = lib.db_mysql.select(conn, sql)
        if success:
            return rows
    return []


def find_legacy_auth_plugin_users(conn, role_filter, mysql_80_plus):
    """Port of mysqltuner check_auth_plugins().

    Lists the accounts that authenticate through the legacy SHA1-based
    `mysql_native_password` plugin. When `mysql_80_plus` is true,
    `sha256_password` (marked DEPRECATED by Oracle as well) is included.
    MySQL 8.0+ recommends `caching_sha2_password`; MariaDB 10.4+
    recommends `ed25519` or `unix_socket`.

    Returns the matching rows (`user`, `plugin`), or [] if the query
    failed.
    """
    legacy_names = ['mysql_native_password']
    if mysql_80_plus:
        legacy_names.append('sha256_password')
    in_list = ', '.join(f"'{name}'" for name in legacy_names)
    query = f"""
        select concat(quote(user), '@', quote(host)) as user,
            plugin
        from mysql.user
        where {role_filter}plugin in ({in_list})
            and user not in ('mariadb.sys', 'mysql.sys')
        ;
    """  # nosec B608
    ok, matches = lib.db_mysql.select(conn, query)
    if ok:
        return matches
    return []


def find_wildcard_host_users(conn):
    """Return the accounts whose host part is the `'%'` wildcard.

    The query result is handed to `lib.base.coe()` and whatever that
    returns is passed on.
    """
    query = """
        select concat(quote(user), '@', quote(host)) as user
        from mysql.user
        where host = '%'
        ;
    """
    outcome = lib.db_mysql.select(conn, query)
    return lib.base.coe(outcome)


def count_non_system_users(conn, role_filter):
    """Count the rows in `mysql.user` other than `mariadb.sys` / `mysql.sys`.

    `role_filter` is a (possibly empty) SQL fragment placed directly in
    front of the condition; the caller uses it to leave out roles when the
    `IS_ROLE` column exists.
    """
    query = f"""
        select count(*) as cnt
        from mysql.user
        where {role_filter}user not in ('mariadb.sys', 'mysql.sys')
        ;
    """  # nosec B608
    outcome = lib.db_mysql.select(conn, query, fetchone=True)
    record = lib.base.coe(outcome)
    return int(record['cnt'])


def count_roles(conn, has_role_column):
    """Count the MariaDB roles (`IS_ROLE = 'Y'`) in `mysql.user`.

    No query is sent and 0 is returned when `has_role_column` is false.
    """
    if has_role_column:
        query = """
        select count(*) as cnt
        from mysql.user
        where IS_ROLE = 'Y'
        ;
    """
        outcome = lib.db_mysql.select(conn, query, fetchone=True)
        record = lib.base.coe(outcome)
        return int(record['cnt'])
    return 0


def main():
    """The main function. This is where the magic happens.

    Parses the command line, connects to the server, runs every
    user-security query, and hands message, state and perfdata to
    `lib.base.oao()`. Each category with at least one hit raises the
    state to the level chosen with `--severity` and adds one SQL
    recommendation per affected account.
    """

    # logic taken from mysqltuner.pl:security_recommendations(), verified in
    # sync with MySQLTuner. The basic password dictionary file check
    # that mysqltuner runs after the SQL-level checks is intentionally not
    # ported - it depends on a local password word-list and is not the kind
    # of test that fits a recurring monitoring plugin.

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        sys.exit(STATE_UNKNOWN)

    severity_state = lib.base.str2state(args.SEVERITY)

    # fetch data
    mysql_connection = {
        'defaults_file': args.DEFAULTS_FILE,
        'defaults_group': args.DEFAULTS_GROUP,
        'timeout': args.TIMEOUT,
    }
    conn = lib.base.coe(lib.db_mysql.connect(mysql_connection))
    lib.base.coe(lib.db_mysql.check_privileges(conn, 'SELECT'))

    # Role-filter fragment: when `IS_ROLE` exists, exclude rows representing
    # roles (MariaDB 10.0.5+). Otherwise the fragment is empty.
    has_role_column = lib.db_mysql.has_is_role_column(conn)
    role_filter = "IS_ROLE = 'N' and " if has_role_column else ''
    skip_username_as_password = validate_password_active(conn)

    myvar = lib.db_mysql.get_all_variables(conn)
    version = (myvar.get('version') or '').lower()
    is_mariadb = 'mariadb' in version
    mysql_80_plus = (not is_mariadb) and lib.version.version(version) >= (8, 0, 0)
    is_mariadb_10_4_plus = is_mariadb and lib.version.version(version) >= (10, 4, 0)

    anonymous_users = find_anonymous_users(conn, role_filter)
    empty_password_users = find_empty_password_users(conn)
    username_pw_users = (
        []
        if skip_username_as_password
        else find_username_as_password_users(conn, role_filter)
    )
    # Same `validate_password`-plugin caveat applies to the weak-password
    # dictionary check (MySQL Bug #80860 makes the comparison fail when
    # the plugin is active).
    weak_password_users = (
        []
        if skip_username_as_password
        else find_weak_password_users(conn, role_filter)
    )
    wildcard_host_users = find_wildcard_host_users(conn)
    legacy_auth_users = find_legacy_auth_plugin_users(conn, role_filter, mysql_80_plus)
    user_count = count_non_system_users(conn, role_filter)
    role_count = count_roles(conn, has_role_column)

    lib.db_mysql.close(conn)

    # init some vars
    state = STATE_OK
    sections = []
    findings = []
    # All recommendations land here and render once at the end as a
    # `Recommendations:\n* ...` bulleted block, regardless of which findings
    # fire. Copy-pasteable SQL statements are kept verbatim so the admin can
    # apply them after substituting the placeholder.
    recommendations = []
    perfdata = ''

    def state_marker(count):
        """Return the state suffix for a facts line, or '' on a clean check."""
        return lib.base.state2str(severity_state, prefix=' ') if count else ''

    def set_password_advice(account):
        """Return the backtick-wrapped SQL that sets a new password.

        `PASSWORD()` was removed in MySQL 8.0, so the classic
        `SET PASSWORD ... = PASSWORD(...)` form is not valid there;
        `ALTER USER ... IDENTIFIED BY` is suggested instead.
        """
        if mysql_80_plus:
            return f"`ALTER USER {account} IDENTIFIED BY '{PASSWORD_PLACEHOLDER}';`"
        return f"`SET PASSWORD FOR {account} = PASSWORD('{PASSWORD_PLACEHOLDER}');`"

    # analyze data
    # Each check produces one facts line: "N <category>" with a state marker
    # when N > 0, or "0 <category>" on a clean check.
    n_anon = len(anonymous_users)
    if n_anon:
        state = lib.base.get_worst(state, severity_state)
    findings.append(
        f'{n_anon} anonymous {lib.txt.pluralize("account", n_anon)}'
        f'{state_marker(n_anon)}'
    )
    for u in anonymous_users:
        recommendations.append(f'`DROP USER {u["user"]};`')

    n_empty = len(empty_password_users)
    if n_empty:
        state = lib.base.get_worst(state, severity_state)
    findings.append(
        f'{n_empty} {lib.txt.pluralize("user", n_empty)} without password'
        f'{state_marker(n_empty)}'
    )
    for u in empty_password_users:
        recommendations.append(set_password_advice(u['user']))

    if skip_username_as_password:
        findings.append(
            'username-as-password and weak-password checks skipped '
            '(`validate_password` plugin is active; MySQL Bug #80860)'
        )
    else:
        n_uname = len(username_pw_users)
        if n_uname:
            state = lib.base.get_worst(state, severity_state)
        findings.append(
            f'{n_uname} {lib.txt.pluralize("user", n_uname)} '
            f'with username as password'
            f'{state_marker(n_uname)}'
        )
        for u in username_pw_users:
            recommendations.append(set_password_advice(u['user']))

        n_weak = len(weak_password_users)
        if n_weak:
            state = lib.base.get_worst(state, severity_state)
        findings.append(
            f'{n_weak} {lib.txt.pluralize("user", n_weak)} '
            f'with a weak default password'
            f'{state_marker(n_weak)}'
        )
        for u in weak_password_users:
            recommendations.append(
                f"{u['user']}: password is `{u['password']}` (well-known default); "
                f"{set_password_advice(u['user'])}"
            )

    n_wildcard = len(wildcard_host_users)
    if n_wildcard:
        state = lib.base.get_worst(state, severity_state)
    findings.append(
        f'{n_wildcard} {lib.txt.pluralize("account", n_wildcard)} '
        f'without hostname restriction'
        f'{state_marker(n_wildcard)}'
    )
    wildcard_suffix = "@'%'"
    for u in wildcard_host_users:
        account = u['user']
        # Swap only the trailing host part. A blanket replace of `'%'` would
        # also rewrite the user part of an account that is itself named `%`
        # (`'%'@'%'`).
        if account.endswith(wildcard_suffix):
            renamed = f"{account[:-len(wildcard_suffix)]}@'{HOST_PLACEHOLDER}'"
        else:
            renamed = account.replace("'%'", f"'{HOST_PLACEHOLDER}'")
        recommendations.append(f'`RENAME USER {account} TO {renamed};`')

    n_legacy = len(legacy_auth_users)
    if n_legacy:
        state = lib.base.get_worst(state, severity_state)
    findings.append(
        f'{n_legacy} {lib.txt.pluralize("user", n_legacy)} '
        f'on a legacy authentication plugin'
        f'{state_marker(n_legacy)}'
    )
    # Version-aware modern plugin: caching_sha2_password on MySQL 8.0+,
    # ed25519 on MariaDB 10.4+. Older MySQL/MariaDB stay on
    # mysql_native_password but we still flag it so an admin sees the
    # SHA1 exposure.
    if mysql_80_plus:
        for u in legacy_auth_users:
            recommendations.append(
                f"`ALTER USER {u['user']} IDENTIFIED WITH caching_sha2_password "
                f"BY '{PASSWORD_PLACEHOLDER}';`"
            )
    elif is_mariadb_10_4_plus:
        for u in legacy_auth_users:
            recommendations.append(
                f"`ALTER USER {u['user']} IDENTIFIED VIA ed25519 "
                f"USING PASSWORD('{PASSWORD_PLACEHOLDER}');`"
            )
    else:
        for u in legacy_auth_users:
            recommendations.append(
                f"{u['user']}: server version predates "
                f'`caching_sha2_password` / `ed25519`; upgrade the server first.'
            )

    # build the message
    scope = f'{user_count} non-system user '
    scope += lib.txt.pluralize('account', user_count)
    if has_role_column:
        scope += f', {role_count} {lib.txt.pluralize("role", role_count)}'
    facts_text = scope + '. ' + '. '.join(findings) + '.'
    if state == STATE_OK:
        sections.append('Everything is ok. ' + facts_text)
    else:
        sections.append(facts_text)

    if recommendations:
        sections.append(
            'Recommendations:\n' + '\n'.join(f'* {r}' for r in recommendations)
        )

    msg = '\n\n'.join(sections)

    perfdata += lib.base.get_perfdata(
        'mysql_user_count',
        user_count,
        _min=0,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_role_count',
        role_count,
        _min=0,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_anonymous_users',
        len(anonymous_users),
        _min=0,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_users_without_password',
        len(empty_password_users),
        _min=0,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_users_with_username_as_password',
        len(username_pw_users),
        _min=0,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_users_with_weak_password',
        len(weak_password_users),
        _min=0,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_users_with_wildcard_host',
        len(wildcard_host_users),
        _min=0,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_users_on_legacy_auth_plugin',
        len(legacy_auth_users),
        _min=0,
    )

    # over and out
    lib.base.oao(msg, state, perfdata, always_ok=args.ALWAYS_OK)


# Run the check only when the file is executed as a script, not on import.
if __name__ == '__main__':
    try:
        main()
    except Exception:
        # Last-resort handler for any exception main() let through.
        # NOTE(review): lib.base.cu() presumably reports the traceback and
        # exits with an UNKNOWN state - confirm in lib.base.
        lib.base.cu()
