
Commit 24980db

Merge pull request #61 from converged-computing/cleanup-osu-parsing
add support for more specific osu parsing
2 parents 6368608 + bbbd5a5 commit 24980db

File tree

7 files changed: +295 −80 lines changed


examples/python/network-osu-benchmark/README.md

Lines changed: 2 additions & 0 deletions

@@ -40,3 +40,5 @@ $ python run-metric-multiple.py --iter 5
 ![img/multiple/OSU-MPI_Put-Latency-Test-v5.8.png](img/multiple/OSU-MPI_Put-Latency-Test-v5.8.png)


+Note that if you add the option `timed: true` this will add extra 'wrapper' times around the above,
+and an extra data output for it in your results.
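
For reference, here is a minimal sketch (not part of this diff) of how the SDK's new parse_timed_section helper, added later in this commit, turns those wrapper times into the extra data output. The timing values below are made up:

from metricsoperator.metrics.network.osu_benchmark import parse_timed_section

# Trailing lines produced by the time wrapper when timed: true is set
# (the values here are illustrative only)
section = [
    "real\t0m2.651s",
    "user\t0m0.012s",
    "sys\t0m0.005s",
]
print(parse_timed_section(section))
# {'real': '0m2.651s', 'user': '0m0.012s', 'sys': '0m0.005s'}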

sdk/python/v1alpha1/CHANGELOG.md

Lines changed: 2 additions & 0 deletions

@@ -14,6 +14,8 @@ and **Merged pull requests**. Critical items to know are:
 The versions coincide with releases on pip. Only major versions will be released as tags on Github.

 ## [0.0.x](https://github.com/converged-computing/metrics-operator/tree/main) (0.0.x)
+- More specific parsing / control for OSU benchmarks (0.0.21)
+- Support for OSU benchmark parsing with timed wrappers (0.0.2)
 - Allow getting raw logs for any metric (without parser) (0.0.19)
 - Refactor of structure of Operator and addition of metrics (0.0.18)
 - Add wait for delete function to python parser (0.0.17)
Lines changed: 2 additions & 0 deletions

@@ -0,0 +1,2 @@
+from .netmark import network_netmark
+from .osu_benchmark import network_osu_benchmark
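
With these new package imports, both parsers can be pulled from the network subpackage directly; a minimal usage sketch, assuming the SDK is installed from sdk/python/v1alpha1:

from metricsoperator.metrics.network import network_netmark, network_osu_benchmark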

sdk/python/v1alpha1/metricsoperator/metrics/network.py renamed to sdk/python/v1alpha1/metricsoperator/metrics/network/netmark.py

Lines changed: 1 addition & 79 deletions

@@ -1,85 +1,7 @@
 # Copyright 2023 Lawrence Livermore National Security, LLC
 # (c.f. AUTHORS, NOTICE.LLNS, COPYING)

-
-from .base import MetricBase
-
-
-class network_osu_benchmark(MetricBase):
-    """
-    Parse the OSU benchmarks output into data!
-    """
-
-    container_name = "launcher"
-
-    @property
-    def pod_prefix(self):
-        return f"{self.name}-l-0"
-
-    def parse_row(self, row):
-        """
-        Given a row of two values with spaces, parse.
-        """
-        row = row.split(" ", 1)
-        return [x.strip() for x in row]
-
-    def parse_log(self, lines):
-        """
-        Given lines of output, parse and return json
-        """
-        # Get the log metadata
-        metadata = self.get_log_metadata(lines)
-
-        # Split lines by separator
-        results = []
-        sections = self.get_log_sections(lines)
-        for section in sections:
-            if not section.strip():
-                continue
-            section = section.split("\n")
-            section = [x.strip() for x in section if x.strip()]
-
-            try:
-                datum = self.parse_benchmark_section(section)
-            except Exception:
-                print(f"Issue parsing section {section}")
-                continue
-            results.append(datum)
-
-        return {"data": results, "metadata": metadata, "spec": self.spec}
-
-    def parse_benchmark_section(self, section):
-        """
-        A wrapper for parsing in case there is an error we can catch!
-        """
-        # Command is the first entry
-        command = section.pop(0)
-
-        # Each section has some number of header lines (with #)
-        header = []
-        while section and section[0].startswith("#"):
-            header.append(section.pop(0))
-
-        # Last row of the header are the column names
-        columns = header.pop()
-        columns = columns.replace("# ", "").strip()
-        columns = self.parse_row(columns)
-
-        # The remainder is data, again always two points
-        data = []
-        for line in section:
-            if not line:
-                continue
-            row = self.parse_row(line)
-            row = [float(x) for x in row]
-            data.append(row)
-
-        return {
-            "matrix": data,
-            "columns": columns,
-            "header": header,
-            "command": command,
-        }
+from metricsoperator.metrics.base import MetricBase


 class network_netmark(MetricBase):
Lines changed: 286 additions & 0 deletions

@@ -0,0 +1,286 @@
+# Copyright 2023 Lawrence Livermore National Security, LLC
+# (c.f. AUTHORS, NOTICE.LLNS, COPYING)
+
+import os
+import re
+
+from metricsoperator.metrics.base import MetricBase
+
+# prepare more consistent / formatted columns
+latency_size_header = ["Size", "Latency(us)"]
+average_latency_header = ["Size", "Avg Latency(us)"]
+bandwidth_size_header = ["Size", "Bandwidth (MB/s)"]
+column_lookup = {
+    "osu_ibarrier": ["Overall(us)", "Compute(us)", "Pure Comm.(us)", "Overlap(%)"],
+    "osu_barrier": ["Avg Latency(us)"],
+    "osu_mbw_mr": ["Size", "MB/s", "Messages/s"],
+    "osu_init": ["nprocs", "min", "max", "avg"],
+    "osu_multi_lat": latency_size_header,
+    "osu_get_acc_latency": latency_size_header,
+    "osu_latency": latency_size_header,
+    "osu_fop_latency": latency_size_header,
+    "osu_put_latency": latency_size_header,
+    "osu_get_latency": latency_size_header,
+    "osu_acc_latency": latency_size_header,
+    "osu_mt_latency": latency_size_header,
+    "osu_get_latency": latency_size_header,
+    "osu_cas_latency": latency_size_header,
+    "osu_latency_mp": latency_size_header,
+    "osu_latency_mt": latency_size_header,
+    "osu_bibw": bandwidth_size_header,
+    "osu_get_bw": bandwidth_size_header,
+    "osu_put_bw": bandwidth_size_header,
+    "osu_put_bibw": bandwidth_size_header,
+    "osu_bw": bandwidth_size_header,
+    "osu_allgather": average_latency_header,
+    "osu_allreduce": average_latency_header,
+}
+
+
+def parse_commented_header(section):
+    """
+    Parse a commented (#) header
+    """
+    # Each section has some number of header lines (with #)
+    header = []
+    while section and section[0].startswith("#"):
+        header.append(section.pop(0))
+    return header
+
+
+def parse_row(row):
+    """
+    Given a row of two values with spaces, parse.
+    """
+    row = row.split(" ", 2)
+    return [x.strip() for x in row if x]
+
+
+def parse_multi_value_row(row):
+    """
+    Parse a row with multiple values.
+    """
+    row = row.split(" ")
+    return [x.strip() for x in row if x]
+
+
+def parse_hello_section(section):
+    """
+    Parse the osu hello section
+
+    This only has a print line for output
+    """
+    # Command is the first entry
+    command = section.pop(0)
+    header = parse_commented_header(section)
+
+    # The next row is just a print line
+    message = section.pop(0)
+    timed = parse_timed_section(section)
+    result = {
+        "matrix": [[message]],
+        "columns": ["message"],
+        "header": header,
+        "command": command,
+    }
+    if timed:
+        result["timed"] = timed
+    return result
+
+
+def parse_init_section(section):
+    """
+    Parse the osu init section
+
+    This section has one column, and all the values there!
+    """
+    # Command is the first entry
+    command = section.pop(0)
+    header = parse_commented_header(section)
+
+    # The next row has all the data!
+    row = section.pop(0)
+    values = [x.strip() for x in row.split(",")]
+    data = {}
+    for entry in values:
+        field, value = [x.strip() for x in entry.split(":")]
+        # Do we have a unit?
+        unit = ""
+        if " " in value:
+            value, unit = [x.strip() for x in value.split(" ")]
+        if unit:
+            field = f"{field}-{unit}"
+        data[field] = float(value)
+
+    # If we have additional sections (possibly with times)
+    timed = parse_timed_section(section)
+    result = {
+        "matrix": [list(data.values())],
+        "columns": list(data.keys()),
+        "header": header,
+        "command": command,
+    }
+    if timed:
+        result["timed"] = timed
+    return result
+
+
+def parse_timed_section(section):
+    """
+    If the remainder is wrapped in time, parse it.
+    """
+    timed = {}
+    for line in section:
+        if line and re.search("^(real|user|sys\t)", line):
+            time_type, ts = line.strip().split("\t")
+            timed[time_type] = ts
+    return timed
+
+
+def parse_barrier_section(section):
+    """
+    Parse a barrier section.
+
+    This section is unique in that it has two columns
+    but the header is not preceded with a #
+    """
+    # Command is the first entry
+    command = section.pop(0)
+    header = parse_commented_header(section)
+
+    # The columns are the last row of the header
+    section.pop(0)
+    result = parse_value_matrix(section)
+    result.update({"header": header, "command": command})
+    return result
+
+
+def parse_multi_section(section):
+    """
+    Parse a multi-value section.
+
+    This section has standard format, but >2 values in the matrix
+    """
+    # Command is the first entry
+    command = section.pop(0)
+    header = parse_commented_header(section)
+
+    # The columns are the last row of the header
+    header.pop()
+    result = parse_value_matrix(section)
+    result.update({"header": header, "command": command})
+    return result
+
+
+def parse_value_matrix(section):
+    """
+    Parse a matrix of times
+    """
+    # The remainder is data, again always two points
+    # If there are real / user / sys at the end, we ran with timed:true
+    data = []
+    timed = {}
+    for line in section:
+        if not line:
+            continue
+
+        # We found a time
+        if re.search("^(real|user|sys\t)", line):
+            time_type, ts = line.strip().split("\t")
+            timed[time_type] = ts
+            continue
+
+        row = parse_multi_value_row(line)
+        row = [float(x) for x in row]
+        data.append(row)
+
+    result = {"matrix": data}
+    if timed:
+        result["timed"] = timed
+    return result
+
+
+def run_parsing_function(section):
+    """
+    Parsing functions for different sections
+    """
+    # The command is the first line
+    command = os.path.basename(section[0])
+    result = None
+    if command in ["osu_ibarrier"]:
+        result = parse_barrier_section(section)
+    elif command in [
+        "osu_bw",
+        "osu_bibw",
+        "osu_barrier",
+        "osu_get_bw",
+        "osu_put_bw",
+        "osu_put_bibw",
+        "osu_mbw_mr",
+        "osu_multi_lat",
+        "osu_allgather",
+        "osu_latency",
+        "osu_cas_latency",
+        "osu_put_latency",
+        "osu_get_latency",
+        "osu_latency_mp",
+        "osu_latency_mt",
+        "osu_fop_latency",
+        "osu_acc_latency",
+        "osu_get_acc_latency",
+        "osu_allreduce",
+    ]:
+        result = parse_multi_section(section)
+
+    # Special snowflakes
+    elif command == "osu_init":
+        result = parse_init_section(section)
+
+    # This only potentially has a time :)
+    elif command == "osu_hello":
+        result = parse_hello_section(section)
+
+    # Columns aren't predictible, so we ensure they are more consistent this way
+    # Some parsers do their own columns
+    if "columns" not in result:
+        result["columns"] = column_lookup[command]
+
+    return result
+
+
+class network_osu_benchmark(MetricBase):
+    """
+    Parse the OSU benchmarks output into data!
+
+    For pair to pair we had a common format (two values with size and another field)
+    but adding on the multi- benchmarks, we have a new challenge that there is slight
+    variance in format, so I needed to extend the class to be more specific to parsing.
+    """
+
+    container_name = "launcher"
+
+    @property
+    def pod_prefix(self):
+        return f"{self.name}-l-0"
+
+    def parse_log(self, lines):
+        """
+        Given lines of output, parse and return json
+        """
+        # Get the log metadata
+        metadata = self.get_log_metadata(lines)
+
+        # Split lines by separator
+        results = []
+        sections = self.get_log_sections(lines)
+        for section in sections:
+            if not section.strip():
+                continue
+            section = section.split("\n")
+            section = [x.strip() for x in section if x.strip()]
+
+            # Parse the section. If this fails, we want to know
+            datum = run_parsing_function(section)
+            results.append(datum)
+
+        return {"data": results, "metadata": metadata, "spec": self.spec}
