Commit 3cef6b7

cmeesters, dlaehnemann, and coderabbitai[bot] authored
feat: measuring compute efficiency per job (#221)
The aim of this PR is to:

- measure compute efficiency similar to `seff`,
- report compute efficiency and memory footprint per job of the workflow (in a logfile), and
- issue a warning if it falls below a configurable threshold.

Note: `seff` reports so-called "Memory Efficiency". What is meant is "Memory Usage": an application which reserves a compute node but hardly uses RAM to do its work will show an apparently low "Memory Efficiency". It needs to reserve the memory of that node and might be highly efficient, yet will not have used much memory. The resulting code of this PR will hence NOT report warnings about memory usage.

This PR is related to issue #147.

## Summary by CodeRabbit

- **New Features**
  - Introduced an option to generate an efficiency report for SLURM jobs, summarizing resource usage for all workflow jobs.
  - Added settings to enable efficiency reporting, specify the report file location, and set a CPU efficiency threshold for warnings.
  - Efficiency reports are automatically generated upon executor shutdown when enabled.
- **Documentation**
  - Added instructions on generating and interpreting job efficiency reports with the new reporting option.
- **Bug Fixes**
  - Improved error messages and logging for log file cleanup and SLURM job data handling.
- **Tests**
  - Added automated tests to verify the creation and content of the efficiency report.
- **Chores**
  - Added pandas and numpy as new dependencies and upgraded Snakemake and pandas versions in development dependencies.

---------

Co-authored-by: David Laehnemann <david.laehnemann@hhu.de>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent fc4b71e commit 3cef6b7
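
For intuition, the seff-style metrics this PR computes reduce to two ratios. A minimal illustrative sketch (function names are ours, not the plugin's API):

```python
# Illustrative sketch of the two metrics, mirroring the formulas in this PR.
def cpu_efficiency_pct(total_cpu_sec: float, elapsed_sec: float, ncpus: int) -> float:
    """Core-seconds actually used vs. core-seconds reserved, in percent."""
    # The PR clips Elapsed and NCPUS to >= 1 to avoid division by zero.
    return 100 * total_cpu_sec / (max(elapsed_sec, 1) * max(ncpus, 1))


def memory_usage_pct(maxrss_mb: float, reqmem_mb: float) -> float:
    """Peak RSS vs. requested memory, in percent -- "usage", not "efficiency"."""
    return 100 * maxrss_mb / reqmem_mb if reqmem_mb > 0 else 0.0


# A job that used 30 CPU-minutes on 4 cores over 15 minutes of wall clock:
print(cpu_efficiency_pct(1800, 900, 4))  # 50.0
# A job that peaked at 2 GB RSS against a 16 GB request:
print(memory_usage_pct(2048, 16384))  # 12.5
```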

File tree

6 files changed: +309, -13 lines changed


.github/workflows/announce-release.yml

Lines changed: 1 addition & 1 deletion
```diff
@@ -16,7 +16,7 @@ jobs:
     runs-on: ubuntu-latest
     steps:
       - name: Post to Mastodon
-        uses: snakemake/mastodon-release-post-action@v1.4.1
+        uses: snakemake/mastodon-release-post-action@v1.5.0
         with:
           access-token: ${{ secrets.MASTODONBOT }}
           pr-title: ${{ github.event.head_commit.message }}
```

docs/further.md

Lines changed: 4 additions & 0 deletions
```diff
@@ -618,6 +618,10 @@ This configuration directs SLURM logs to a centralized location, making them eas
 Running Snakemake within an active SLURM job can lead to unpredictable behavior, as the execution environment may not be properly configured for job submission.
 To mitigate potential issues, the SLURM executor plugin detects when it's operating inside a SLURM job and issues a warning, pausing for 5 seconds before proceeding.
 
+### Getting Job Efficiency Information
+
+With `--slurm-efficiency-report` you can generate a table of all efficiency data. A logfile `efficiency_report_<workflow_id>.csv` will be generated in your current directory. This is equivalent to the information provided by `seff <jobid>` for individual jobs. It works best if "comments" are stored as a job property on your cluster, as this plugin uses the "comment" parameter to store the rule name.
+
 ### Submitting Jobs into SLURM reservations
 
 The plugin allows specifying a flag `--slurm-reservation=<name>` to use a particular reservation.
```
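
Assuming a finished run, the resulting CSV can be inspected with pandas; the column names below are those written by this PR, and the 80% cut-off is an arbitrary example:

```python
# Hypothetical post-run inspection; substitute the actual report file name.
import pandas as pd

report = pd.read_csv("efficiency_report_<run_uuid>.csv")
low = report[report["CPU Efficiency (%)"] < 80]
print(low[["JobID", "JobName", "CPU Efficiency (%)", "Memory Usage (%)"]])
```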

pyproject.toml

Lines changed: 4 additions & 1 deletion
```diff
@@ -18,14 +18,17 @@ python = "^3.11"
 snakemake-interface-common = "^1.13.0"
 snakemake-interface-executor-plugins = "^9.1.1"
 snakemake-executor-plugin-slurm-jobstep = "^0.3.0"
+pandas = "^2.2.3"
+numpy = "^1.26.4"
 throttler = "^1.2.2"
 
 [tool.poetry.group.dev.dependencies]
 black = "^23.7.0"
 flake8 = "^6.1.0"
 coverage = "^7.3.1"
 pytest = "^8.3.5"
-snakemake = "^9.4.0"
+snakemake = "^9.6.0"
+pandas = "^2.2.3"
 
 [tool.coverage.run]
 omit = [".*", "*/site-packages/*", "Snakefile"]
```

snakemake_executor_plugin_slurm/__init__.py

Lines changed: 66 additions & 10 deletions
```diff
@@ -3,7 +3,6 @@
 __email__ = "johannes.koester@uni-due.de"
 __license__ = "MIT"
 
-import atexit
 import csv
 from io import StringIO
 import os
@@ -16,6 +15,7 @@
 from datetime import datetime, timedelta
 from typing import List, Generator, Optional
 import uuid
+
 from snakemake_interface_executor_plugins.executors.base import SubmittedJobInfo
 from snakemake_interface_executor_plugins.executors.remote import RemoteExecutor
 from snakemake_interface_executor_plugins.settings import (
@@ -27,7 +27,12 @@
 )
 from snakemake_interface_common.exceptions import WorkflowError
 
-from .utils import delete_slurm_environment, delete_empty_dirs, set_gres_string
+from .utils import (
+    delete_slurm_environment,
+    delete_empty_dirs,
+    set_gres_string,
+)
+from .efficiency_report import create_efficiency_report
 from .submit_string import get_submit_command
 
 
@@ -106,6 +111,35 @@ class ExecutorSettings(ExecutorSettingsBase):
             "required": False,
         },
     )
+    efficiency_report: bool = field(
+        default=False,
+        metadata={
+            "help": "Generate an efficiency report at the end of the workflow. "
+            "This flag has no effect, if not set.",
+            "env_var": False,
+            "required": False,
+        },
+    )
+    efficiency_report_path: Optional[Path] = field(
+        default=None,
+        metadata={
+            "help": "Path to the efficiency report file. "
+            "If not set, the report will be written to "
+            "the current working directory with the name "
+            "'efficiency_report_<run_uuid>.csv'. "
+            "This flag has no effect, if not set.",
+            "env_var": False,
+            "required": False,
+        },
+    )
+    efficiency_threshold: Optional[float] = field(
+        default=0.8,
+        metadata={
+            "help": "The efficiency threshold for the efficiency report. "
+            "Jobs with an efficiency below this threshold will be reported. "
+            "This flag has no effect, if not set.",
+        },
+    )
     reservation: Optional[str] = field(
         default=None,
         metadata={
@@ -157,7 +191,26 @@ def __post_init__(self, test_mode: bool = False):
             if self.workflow.executor_settings.logdir
             else Path(".snakemake/slurm_logs").resolve()
         )
-        atexit.register(self.clean_old_logs)
+
+    def shutdown(self) -> None:
+        """
+        Shutdown the executor.
+        This method is overloaded, to include the cleaning of old log files
+        and to optionally create an efficiency report.
+        """
+        # First, we invoke the original shutdown method
+        super().shutdown()
+
+        # Next, clean up old log files, unconditionally.
+        self.clean_old_logs()
+        # If the efficiency report is enabled, create it.
+        if self.workflow.executor_settings.efficiency_report:
+            create_efficiency_report(
+                e_threshold=self.workflow.executor_settings.efficiency_threshold,
+                run_uuid=self.run_uuid,
+                e_report_path=self.workflow.executor_settings.efficiency_report_path,
+                logger=self.logger,
+            )
 
     def clean_old_logs(self) -> None:
         """Delete files older than specified age from the SLURM log directory."""
@@ -168,20 +221,23 @@ def clean_old_logs(self) -> None:
             return
         cutoff_secs = age_cutoff * 86400
         current_time = time.time()
-        self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s)")
+        self.logger.info(f"Cleaning up log files older than {age_cutoff} day(s).")
+
         for path in self.slurm_logdir.rglob("*.log"):
             if path.is_file():
                 try:
                     file_age = current_time - path.stat().st_mtime
                     if file_age > cutoff_secs:
                         path.unlink()
                 except (OSError, FileNotFoundError) as e:
-                    self.logger.warning(f"Could not delete logfile {path}: {e}")
+                    self.logger.error(f"Could not delete logfile {path}: {e}")
         # we need a 2nd iteration to remove putatively empty directories
         try:
             delete_empty_dirs(self.slurm_logdir)
         except (OSError, FileNotFoundError) as e:
-            self.logger.warning(f"Could not delete empty directory {path}: {e}")
+            self.logger.error(
+                f"Could not delete empty directories in {self.slurm_logdir}: {e}"
+            )
 
     def warn_on_jobcontext(self, done=None):
         if not done:
```
```diff
@@ -741,10 +797,10 @@ def check_slurm_extra(self, job):
         jobname = re.compile(r"--job-name[=?|\s+]|-J\s?")
         if re.search(jobname, job.resources.slurm_extra):
             raise WorkflowError(
-                "The --job-name option is not allowed in the 'slurm_extra' "
-                "parameter. The job name is set by snakemake and must not be "
-                "overwritten. It is internally used to check the stati of the "
-                "all submitted jobs by this workflow."
+                "The --job-name option is not allowed in the 'slurm_extra' parameter. "
+                "The job name is set by snakemake and must not be overwritten. "
+                "It is internally used to check the status of all submitted jobs "
+                "by this workflow."
                 "Please consult the documentation if you are unsure how to "
                 "query the status of your jobs."
             )
```
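
As a quick sanity check of the `slurm_extra` guard above (the regex is copied verbatim from the diff; the sample strings are made up):

```python
import re

# Regex from check_slurm_extra(): rejects any attempt to set a job name manually.
jobname = re.compile(r"--job-name[=?|\s+]|-J\s?")

assert re.search(jobname, "--job-name=foo --partition=main")  # rejected
assert re.search(jobname, "-J foo")  # rejected
assert not re.search(jobname, "--partition=main --mem=4G")  # allowed
```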
snakemake_executor_plugin_slurm/efficiency_report.py

Lines changed: 185 additions & 0 deletions

```python
import re
import pandas as pd
from pathlib import Path
import subprocess
import shlex

import os  # only temporarily needed for printf debugging
import numpy as np


def time_to_seconds(time_str):
    """Convert SLURM time format to seconds."""
    if pd.isna(time_str) or time_str.strip() == "":
        return 0
    parts = time_str.split(":")

    if len(parts) == 3:  # H:M:S
        return int(parts[0]) * 3600 + int(parts[1]) * 60 + float(parts[2])
    elif len(parts) == 2:  # M:S
        return int(parts[0]) * 60 + float(parts[1])
    elif len(parts) == 1:  # S
        return float(parts[0])
    return 0


def parse_maxrss(maxrss):
    """Convert MaxRSS to MB."""
    if pd.isna(maxrss) or maxrss.strip() == "" or maxrss == "0":
        return 0
    match = re.match(r"(\d+(?:\.\d+)?)([KMG]?)", maxrss)
    if match:
        value, unit = match.groups()
        value = float(value)
        unit_multipliers = {"K": 1 / 1024, "M": 1, "G": 1024}
        return value * unit_multipliers.get(unit, 1)
    return 0


def parse_reqmem(reqmem, number_of_nodes=1):
    """Convert requested memory to MB."""
    if pd.isna(reqmem) or reqmem.strip() == "":
        return 0
    # 4Gc (per-CPU) / 16Gn (per-node) / 2.5G
    match = re.match(r"(\d+(?:\.\d+)?)([KMG])?([cn]|/node)?", reqmem)
    if match:
        value, unit, per_unit = match.groups()
        value = float(value)
        unit_multipliers = {"K": 1 / 1024, "M": 1, "G": 1024}
        mem_mb = value * unit_multipliers.get(unit, 1)
        if per_unit in ("n", "/node"):  # per-node
            nodes = 1 if pd.isna(number_of_nodes) else number_of_nodes
            return mem_mb * nodes
        # `/c` or `c` → per-CPU; caller may multiply later
        return mem_mb  # Default case (per CPU or total)
    return 0


def create_efficiency_report(e_threshold, run_uuid, e_report_path, logger):
    """
    Fetch sacct job data for a Snakemake workflow
    and compute efficiency metrics.
    """
    cmd = f"sacct --name={run_uuid} --parsable2 --noheader"
    cmd += (
        " --format=JobID,JobName,Comment,Elapsed,TotalCPU," "NNodes,NCPUS,MaxRSS,ReqMem"
    )

    try:
        result = subprocess.run(
            shlex.split(cmd), capture_output=True, text=True, check=True
        )
        raw = result.stdout.strip()
        if not raw:
            logger.warning(f"No job data found for workflow {run_uuid}.")
            return None
        lines = raw.split("\n")

    except subprocess.CalledProcessError:
        logger.error(f"Failed to retrieve job data for workflow {run_uuid}.")
        return None

    # Convert to DataFrame
    df = pd.DataFrame(
        (line.split("|") for line in lines),
        columns=[
            "JobID",
            "JobName",
            "Comment",
            "Elapsed",
            "TotalCPU",
            "NNodes",
            "NCPUS",
            "MaxRSS",
            "ReqMem",
        ],
    )

    # If the "Comment" column is empty,
    # a) delete the column
    # b) issue a warning
    if df["Comment"].replace("", pd.NA).isna().all():
        logger.warning(
            f"No comments found for workflow {run_uuid}. "
            "This field is used to store the rule name. "
            "Please ensure that the 'comment' field is set for your cluster. "
            "Administrators can set this up in the SLURM configuration."
        )
        df.drop(columns=["Comment"], inplace=True)
        # remember that the comment column is not available
        nocomment = True
    # else: rename the column to 'RuleName'
    else:
        df.rename(columns={"Comment": "RuleName"}, inplace=True)
        nocomment = False
    # Convert types
    df["NNodes"] = pd.to_numeric(df["NNodes"], errors="coerce")
    df["NCPUS"] = pd.to_numeric(df["NCPUS"], errors="coerce")

    # Convert time fields
    df["Elapsed_sec"] = df["Elapsed"].apply(time_to_seconds)
    df["TotalCPU_sec"] = df["TotalCPU"].apply(time_to_seconds)

    # Compute CPU efficiency
    df["CPU Efficiency (%)"] = (
        df["TotalCPU_sec"]
        / (df["Elapsed_sec"].clip(lower=1) * df["NCPUS"].clip(lower=1))
    ) * 100
    df.replace([np.inf, -np.inf], 0, inplace=True)

    # Convert MaxRSS
    df["MaxRSS_MB"] = df["MaxRSS"].apply(parse_maxrss)

    # Convert ReqMem and calculate memory efficiency
    df["RequestedMem_MB"] = df.apply(
        lambda row: parse_reqmem(row["ReqMem"], row["NNodes"]), axis=1
    )
    df["Memory Usage (%)"] = df.apply(
        lambda row: (
            (row["MaxRSS_MB"] / row["RequestedMem_MB"] * 100)
            if row["RequestedMem_MB"] > 0
            else 0
        ),
        axis=1,
    )

    df["Memory Usage (%)"] = df["Memory Usage (%)"].fillna(0).round(2)

    # Drop all rows containing "batch" or "extern" as job names
    df = df[~df["JobName"].str.contains("batch|extern", na=False)]

    # Log warnings for low efficiency
    for _, row in df.iterrows():
        if row["CPU Efficiency (%)"] < e_threshold:
            if nocomment:
                logger.warning(
                    f"Job {row['JobID']} ({row['JobName']}) "
                    f"has low CPU efficiency: {row['CPU Efficiency (%)']}%."
                )
            else:
                # if the comment column is available, we can use it to
                # identify the rule name
                logger.warning(
                    f"Job {row['JobID']} for rule '{row['RuleName']}' "
                    f"({row['JobName']}) has low CPU efficiency: "
                    f"{row['CPU Efficiency (%)']}%."
                )

    # we construct a path object to allow for a custom
    # logdir, if specified
    p = Path()

    # Save the report to a CSV file
    logfile = f"efficiency_report_{run_uuid}.csv"
    if e_report_path:
        logfile = Path(e_report_path) / logfile
    else:
        logfile = p.cwd() / logfile
    # ensure the directory exists
    logfile.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(logfile)

    # write out the efficiency report at normal verbosity in any case
    logger.info(f"Efficiency report for workflow {run_uuid} saved to {logfile}.")
    # state directory contents for debugging purposes
    logger.debug(f"Current directory contents in '{p.cwd()}': {os.listdir(p.cwd())}")
```
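
A few spot-checks of the conversion helpers above; the expected values follow directly from the unit tables in the code, and the import path assumes the package layout shown in this commit:

```python
from snakemake_executor_plugin_slurm.efficiency_report import (
    time_to_seconds,
    parse_maxrss,
    parse_reqmem,
)

print(time_to_seconds("1:00:00"))  # 3600.0  (H:M:S)
print(time_to_seconds("12:30"))    # 750.0   (M:S)
print(parse_maxrss("512K"))        # 0.5     (K -> MB, i.e. 512/1024)
print(parse_maxrss("2G"))          # 2048.0  (G -> MB)
print(parse_reqmem("4Gc"))         # 4096.0  (per-CPU request, MB)
print(parse_reqmem("16Gn", 2))     # 32768.0 (per-node request on 2 nodes)
```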
