#!/usr/lib64/linuxfabrik-monitoring-plugins/venv/bin/python
# -*- coding: utf-8; py-indent-offset: 4 -*-
#
# Author:  Linuxfabrik GmbH, Zurich, Switzerland
# Contact: info (at) linuxfabrik (dot) ch
#          https://www.linuxfabrik.ch/
# License: The Unlicense, see LICENSE file.

# https://github.com/Linuxfabrik/monitoring-plugins/blob/main/CONTRIBUTING.md

"""See the check's README for more details."""

import argparse
import sys

import lib.args
import lib.base
import lib.db_mysql
import lib.txt
from lib.globals import STATE_OK, STATE_UNKNOWN

# Shown by `--version` (see `parse_args()`).
__author__ = 'Linuxfabrik GmbH, Zurich/Switzerland'
__version__ = '2026051101'

# Passed to argparse as the program description.
DESCRIPTION = """Checks the replication status of a MySQL/MariaDB replica: I/O thread state, SQL
thread state, replication lag (`Seconds_Behind_Master` / `Seconds_Behind_Source`),
`read_only` mode, semi-synchronous replication mode and (on MariaDB 10.5+) parallel
replication settings. Also reports whether this server is a Galera node and how
many downstream replicas it is feeding. Alerts when replication is broken,
configured but not running, or lagging behind."""

# Defaults for the command line parameters defined in `parse_args()`.
# Default for `--defaults-file`.
DEFAULT_DEFAULTS_FILE = '/var/spool/icinga2/.my.cnf'
# Default for `--defaults-group`.
DEFAULT_DEFAULTS_GROUP = 'client'
# Default for `--lag-warning` (seconds). `main()` compares with a strict
# "greater than", so with 0 any lag above zero seconds raises the lag state.
DEFAULT_LAG_WARN = 0
# Default for `--severity`; the parser only accepts `warn` or `crit`.
DEFAULT_SEVERITY = 'warn'
# Default for `--timeout` (seconds).
DEFAULT_TIMEOUT = 3


def parse_args():
    """Define the command line interface of this check and parse the arguments.

    Parsing uses `parse_known_args()`, so arguments the parser does not know
    are accepted and dropped instead of causing an error.

    Returns the namespace with the attributes ALWAYS_OK, DEFAULTS_FILE,
    DEFAULTS_GROUP, LAG_WARN, LAG_CRIT, SEVERITY and TIMEOUT.
    """
    cli = argparse.ArgumentParser(description=DESCRIPTION)
    option = cli.add_argument

    option(
        '-V',
        '--version',
        version=f'%(prog)s: v{__version__} by {__author__}',
        action='version',
    )

    option(
        '--always-ok',
        action='store_true',
        default=False,
        dest='ALWAYS_OK',
        help=lib.args.help('--always-ok'),
    )

    defaults_file_help = (
        'MySQL/MariaDB cnf file to read user, host and password from. '
        'Example: `--defaults-file=/var/spool/icinga2/.my.cnf`. '
        'Default: %(default)s'
    )
    option(
        '--defaults-file',
        default=DEFAULT_DEFAULTS_FILE,
        dest='DEFAULTS_FILE',
        help=defaults_file_help,
    )

    option(
        '--defaults-group',
        default=DEFAULT_DEFAULTS_GROUP,
        dest='DEFAULTS_GROUP',
        help=lib.args.help('--defaults-group') + ' Default: %(default)s',
    )

    # Lag thresholds are whole seconds. Only the warning threshold has a
    # default; the critical one stays `None` unless given explicitly.
    lag_warning_help = (
        'Seconds of replication lag at which the WARN flag is raised. '
        'A value of 0 means any lag (the historic default; matches the '
        'mysqltuner cut-off). '
        'Default: %(default)s'
    )
    option(
        '--lag-warning',
        default=DEFAULT_LAG_WARN,
        dest='LAG_WARN',
        type=int,
        help=lag_warning_help,
    )

    lag_critical_help = (
        'Seconds of replication lag at which the CRIT flag is raised. '
        'If omitted, lag never escalates to CRIT.'
    )
    option(
        '--lag-critical',
        default=None,
        dest='LAG_CRIT',
        type=int,
        help=lag_critical_help,
    )

    severity_help = (
        'Severity for alerts that do not depend on thresholds '
        '(IO/SQL thread not running, `read_only` disabled). '
        'One of `warn` or `crit`. '
        'Default: %(default)s'
    )
    option(
        '--severity',
        choices=['warn', 'crit'],
        default=DEFAULT_SEVERITY,
        dest='SEVERITY',
        help=severity_help,
    )

    option(
        '--timeout',
        default=DEFAULT_TIMEOUT,
        dest='TIMEOUT',
        type=int,
        help=lib.args.help('--timeout') + ' Default: %(default)s (seconds)',
    )

    # Element 0 is the namespace, element 1 the list of unrecognised arguments.
    namespace = cli.parse_known_args()[0]
    return namespace


def get_vars(conn):
    """Fetch the global server variables this check evaluates.

    There is deliberately no generic `get_all_vars()`; only the variables
    needed here are selected. `GLOBAL` is requested explicitly because without
    that modifier SHOW VARIABLES displays the values that are used for the
    current connection to MariaDB.

    The outcome of `lib.db_mysql.select()` is handed to `lib.base.coe()` and
    whatever that returns is passed back to the caller.
    """
    query = """
        show global variables
        where variable_name like 'binlog_format'
            or variable_name like 'innodb_support_xa'
            or variable_name like 'read_only'
            or variable_name like 'replica_parallel_mode'
            or variable_name like 'replica_parallel_threads'
            or variable_name like 'rpl_semi_sync_master_enabled'
            or variable_name like 'rpl_semi_sync_replica_enabled'
            or variable_name like 'rpl_semi_sync_slave_enabled'
            or variable_name like 'rpl_semi_sync_source_enabled'
            or variable_name like 'slave_parallel_mode'
            or variable_name like 'slave_parallel_threads'
            or variable_name like 'version'
            or variable_name like 'wsrep_on'
            or variable_name like 'wsrep_provider_options'
            ;
          """
    outcome = lib.db_mysql.select(conn, query)
    return lib.base.coe(outcome)


def has_galera(myvar):
    """Tell whether the server variables describe a Galera node.

    True when `wsrep_provider_options` carries a non-empty value and
    `wsrep_on` is anything other than `OFF` (a missing `wsrep_on` counts as
    "not OFF"). False otherwise.
    """
    provider_options = myvar.get('wsrep_provider_options', '')
    if not provider_options:
        return False
    return myvar.get('wsrep_on') != 'OFF'


def either(myvar, *names, default=None):
    """Look up `names` in `myvar` in order and return the first value that is
    not `None`; return `default` when none of them yields one.

    Mirrors mysqltuner's `//` (defined-or) fallback and bridges the
    `Slave_*` / `Replica_*` rename across MySQL and MariaDB versions.
    Falsy values such as `0` or `''` count as defined.
    """
    looked_up = (myvar.get(key) for key in names)
    return next((found for found in looked_up if found is not None), default)


def is_mariadb(myvar):
    """Return True when the `version` server variable contains `MariaDB`.

    The match is case-sensitive; a missing `version` entry yields False.
    """
    return 'MariaDB' in myvar.get('version', '')


def main():
    """The main function. This is where the magic happens.

    Flow:

    1. Parse the command line (any argparse exit is mapped to STATE_UNKNOWN).
    2. Connect, check the replication-related privileges, then fetch the
       needed global variables, the replica status and the list of
       downstream replicas. Close the connection.
    3. Build a list of facts. For a replica, raise the state (per
       `--severity`) when the IO/SQL threads are not both running, when
       `read_only` is `OFF`, when the lag is unknown, or when a MariaDB
       parallel-replication mode other than `optimistic` is set. Lag is
       compared against `--lag-warning` / `--lag-critical`.
    4. Emit the message, the collected recommendations and the perfdata via
       `lib.base.oao()`.
    """

    # logic taken from mysqltuner.pl:get_replication_status(),
    # verified in sync with MySQLTuner

    # parse the command line
    try:
        args = parse_args()
    except SystemExit:
        sys.exit(STATE_UNKNOWN)

    # fetch data
    mysql_connection = {
        'defaults_file': args.DEFAULTS_FILE,
        'defaults_group': args.DEFAULTS_GROUP,
        'timeout': args.TIMEOUT,
    }
    conn = lib.base.coe(lib.db_mysql.connect(mysql_connection))
    lib.base.coe(
        lib.db_mysql.check_privileges(
            conn,
            ['REPLICATION CLIENT', 'SLAVE MONITOR', 'REPLICA MONITOR'],
        ),
    )

    myvar = lib.db_mysql.lod2dict(get_vars(conn))

    # Returns first row (a dict) or None when the server is not a replica.
    # Tries `SHOW REPLICA STATUS` (MariaDB 10.5+ / MySQL 8.0.22+) and silently
    # falls back to `SHOW SLAVE STATUS` on older servers.
    myrepl = lib.db_mysql.get_replica_status(conn)

    # Downstream replicas. `SHOW SLAVE HOSTS` is tried first because it is the
    # statement this check has always used. MySQL 8.4 removed it in favour of
    # `SHOW REPLICAS`, so retry with the new name when the legacy one fails.
    # If both fail, the error of the legacy statement is reported, exactly as
    # before.
    # NOTE(review): assumes `lib.db_mysql.select()` returns the indexable
    # `(success, result)` pair that `lib.base.coe()` consumes - confirm.
    slave_hosts = lib.db_mysql.select(conn, 'show slave hosts;')
    if not slave_hosts[0]:
        replicas = lib.db_mysql.select(conn, 'show replicas;')
        if replicas[0]:
            slave_hosts = replicas
    myslaves = lib.base.coe(slave_hosts)

    lib.db_mysql.close(conn)

    # init some vars
    state = STATE_OK
    sections = []
    facts = []
    perfdata = ''
    # All recommendations from all WARN/CRIT paths land here and render once at
    # the end as a `Recommendations:\n* ...` bulleted block, regardless of which
    # combinations of WARN paths fire.
    recommendations = []
    severity_state = lib.base.str2state(args.SEVERITY)
    galera = has_galera(myvar)

    # analyze data
    # IO + SQL thread state. mysqltuner reads both names (Slave_* / Replica_*)
    # and uses the first that is defined.
    io_running = ''
    sql_running = ''
    seconds_behind = None
    if myrepl:
        io_running = either(
            myrepl, 'Slave_IO_Running', 'Replica_IO_Running', default=''
        )
        sql_running = either(
            myrepl, 'Slave_SQL_Running', 'Replica_SQL_Running', default=''
        )
        seconds_behind = either(
            myrepl, 'Seconds_Behind_Master', 'Seconds_Behind_Source'
        )

    # Only a literal "yes" (any case) counts as running.
    io_ok = io_running.lower() == 'yes'
    sql_ok = sql_running.lower() == 'yes'

    # build the message
    facts.append(f'Galera synchronous replication: {"YES" if galera else "NO"}')
    if myslaves:
        facts.append(
            f'Acting as primary for {len(myslaves)}'
            f' {lib.txt.pluralize("server", len(myslaves))}'
        )
    facts.append(f'`binlog_format` is `{myvar["binlog_format"]}`')
    facts.append(f'`innodb_support_xa` is `{myvar.get("innodb_support_xa", "ON")}`')

    semi_master = either(
        myvar,
        'rpl_semi_sync_source_enabled',
        'rpl_semi_sync_master_enabled',
    )
    facts.append(
        f'Semi-sync primary: `{semi_master}`'
        if semi_master
        else 'Semi-sync primary: not activated'
    )
    semi_replica = either(
        myvar,
        'rpl_semi_sync_replica_enabled',
        'rpl_semi_sync_slave_enabled',
    )
    facts.append(
        f'Semi-sync replica: `{semi_replica}`'
        if semi_replica
        else 'Semi-sync replica: not activated'
    )

    # Standalone / non-replica fast paths. No replica-side checks apply.
    if not myrepl and not myslaves:
        facts.append('This is a standalone server')
    elif not myrepl:
        facts.append('No replication setup for this server, or replication not started')
    else:
        if not io_ok or not sql_ok:
            state = lib.base.get_worst(state, severity_state)
            facts.append(
                f'Replica is not running but seems to be configured'
                f' (IO: `{io_running}`, SQL: `{sql_running}`)'
                f'{lib.base.state2str(severity_state, prefix=" ")}'
            )
            recommendations.append(
                'Investigate the IO/SQL thread errors via'
                ' `SHOW REPLICA STATUS` (look at `Last_IO_Error`,'
                ' `Last_SQL_Error`)'
            )
        else:
            # Replica is running. Check read_only and lag.
            if myvar.get('read_only') == 'OFF':
                state = lib.base.get_worst(state, severity_state)
                facts.append(
                    f'Replica is running with `read_only` = `OFF`'
                    f'{lib.base.state2str(severity_state, prefix=" ")}'
                )
                recommendations.append(
                    'Set `read_only` = `ON` on the replica to prevent'
                    ' application writes from drifting away from the primary'
                )

            lag_state = STATE_OK
            if seconds_behind is None:
                # Both threads report "Yes" but the server gives no lag value:
                # treat it like the other threshold-independent alerts.
                lag_state = severity_state
                facts.append(
                    f'Replica lag: unknown (`Seconds_Behind_Master` /'
                    f' `Seconds_Behind_Source` is NULL)'
                    f'{lib.base.state2str(lag_state, prefix=" ")}'
                )
            else:
                lag = int(seconds_behind)
                # `gt` (strict >) so lag = LAG_WARN (default 0) stays OK and
                # only lag > LAG_WARN flips WARN, matching mysqltuner's
                # "> 0.000001" intent. `None` LAG_CRIT is skipped by get_state.
                lag_state = lib.base.get_state(
                    lag,
                    args.LAG_WARN,
                    args.LAG_CRIT,
                    _operator='gt',
                )
                facts.append(
                    f'Replica lag: {lag}s behind primary'
                    f'{lib.base.state2str(lag_state, prefix=" ")}'
                )
                if lag_state != STATE_OK:
                    recommendations.append(
                        'Investigate why the replica is lagging: long-running'
                        ' transactions on the primary, slow queries on the'
                        ' replica, or insufficient parallel-replication threads'
                    )
                # A warning threshold of 0 is left out of the perfdata.
                perfdata += lib.base.get_perfdata(
                    'mysql_replication_seconds_behind',
                    lag,
                    uom='s',
                    warn=args.LAG_WARN if args.LAG_WARN > 0 else None,
                    crit=args.LAG_CRIT,
                    _min=0,
                )
            state = lib.base.get_worst(state, lag_state)

        # Parallel-replication knobs (MariaDB 10.5+). mysqltuner skips these on
        # non-MariaDB servers, so we do too. The variable pair was renamed in
        # MariaDB 11.x; mysqltuner reads the first defined name.
        if is_mariadb(myvar):
            parallel_threads_raw = either(
                myvar,
                'replica_parallel_threads',
                'slave_parallel_threads',
                default='0',
            )
            try:
                parallel_threads = int(parallel_threads_raw)
            except (TypeError, ValueError):
                # Unparsable value: treat parallel replication as disabled.
                parallel_threads = 0
            if parallel_threads > 1:
                facts.append(f'Parallel replication: {parallel_threads} threads')
                parallel_mode = either(
                    myvar,
                    'replica_parallel_mode',
                    'slave_parallel_mode',
                    default='',
                )
                if parallel_mode and parallel_mode != 'optimistic':
                    state = lib.base.get_worst(state, severity_state)
                    facts.append(
                        f'Parallel replication mode: `{parallel_mode}`'
                        f'{lib.base.state2str(severity_state, prefix=" ")}'
                    )
                    recommendations.append(
                        'Set `replica_parallel_mode` = `optimistic` on'
                        ' MariaDB 10.5+ for better parallel-replication'
                        ' throughput'
                    )
                # Recommendation only; does not change the state.
                if myvar.get('binlog_format') != 'ROW':
                    recommendations.append(
                        'Ensure `binlog_format` = `ROW` on the primary for'
                        ' parallel replication to be effective'
                    )
            else:
                facts.append('Parallel replication: disabled')
                # Always true in this branch (we only get here when `myrepl`
                # is set); kept as an explicit guard.
                if myrepl:
                    recommendations.append(
                        'Set `replica_parallel_threads` to the number of vCPUs'
                        ' to enable parallel replication'
                    )

    # Compose the OK/WARN message. "Everything is ok." leads the OK output so
    # the admin sees the verdict first.
    facts_text = '. '.join(facts) + '.'
    if state == STATE_OK:
        sections.append('Everything is ok. ' + facts_text)
    else:
        sections.append(facts_text)
    if recommendations:
        sections.append(
            'Recommendations:\n' + '\n'.join(f'* {r}' for r in recommendations)
        )
    msg = '\n\n'.join(sections)

    # Always-on perfdata for trending and dashboards.
    perfdata += lib.base.get_perfdata(
        'mysql_replication_io_running',
        1 if io_ok else 0,
        _min=0,
        _max=1,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_replication_sql_running',
        1 if sql_ok else 0,
        _min=0,
        _max=1,
    )
    perfdata += lib.base.get_perfdata(
        'mysql_replication_slave_count',
        len(myslaves),
        _min=0,
    )

    # over and out
    lib.base.oao(msg, state, perfdata, always_ok=args.ALWAYS_OK)


if __name__ == '__main__':
    # Script entry point. Any `Exception` escaping `main()` is caught here and
    # `lib.base.cu()` is called. `SystemExit` (raised by `sys.exit()`) is not
    # an `Exception` subclass and therefore passes through untouched.
    # NOTE(review): `cu()` presumably prints the error details and exits with
    # STATE_UNKNOWN - confirm in `lib.base`.
    try:
        main()
    except Exception:
        lib.base.cu()
