diff --git a/docs/reference/slurmctld.md b/docs/reference/slurmctld.md
index f13419fe..e0e1baa7 100644
--- a/docs/reference/slurmctld.md
+++ b/docs/reference/slurmctld.md
@@ -6,3 +6,4 @@ title: slurmctld
     handler: python
     options:
       members: yes
+      members_order: source
diff --git a/pyslurm/core/slurmctld/__init__.py b/pyslurm/core/slurmctld/__init__.py
index fe8f970b..68be857f 100644
--- a/pyslurm/core/slurmctld/__init__.py
+++ b/pyslurm/core/slurmctld/__init__.py
@@ -5,6 +5,18 @@
     CgroupConfig,
 )
 from .enums import ShutdownMode
+from .stats import (
+    diag,
+    Statistics,
+    ScheduleExitStatistics,
+    BackfillExitStatistics,
+    RPCPending,
+    RPCUser,
+    RPCType,
+    RPCPendingStatistics,
+    RPCUserStatistics,
+    RPCTypeStatistics,
+)
 from .base import (
     PingResponse,
     ping,
diff --git a/pyslurm/core/slurmctld/stats.pxd b/pyslurm/core/slurmctld/stats.pxd
new file mode 100644
index 00000000..5be8d4b5
--- /dev/null
+++ b/pyslurm/core/slurmctld/stats.pxd
@@ -0,0 +1,446 @@
+#########################################################################
+# slurmctld/stats.pxd - pyslurm slurmctld statistics api (sdiag)
+#########################################################################
+# Copyright (C) 2025 Toni Harzendorf
+#
+#########################################################################
+# Much of the documentation here (with some modifications) has been taken from:
+# - https://slurm.schedmd.com/sdiag.html
+# - https://github.com/SchedMD/slurm/blob/c28fcf4f15981f891df7893099bceda21e2c5e6e/src/sdiag/sdiag.c
+#
+# So for completeness, the appropriate Copyright notices are also written
+# below:
+#
+# Copyright (C) 2010-2011 Barcelona Supercomputing Center.
+# Copyright (C) 2010-2022 SchedMD LLC.
+#
+# Please also check the Slurm DISCLAIMER at: pyslurm/slurm/SLURM_DISCLAIMER
+#########################################################################
+
+# This file is part of PySlurm
+#
+# PySlurm is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# PySlurm is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with PySlurm; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# cython: c_string_type=unicode, c_string_encoding=default
+# cython: language_level=3
+
+from libc.string cimport memset
+from pyslurm cimport slurm
+from pyslurm.slurm cimport (
+    stats_info_response_msg_t,
+    stats_info_request_msg_t,
+    slurm_get_statistics,
+    slurm_reset_statistics,
+    slurm_free_stats_response_msg,
+    xfree,
+    xmalloc,
+)
+from pyslurm.utils cimport cstr
+from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t, int64_t
+from pyslurm.utils.uint cimport (
+    u16_parse,
+    u32_parse,
+    u64_parse,
+    u16_parse_bool,
+)
+
+cdef extern const char *rpc_num2string(uint16_t msg_type)
+
+cdef parse_response(stats_info_response_msg_t *ptr)
+
+
+cdef class ScheduleExitStatistics:
+    """Conditions reached at the end of a scheduling run.
+
+    Each attribute is simply a counter that describes how many times a specific
+    condition was met during the main scheduling run.
+
+    Attributes:
+        end_of_job_queue (int):
+            Times the end of the job queue was reached.
+        default_queue_depth (int):
+            Times the limit on the number of jobs allowed to be tested was
+            reached.
+        max_job_start (int):
+            Times the limit on the number of jobs allowed to start was
+            reached.
+        blocked_on_licenses (int):
+            Times the scheduler blocked on licenses.
+        max_rpc_count (int):
+            Times the RPC limit was reached.
+        max_time (int):
+            Times the maximum allowed scheduler time for a cycle was reached.
+    """
+    cdef public:
+        end_of_job_queue
+        default_queue_depth
+        max_job_start
+        blocked_on_licenses
+        max_rpc_count
+        max_time
+
+    @staticmethod
+    cdef ScheduleExitStatistics from_ptr(stats_info_response_msg_t *ptr)
+
+
+cdef class BackfillExitStatistics:
+    """Conditions reached at the end of a Backfill scheduling run.
+
+    Each attribute is simply a counter that describes how many times a specific
+    condition was met during the Backfill scheduling run.
+
+    Attributes:
+        end_of_job_queue (int):
+            Times the end of the job queue was reached.
+        max_job_start (int):
+            Times the limit on the number of jobs allowed to start was
+            reached.
+        max_job_test (int):
+            Times the limit on the number of jobs allowed to attempt backfill
+            scheduling was reached.
+        max_time (int):
+            Times the maximum allowed scheduler time for a cycle was reached.
+        node_space_size (int):
+            Times the node_space table size limit was reached.
+        state_changed (int):
+            Times the system state changed.
+    """
+    cdef public:
+        end_of_job_queue
+        max_job_start
+        max_job_test
+        max_time
+        node_space_size
+        state_changed
+
+    @staticmethod
+    cdef BackfillExitStatistics from_ptr(stats_info_response_msg_t *ptr)
+
+
+cdef class RPCPending:
+    """Statistics for a pending RPC.
+
+    Attributes:
+        id (int):
+            The numeric ID of the RPC type.
+        name (str):
+            The string representation of the RPC.
+        count (int):
+            How many RPCs of this type are pending.
+    """
+    cdef public:
+        id
+        name
+        count
+
+
+cdef class RPCType:
+    """Statistics for a specific RPC Type.
+
+    Attributes:
+        id (int):
+            The numeric ID of the RPC Type.
+        name (str):
+            The string representation of the RPC.
+        count (int):
+            How many times this RPC was issued since the last time the
+            statistics were cleared.
+        time (int):
+            How much total time it has taken to process this RPC. The unit is
+            microseconds.
+        average_time (int):
+            How much time on average it has taken to process this RPC. The unit
+            is microseconds.
+        queued (int):
+            How many of these RPCs are still queued.
+        dropped (int):
+            How many of these RPCs have been dropped.
+        cycle_last (int):
+            Count of RPCs processed within the last RPC queue cycle.
+        cycle_max (int):
+            Maximum count of RPCs that have been processed within an RPC queue
+            cycle.
+    """
+    cdef public:
+        id
+        name
+        count
+        time
+        average_time
+        queued
+        dropped
+        cycle_last
+        cycle_max
+
+
+cdef class RPCUser:
+    """RPC Statistics for a specific User.
+
+    Attributes:
+        user_id (int):
+            The numeric ID of the User.
+        user_name (str):
+            The name of the User.
+        count (int):
+            How many times the User issued RPCs since the last time the
+            statistics were cleared.
+        time (int):
+            How much total time it has taken to process RPCs by this User. The
+            unit is microseconds.
+        average_time (int):
+            How much time on average it has taken to process RPCs by this User.
+            The unit is microseconds.
+    """
+    cdef public:
+        user_id
+        user_name
+        count
+        time
+        average_time
+
+
+cdef class RPCTypeStatistics(dict):
+    """Collection of [pyslurm.slurmctld.RPCType][] objects.
+
+    Attributes:
+        count (int):
+            Total number of RPCs made to the `slurmctld` since last reset.
+        time (int):
+            Total time it has taken to process all RPCs made so far.
+        queued (int):
+            Total number of RPCs queued.
+        dropped (int):
+            Total number of RPCs dropped.
+    """
+    @staticmethod
+    cdef RPCTypeStatistics from_ptr(stats_info_response_msg_t *ptr, rpc_queue_enabled)
+
+
+cdef class RPCUserStatistics(dict):
+    """Collection of [pyslurm.slurmctld.RPCUser][] objects.
+
+    Attributes:
+        count (int):
+            Total number of RPCs made to the `slurmctld` since last reset.
+        time (int):
+            Total time it has taken to process all RPCs made so far.
+    """
+    @staticmethod
+    cdef RPCUserStatistics from_ptr(stats_info_response_msg_t *ptr)
+
+
+cdef class RPCPendingStatistics(dict):
+    """Collection of [pyslurm.slurmctld.RPCPending][] objects.
+
+    Attributes:
+        count (int):
+            Total number of RPCs currently pending.
+    """
+    @staticmethod
+    cdef RPCPendingStatistics from_ptr(stats_info_response_msg_t *ptr)
+
+
+cdef class Statistics:
+    """Statistics for the `slurmctld`.
+
+    For more information, also check out the Slurm [sdiag documentation](https://slurm.schedmd.com/sdiag.html).
+
+    Attributes:
+        request_time (int):
+            Time when the data was requested. This is a unix timestamp.
+        data_since (int):
+            The date when `slurmctld` started gathering statistics. This is a
+            unix timestamp.
+        server_thread_count (int):
+            The number of currently active `slurmctld` threads.
+        rpc_queue_enabled (bool):
+            Whether RPC queuing is enabled.
+        agent_queue_size (int):
+            Count of enqueued outgoing RPC requests in an internal retry list.
+        agent_count (int):
+            Number of agent threads.
+        agent_thread_count (int):
+            Total count of active threads created by all the agent threads.
+        dbd_agent_queue_size (int):
+            Number of messages intended for the `slurmdbd`. If the `slurmdbd`
+            goes down, then this number starts going up.
+        jobs_submitted (int):
+            Number of jobs submitted since last reset.
+        jobs_started (int):
+            Number of jobs started since last reset. This includes backfilled
+            jobs.
+        jobs_completed (int):
+            Number of jobs completed since last reset.
+        jobs_canceled (int):
+            Number of jobs canceled since last reset.
+        jobs_failed (int):
+            Number of jobs failed due to `slurmd` or other internal issues since
+            last reset.
+        jobs_pending (int):
+            Number of jobs pending.
+        jobs_running (int):
+            Number of jobs running.
+        schedule_cycle_last (int):
+            Time in microseconds for the last scheduling cycle.
+        schedule_cycle_max (int):
+            Maximum time in microseconds for any scheduling cycle since last
+            reset.
+        schedule_cycle_counter (int):
+            Total number of scheduling cycles run since last reset.
+        schedule_cycle_mean (int):
+            Mean time in microseconds for all scheduling cycles since last
+            reset.
+        schedule_cycle_mean_depth (int):
+            Mean of cycle depth. Depth means number of jobs processed in a
+            scheduling cycle.
+        schedule_cycle_sum (int):
+            Total run time in microseconds for all scheduling cycles since last
+            reset.
+        schedule_cycles_per_minute (int):
+            Counter of scheduling executions per minute.
+        schedule_queue_length (int):
+            Length of the pending job queue.
+        backfill_active (bool):
+            Whether these statistics have been gathered during a backfilling
+            operation.
+        backfilled_jobs (int):
+            Number of jobs started thanks to backfilling since last Slurm
+            start.
+        last_backfilled_jobs (int):
+            Number of jobs started thanks to backfilling since the last time
+            stats were reset (which is midnight UTC in this case).
+        backfilled_het_jobs (int):
+            Number of heterogeneous job components started thanks to
+            backfilling since last Slurm start.
+        backfill_cycle_counter (int):
+            Number of backfill scheduling cycles since last reset.
+        backfill_cycle_last_when (int):
+            Time when the last backfill scheduling cycle happened. This is a
+            unix timestamp.
+        backfill_cycle_last (int):
+            Time in microseconds of the last backfill scheduling cycle.
+        backfill_cycle_max (int):
+            Time in microseconds of maximum backfill scheduling cycle execution
+            since last reset.
+        backfill_cycle_mean (int):
+            Mean time in microseconds of backfilling scheduling cycles since
+            last reset.
+        backfill_cycle_sum (int):
+            Total time in microseconds of backfilling scheduling cycles since
+            last reset.
+        backfill_last_depth (int):
+            Number of processed jobs during the last backfilling scheduling
+            cycle. It counts every job even if that job cannot be started due
+            to dependencies or limits.
+        backfill_depth_sum (int):
+            Total number of jobs processed during all backfilling scheduling
+            cycles since last reset.
+        backfill_last_depth_try (int):
+            Number of processed jobs during the last backfilling scheduling
+            cycle. It counts only jobs with a chance to start using available
+            resources.
+        backfill_depth_try_sum (int):
+            Subset of `backfill_depth_sum` that the backfill scheduler
+            attempted to schedule.
+        backfill_mean_depth (int):
+            Mean count of jobs processed during all backfilling scheduling
+            cycles since last reset. Jobs which are found to be ineligible to
+            run when examined by the backfill scheduler are not counted.
+        backfill_mean_depth_try (int):
+            The subset of `backfill_mean_depth` that the backfill
+            scheduler attempted to schedule.
+        backfill_queue_length (int):
+            Number of jobs pending to be processed by the backfilling
+            algorithm. A job is counted once for each partition it is queued
+            to use.
+        backfill_queue_length_sum (int):
+            Total number of jobs pending to be processed by the backfilling
+            algorithm since last reset.
+        backfill_queue_length_mean (int):
+            Mean count of jobs pending to be processed by the backfilling
+            algorithm.
+        backfill_table_size (int):
+            Count of different time slots tested by the backfill scheduler in
+            its last iteration.
+        backfill_table_size_sum (int):
+            Total number of different time slots tested by the backfill
+            scheduler.
+        backfill_table_size_mean (int):
+            Mean count of different time slots tested by the backfill
+            scheduler. Larger counts increase the time required for the
+            backfill operation.
+        gettimeofday_latency (int):
+            Latency of 1000 calls to the gettimeofday() syscall in
+            microseconds, as measured at controller startup.
+        rpcs_by_type (pyslurm.slurmctld.RPCTypeStatistics):
+            RPC Statistics organized by Type.
+        rpcs_by_user (pyslurm.slurmctld.RPCUserStatistics):
+            RPC Statistics organized by User.
+        rpcs_pending (pyslurm.slurmctld.RPCPendingStatistics):
+            Statistics for pending RPCs.
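+
+    Examples:
+        A minimal usage sketch; the printed values below are illustrative
+        only:
+
+        >>> from pyslurm import slurmctld
+        >>> stats = slurmctld.Statistics.load()
+        >>> stats.jobs_running
+        3
+        >>> stats.rpcs_by_type.count
+        42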
+    """
+    cdef public:
+        request_time
+        data_since
+        server_thread_count
+        rpc_queue_enabled
+        agent_queue_size
+        agent_count
+        agent_thread_count
+        dbd_agent_queue_size
+
+        jobs_submitted
+        jobs_started
+        jobs_completed
+        jobs_canceled
+        jobs_failed
+        jobs_pending
+        jobs_running
+
+        schedule_cycle_last
+        schedule_cycle_max
+        schedule_cycle_counter
+        schedule_cycle_mean
+        schedule_cycle_mean_depth
+        schedule_cycle_sum
+        schedule_cycles_per_minute
+        schedule_queue_length
+        schedule_exit
+
+        backfill_active
+        backfilled_jobs
+        last_backfilled_jobs
+        backfilled_het_jobs
+        backfill_cycle_counter
+        backfill_cycle_last_when
+        backfill_cycle_last
+        backfill_cycle_max
+        backfill_cycle_mean
+        backfill_cycle_sum
+        backfill_last_depth
+        backfill_depth_sum
+        backfill_last_depth_try
+        backfill_depth_try_sum
+        backfill_mean_depth
+        backfill_mean_depth_try
+        backfill_queue_length
+        backfill_queue_length_sum
+        backfill_queue_length_mean
+        backfill_table_size
+        backfill_table_size_sum
+        backfill_table_size_mean
+        backfill_exit
+
+        gettimeofday_latency
+
+        rpcs_by_type
+        rpcs_by_user
+        rpcs_pending
diff --git a/pyslurm/core/slurmctld/stats.pyx b/pyslurm/core/slurmctld/stats.pyx
new file mode 100644
index 00000000..3990591b
--- /dev/null
+++ b/pyslurm/core/slurmctld/stats.pyx
@@ -0,0 +1,494 @@
+#########################################################################
+# slurmctld/stats.pyx - pyslurm slurmctld statistics api (sdiag)
+#########################################################################
+# Copyright (C) 2025 Toni Harzendorf
+
+#########################################################################
+# The implementation here is inspired by:
+# - https://github.com/SchedMD/slurm/blob/c28fcf4f15981f891df7893099bceda21e2c5e6e/src/sdiag/sdiag.c
+#
+# So for completeness, the appropriate Copyright notices are also written
+# below:
+#
+# Copyright (C) 2010-2011 Barcelona Supercomputing Center.
+# Copyright (C) 2010-2022 SchedMD LLC.
+#
+# Please also check the Slurm DISCLAIMER at: pyslurm/slurm/SLURM_DISCLAIMER
+#########################################################################
+#
+# This file is part of PySlurm
+#
+# PySlurm is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# PySlurm is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with PySlurm; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# cython: c_string_type=unicode, c_string_encoding=default
+# cython: language_level=3
+
+from pyslurm.core.error import verify_rpc, RPCError
+from pyslurm.utils.ctime import _raw_time
+from pyslurm.utils.helpers import (
+    instance_to_dict,
+    uid_to_name,
+)
+from pyslurm.utils import cstr
+from pyslurm import xcollections
+
+
+# Make sure this is in sync with the current Slurm release we are targeting.
+# Check in Slurm source at src/slurmctld/slurmctld.h
+BF_EXIT_COUNT = 6
+SCHED_EXIT_COUNT = 6
+
+
+cdef class ScheduleExitStatistics:
+
+    def __init__(self):
+        self.end_of_job_queue = 0
+        self.default_queue_depth = 0
+        self.max_job_start = 0
+        self.blocked_on_licenses = 0
+        self.max_rpc_count = 0
+        self.max_time = 0
+
+    @staticmethod
+    cdef ScheduleExitStatistics from_ptr(stats_info_response_msg_t *ptr):
+        if ptr.schedule_exit_cnt != SCHED_EXIT_COUNT:
+            raise RPCError(msg="schedule_exit_cnt has an unexpected size. "
+                           f"Got {ptr.schedule_exit_cnt}, expected {SCHED_EXIT_COUNT}.")
+
+        out = ScheduleExitStatistics()
+        out.end_of_job_queue = ptr.schedule_exit[0]
+        out.default_queue_depth = ptr.schedule_exit[1]
+        out.max_job_start = ptr.schedule_exit[2]
+        out.blocked_on_licenses = ptr.schedule_exit[3]
+        out.max_rpc_count = ptr.schedule_exit[4]
+        out.max_time = ptr.schedule_exit[5]
+        return out
+
+    def to_dict(self):
+        return instance_to_dict(self)
+
+
+cdef class BackfillExitStatistics:
+
+    def __init__(self):
+        self.end_of_job_queue = 0
+        self.max_job_start = 0
+        self.max_job_test = 0
+        self.max_time = 0
+        self.node_space_size = 0
+        self.state_changed = 0
+
+    @staticmethod
+    cdef BackfillExitStatistics from_ptr(stats_info_response_msg_t *ptr):
+        if ptr.bf_exit_cnt != BF_EXIT_COUNT:
+            raise RPCError(msg="bf_exit_cnt has an unexpected size. "
+                           f"Got {ptr.bf_exit_cnt}, expected {BF_EXIT_COUNT}.")
+
+        out = BackfillExitStatistics()
+        out.end_of_job_queue = ptr.bf_exit[0]
+        out.max_job_start = ptr.bf_exit[1]
+        out.max_job_test = ptr.bf_exit[2]
+        out.max_time = ptr.bf_exit[3]
+        out.node_space_size = ptr.bf_exit[4]
+        out.state_changed = ptr.bf_exit[5]
+        return out
+
+    def to_dict(self):
+        return instance_to_dict(self)
+
+
+cdef class RPCPending:
+
+    def __init__(self):
+        self.id = 0
+        self.name = None
+        self.count = 0
+
+    def to_dict(self):
+        return instance_to_dict(self)
+
+
+cdef class RPCType:
+
+    def __init__(self):
+        self.id = 0
+        self.name = None
+        self.count = 0
+        self.time = 0
+        self.average_time = 0
+        self.queued = 0
+        self.dropped = 0
+        self.cycle_last = 0
+        self.cycle_max = 0
+
+    def to_dict(self):
+        return instance_to_dict(self)
+
+
+cdef class RPCUser:
+
+    def __init__(self):
+        self.user_id = 0
+        self.user_name = None
+        self.count = 0
+        self.time = 0
+        self.average_time = 0
+
+    def to_dict(self):
+        return instance_to_dict(self)
+
+
+cdef class RPCTypeStatistics(dict):
+
+    def __init__(self):
+        super().__init__()
+
+    @staticmethod
+    cdef RPCTypeStatistics from_ptr(stats_info_response_msg_t *ptr,
+                                    rpc_queue_enabled):
+        out = RPCTypeStatistics()
+
+        for i in range(ptr.rpc_type_size):
+            stats = RPCType()
+            stats.id = ptr.rpc_type_id[i]
+            stats.name = rpc_num2string(ptr.rpc_type_id[i])
+            stats.count = ptr.rpc_type_cnt[i]
+            stats.time = ptr.rpc_type_time[i]
+
+            if ptr.rpc_type_cnt[i]:
+                stats.average_time = int(ptr.rpc_type_time[i] / ptr.rpc_type_cnt[i])
+
+            if rpc_queue_enabled:
+                stats.queued = ptr.rpc_type_queued[i]
+                stats.dropped = ptr.rpc_type_dropped[i]
+                stats.cycle_last = ptr.rpc_type_cycle_last[i]
+                stats.cycle_max = ptr.rpc_type_cycle_max[i]
+
+            out[stats.name] = stats
+
+        return out
+
+    @property
+    def count(self):
+        return xcollections.sum_property(self, RPCType.count)
+
+    @property
+    def time(self):
+        return xcollections.sum_property(self, RPCType.time)
+
+    @property
+    def queued(self):
+        return xcollections.sum_property(self, RPCType.queued)
+
+    @property
+    def dropped(self):
+        return xcollections.sum_property(self, RPCType.dropped)
+
+
+cdef class RPCUserStatistics(dict):
+
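+    # Keys are user names (or the numeric UID as a string when the name
+    # cannot be resolved), values are RPCUser instances; see from_ptr().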
+    def __init__(self):
+        super().__init__()
+
+    @staticmethod
+    cdef RPCUserStatistics from_ptr(stats_info_response_msg_t *ptr):
+        out = RPCUserStatistics()
+
+        for i in range(ptr.rpc_user_size):
+            user_id = ptr.rpc_user_id[i]
+            user = uid_to_name(user_id, err_on_invalid=False)
+            stats = RPCUser()
+            stats.user_id = ptr.rpc_user_id[i]
+            stats.user_name = user
+            stats.count = ptr.rpc_user_cnt[i]
+            stats.time = ptr.rpc_user_time[i]
+
+            if ptr.rpc_user_cnt[i]:
+                stats.average_time = int(ptr.rpc_user_time[i] / ptr.rpc_user_cnt[i])
+
+            key = user if user is not None else str(user_id)
+            out[key] = stats
+
+        return out
+
+    @property
+    def count(self):
+        return xcollections.sum_property(self, RPCUser.count)
+
+    @property
+    def time(self):
+        return xcollections.sum_property(self, RPCUser.time)
+
+
+cdef class RPCPendingStatistics(dict):
+
+    def __init__(self):
+        super().__init__()
+
+    @staticmethod
+    cdef RPCPendingStatistics from_ptr(stats_info_response_msg_t *ptr):
+        out = RPCPendingStatistics()
+
+        for i in range(ptr.rpc_queue_type_count):
+            stats = RPCPending()
+            stats.id = ptr.rpc_queue_type_id[i]
+            stats.name = rpc_num2string(ptr.rpc_queue_type_id[i])
+            stats.count = ptr.rpc_queue_count[i]
+            out[stats.name] = stats
+
+        return out
+
+    @property
+    def count(self):
+        return xcollections.sum_property(self, RPCPending.count)
+
+
+cdef class Statistics:
+
+    def __init__(self):
+        self.schedule_cycle_mean = 0
+        self.schedule_cycle_mean_depth = 0
+        self.schedule_cycles_per_minute = 0
+        self.backfill_cycle_mean = 0
+        self.backfill_cycle_sum = 0
+        self.backfill_mean_depth = 0
+        self.backfill_mean_depth_try = 0
+        self.backfill_queue_length_mean = 0
+        self.backfill_table_size_mean = 0
+        self.backfill_queue_length_sum = 0
+        self.backfill_table_size_sum = 0
+
+    @staticmethod
+    def load():
+        """Load the Statistics of the `slurmctld`.
+
+        Returns:
+            (pyslurm.slurmctld.Statistics): The Controller statistics.
+
+        Raises:
+            (pyslurm.RPCError): When fetching the Statistics failed.
+
+        Examples:
+            >>> from pyslurm import slurmctld
+            >>> stats = slurmctld.Statistics.load()
+            >>> print(stats.jobs_completed, stats.schedule_cycle_counter)
+            10 20
+        """
+        cdef:
+            stats_info_request_msg_t req
+            stats_info_response_msg_t *resp = NULL
+            Statistics out = None
+
+        req.command_id = slurm.STAT_COMMAND_GET
+        verify_rpc(slurm_get_statistics(&resp, &req))
+
+        try:
+            out = parse_response(resp)
+        except Exception as e:
+            raise e
+        finally:
+            slurm_free_stats_response_msg(resp)
+
+        return out
+
+    @staticmethod
+    def reset():
+        """Reset the Statistics of the `slurmctld`.
+
+        Raises:
+            (pyslurm.RPCError): When resetting the Statistics failed.
+
+        Examples:
+            >>> from pyslurm import slurmctld
+            >>> slurmctld.Statistics.reset()
+        """
+        cdef stats_info_request_msg_t req
+        req.command_id = slurm.STAT_COMMAND_RESET
+        verify_rpc(slurm_reset_statistics(&req))
+
+    def to_dict(self):
+        """Convert the statistics to a dictionary.
+
+        Returns:
+            (dict): Statistics as a dict.
+
+        Examples:
+            >>> from pyslurm import slurmctld
+            >>> stats = slurmctld.Statistics.load()
+            >>> stats_dict = stats.to_dict()
+        """
+        out = instance_to_dict(self)
+        out["rpcs_by_type"] = xcollections.dict_recursive(self.rpcs_by_type)
+        out["rpcs_by_user"] = xcollections.dict_recursive(self.rpcs_by_user)
+        out["rpcs_pending"] = xcollections.dict_recursive(self.rpcs_pending)
+        out["schedule_exit"] = self.schedule_exit.to_dict()
+        out["backfill_exit"] = self.backfill_exit.to_dict()
+        return out
+
+
+def diag():
+    """Load the Statistics of the `slurmctld`.
+
+    This is a shortcut for [pyslurm.slurmctld.Statistics.load][].
+
+    Returns:
+        (pyslurm.slurmctld.Statistics): The Controller statistics.
+
+    Raises:
+        (pyslurm.RPCError): When fetching the Statistics failed.
+
+    Examples:
+        >>> from pyslurm import slurmctld
+        >>> stats = slurmctld.diag()
+        >>> print(stats.jobs_completed, stats.schedule_cycle_counter)
+        10 20
+    """
+    return Statistics.load()
+
+
+cdef parse_response(stats_info_response_msg_t *ptr):
+    cdef Statistics out = Statistics()
+
+    cycle_count = ptr.schedule_cycle_counter
+    bf_cycle_count = ptr.bf_cycle_counter
+
+    out.request_time = ptr.req_time
+    out.data_since = ptr.req_time_start
+    out.server_thread_count = ptr.server_thread_count
+    out.rpc_queue_enabled = True if ptr.rpc_queue_enabled else False
+    out.agent_queue_size = ptr.agent_queue_size
+    out.agent_count = ptr.agent_count
+    out.agent_thread_count = ptr.agent_thread_count
+    out.dbd_agent_queue_size = ptr.dbd_agent_queue_size
+    out.jobs_submitted = ptr.jobs_submitted
+    out.jobs_started = ptr.jobs_started
+    out.jobs_completed = ptr.jobs_completed
+    out.jobs_canceled = ptr.jobs_canceled
+    out.jobs_failed = ptr.jobs_failed
+    out.jobs_pending = ptr.jobs_pending
+    out.jobs_running = ptr.jobs_running
+    out.schedule_cycle_last = int(ptr.schedule_cycle_last)
+    out.schedule_cycle_max = int(ptr.schedule_cycle_max)
+    out.schedule_cycle_counter = int(cycle_count)
+    out.schedule_queue_length = int(ptr.schedule_queue_len)
+    out.schedule_cycle_sum = int(ptr.schedule_cycle_sum)
+
+    if cycle_count > 0:
+        out.schedule_cycle_mean = int(ptr.schedule_cycle_sum / cycle_count)
+        out.schedule_cycle_mean_depth = int(ptr.schedule_cycle_depth / cycle_count)
+
+    ts = ptr.req_time - ptr.req_time_start
+    if ts > 60:
+        out.schedule_cycles_per_minute = int(cycle_count / (ts / 60))
+
+    out.backfill_active = bool(ptr.bf_active)
+    out.backfilled_jobs = ptr.bf_backfilled_jobs
+    out.last_backfilled_jobs = ptr.bf_last_backfilled_jobs
+    out.backfilled_het_jobs = ptr.bf_backfilled_het_jobs
+    out.backfill_cycle_last_when = ptr.bf_when_last_cycle
+    out.backfill_cycle_last = ptr.bf_cycle_last
+    out.backfill_cycle_max = ptr.bf_cycle_max
+    out.backfill_cycle_counter = bf_cycle_count
+    out.backfill_cycle_sum = ptr.bf_cycle_sum
+    out.backfill_last_depth = ptr.bf_last_depth
+    out.backfill_last_depth_try = ptr.bf_last_depth_try
+    out.backfill_queue_length = ptr.bf_queue_len
+    out.backfill_queue_length_sum = ptr.bf_queue_len_sum
+    out.backfill_table_size = ptr.bf_table_size
+    out.backfill_table_size_sum = ptr.bf_table_size_sum
+    out.backfill_depth_sum = ptr.bf_depth_sum
+    out.backfill_depth_try_sum = ptr.bf_depth_try_sum
+
+    if bf_cycle_count > 0:
+        out.backfill_cycle_mean = int(ptr.bf_cycle_sum / bf_cycle_count)
+        out.backfill_mean_depth = int(ptr.bf_depth_sum / bf_cycle_count)
+        out.backfill_mean_depth_try = int(ptr.bf_depth_try_sum / bf_cycle_count)
+        out.backfill_queue_length_mean = int(ptr.bf_queue_len_sum / bf_cycle_count)
+        out.backfill_table_size_mean = int(ptr.bf_table_size_sum / bf_cycle_count)
+
+    out.gettimeofday_latency = ptr.gettimeofday_latency
+
+    out.rpcs_by_type = RPCTypeStatistics.from_ptr(ptr, out.rpc_queue_enabled)
+    out.rpcs_by_user = RPCUserStatistics.from_ptr(ptr)
+    out.rpcs_pending = RPCPendingStatistics.from_ptr(ptr)
+    out.schedule_exit = ScheduleExitStatistics.from_ptr(ptr)
+    out.backfill_exit = BackfillExitStatistics.from_ptr(ptr)
+
+    return out
+
+
+# Prepare some test data
+def _parse_test_data():
+    import datetime
+
+    cdef stats_info_response_msg_t stats
+    memset(&stats, 0, sizeof(stats))
+
+    stats.req_time = int(datetime.datetime.now().timestamp())
+    stats.req_time_start = int(datetime.datetime.now().timestamp()) - 200
+    stats.jobs_submitted = 20
+    stats.jobs_running = 3
+    stats.schedule_cycle_counter = 10
+    stats.schedule_cycle_last = 40
+    stats.schedule_cycle_sum = 45
+
+    stats.bf_cycle_counter = 100
+    stats.bf_active = 0
+    stats.bf_backfilled_jobs = 10
+    stats.bf_cycle_sum = 200
+    stats.bf_depth_try_sum = 300
+    stats.bf_queue_len_sum = 600
+    stats.bf_table_size_sum = 200
+
+    stats.rpc_type_size = 3
+    stats.rpc_type_id = xmalloc(sizeof(uint16_t) * stats.rpc_type_size)
+    stats.rpc_type_cnt = xmalloc(sizeof(uint32_t) * stats.rpc_type_size)
+    stats.rpc_type_time = xmalloc(sizeof(uint64_t) * stats.rpc_type_size)
+
+    for i in range(stats.rpc_type_size):
+        stats.rpc_type_id[i] = 2000+i
+        stats.rpc_type_cnt[i] = i+1
+        stats.rpc_type_time[i] = i+2
+
+    stats.rpc_user_size = 1
+    stats.rpc_user_id = xmalloc(sizeof(uint32_t) * stats.rpc_user_size)
+    stats.rpc_user_cnt = xmalloc(sizeof(uint32_t) * stats.rpc_user_size)
+    stats.rpc_user_time = xmalloc(sizeof(uint64_t) * stats.rpc_user_size)
+
+    for i in range(stats.rpc_user_size):
+        stats.rpc_user_id[i] = i
+        stats.rpc_user_cnt[i] = i+1
+        stats.rpc_user_time[i] = i+2
+
+    stats.bf_exit_cnt = BF_EXIT_COUNT
+    stats.bf_exit = xmalloc(sizeof(uint32_t) * BF_EXIT_COUNT)
+    for i in range(stats.bf_exit_cnt):
+        stats.bf_exit[i] = i+1
+
+    stats.schedule_exit_cnt = SCHED_EXIT_COUNT
+    stats.schedule_exit = xmalloc(sizeof(uint32_t) * SCHED_EXIT_COUNT)
+
+    for i in range(stats.schedule_exit_cnt):
+        stats.schedule_exit[i] = i+1
+
+    stats.rpc_queue_type_count = 5
+    stats.rpc_queue_count = xmalloc(sizeof(uint32_t) * stats.rpc_queue_type_count)
+    stats.rpc_queue_type_id = xmalloc(sizeof(uint32_t) * stats.rpc_queue_type_count)
+
+    for i in range(stats.rpc_queue_type_count):
+        stats.rpc_queue_count[i] = i+1
+        stats.rpc_queue_type_id[i] = 2000+i
+
+    return parse_response(&stats)
diff --git a/scripts/slurm_msg_type_dict.py b/scripts/slurm_msg_type_dict.py
deleted file mode 100755
index dbd2d0dc..00000000
--- a/scripts/slurm_msg_type_dict.py
+++ /dev/null
@@ -1,46 +0,0 @@
-#! /usr/bin/env python3
-"""
-Parse $slurmrepo/src/common/slurm_protocol_defs.h and create
-a small C program that generates a mapping of the numeric
-slurm msg types to their symbolic names.
-
-Example:
-    ./slurm_msg_type_dict.py $slurmrepo/src/common/slurm_protocol_defs.h > msgdict.c
-    gcc -o msgdict msgdict.c
-    ./msgdict
-"""
-
-import re
-import sys
-import argparse
-
-def generate_c(header_file_name):
-    typedef_re = re.compile(r"\s*typedef\s+enum\s*{(.*?)}\s*slurm_msg_type_t\s*;", re.DOTALL)
-    symbol_re = re.compile(r"^\s*([A-Z0-9_]+)\s*[,=\n]")
-
-    with open(header_file_name, mode="r", encoding="utf-8") as header_file:
-        header = header_file.read()
-    typedef = typedef_re.search(header)
-    if typedef is None:
-        print("could not identify the slurm_msg_type_t typedef in the header file")
-        sys.exit(1)
-
-    print("""#include <stdio.h>""")
-    print(typedef.group(0))
-    print("""\n\nint main(void) {""")
-    for line in typedef.group(1).split("\n"):
-        symbol = symbol_re.match(line)
-        if symbol is not None:
-            print(f""" printf("%d: \\\"%s\\\",\\n", {symbol.group(1)}, "{symbol.group(1)}");""")
-        else:
-            print(f""" printf("\\n");""")
-    print(""" return 0;\n}""")
-
-def main():
-    parser = argparse.ArgumentParser(description=__doc__, formatter_class=argparse.RawTextHelpFormatter)
-    parser.add_argument("header", help="$slurmrepo/src/common/slurm_protocol_defs.h")
-    args = parser.parse_args()
-    generate_c(args.header)
-
-if __name__ == "__main__":
-    main()
diff --git a/tests/integration/test_slurmctld.py b/tests/integration/test_slurmctld.py
index 4d1f1004..5ad1551c 100644
--- a/tests/integration/test_slurmctld.py
+++ b/tests/integration/test_slurmctld.py
@@ -107,3 +107,16 @@ def test_fair_share_dampening_factor():
     with pytest.raises(pyslurm.RPCError, match=r"Invalid Dampening*"):
         slurmctld.set_fair_share_dampening_factor(99999999)
+
+
+def test_statistics():
+    stats = slurmctld.diag()
+    assert stats.to_dict()
+    assert len(stats.rpcs_by_type) > 0
+    data_since = stats.data_since
+
+    slurmctld.Statistics.reset()
+    new_stats = slurmctld.Statistics.load()
+    assert new_stats.to_dict()
+    # Check that resetting it was actually successful.
+    assert data_since < new_stats.data_since
diff --git a/tests/unit/test_slurmctld.py b/tests/unit/test_slurmctld.py
new file mode 100644
index 00000000..71d36d34
--- /dev/null
+++ b/tests/unit/test_slurmctld.py
@@ -0,0 +1,69 @@
+#########################################################################
+# test_slurmctld.py - slurmctld unit tests
+#########################################################################
+# Copyright (C) 2025 Toni Harzendorf
+#
+# This file is part of PySlurm
+#
+# PySlurm is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# PySlurm is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with PySlurm; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+"""test_slurmctld.py - Unit test basic slurmctld functionalities."""
+
+import pyslurm
+from pyslurm import slurmctld
+from pyslurm.core.slurmctld.stats import _parse_test_data
+
+
+def test_statistics():
+    stats = _parse_test_data()
+    assert stats.to_dict()
+
+    assert len(stats.rpcs_by_type) == 3
+    for typ, val in stats.rpcs_by_type.items():
+        assert val.count > 0
+        assert val.time > 0
+        assert val.average_time > 0
+        assert typ is not None
+
+    assert len(stats.rpcs_pending) == 5
+    for typ, val in stats.rpcs_pending.items():
+        assert val.count > 0
+        assert typ is not None
+        assert isinstance(typ, str)
+
+    assert len(stats.rpcs_by_user) == 1
+    for typ, val in stats.rpcs_by_user.items():
+        assert val.user_id == 0
+        assert val.user_name == "root"
+        assert val.time > 0
+        assert val.average_time > 0
+        assert typ is not None
+
+    assert stats.schedule_exit
+    assert stats.backfill_exit
+    assert stats.jobs_submitted == 20
+    assert stats.jobs_running == 3
+    assert stats.schedule_cycle_last == 40
+    assert stats.schedule_cycle_sum == 45
+    assert stats.schedule_cycle_mean == 4
+    assert stats.schedule_cycle_counter == 10
+
+    assert stats.backfill_cycle_counter == 100
+    assert stats.backfill_active is False
+    assert stats.backfilled_jobs == 10
+    assert stats.backfill_cycle_sum == 200
+    assert stats.backfill_depth_try_sum == 300
+    assert stats.backfill_queue_length_sum == 600
+    assert stats.backfill_table_size_sum == 200
+    assert stats.backfill_cycle_mean == 2