Skip to content

Commit d7a013f

Browse files
authored
Merge pull request #2202 from sthaha/feat-max-terminated-process
feat: implement energy-based terminated workload tracking
2 parents d9d383c + 299d43a commit d7a013f

28 files changed

+2067
-280
lines changed

config/config.go

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,10 @@ type (
4949
Interval time.Duration `yaml:"interval"` // Interval for monitoring resources
5050
Staleness time.Duration `yaml:"staleness"` // Time after which calculated values are considered stale
5151

52+
// MaxTerminated controls terminated workload tracking behavior:
53+
// <0: Any negative value means an unlimited number of terminated workloads is tracked (no capacity limit)
54+
// =0: Disable terminated workload tracking completely
55+
// >0: Track top N terminated workloads by energy consumption
5256
MaxTerminated int `yaml:"maxTerminated"`
5357
}
5458

@@ -297,8 +301,8 @@ func RegisterFlags(app *kingpin.Application) ConfigUpdaterFn {
297301
// monitor
298302
monitorInterval := app.Flag(MonitorIntervalFlag,
299303
"Interval for monitoring resources (processes, container, vm, etc...); 0 to disable").Default("5s").Duration()
300-
maxTerminated := app.Flag(MonitorMaxTerminatedFlag,
301-
"Maximum number of terminated workloads to keep in memory until exported; 0 for unlimited").Default("500").Int()
304+
monitorMaxTerminated := app.Flag(MonitorMaxTerminatedFlag,
305+
"Maximum number of terminated workloads to track; 0 to disable, -1 for unlimited").Default("500").Int()
302306

303307
enablePprof := app.Flag(pprofEnabledFlag, "Enable pprof debug endpoints").Default("false").Bool()
304308
webConfig := app.Flag(WebConfigFlag, "Web config file path").Default("").String()
@@ -338,9 +342,8 @@ func RegisterFlags(app *kingpin.Application) ConfigUpdaterFn {
338342
if flagsSet[MonitorIntervalFlag] {
339343
cfg.Monitor.Interval = *monitorInterval
340344
}
341-
342345
if flagsSet[MonitorMaxTerminatedFlag] {
343-
cfg.Monitor.MaxTerminated = *maxTerminated
346+
cfg.Monitor.MaxTerminated = *monitorMaxTerminated
344347
}
345348

346349
if flagsSet[pprofEnabledFlag] {

internal/device/cpu_power_meter.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,4 +32,9 @@ type CPUPowerMeter interface {
3232

3333
// Zones() returns a slice of the energy measurement zones
3434
Zones() ([]EnergyZone, error)
35+
36+
// PrimaryEnergyZone() returns the zone with the highest energy coverage/priority
37+
// This zone represents the most comprehensive energy measurement available
38+
// E.g. Psys > Package > Core > DRAM > Uncore
39+
PrimaryEnergyZone() (EnergyZone, error)
3540
}

internal/device/fake_cpu_power_meter.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"log/slog"
99
"math/rand"
1010
"path/filepath"
11+
"strings"
1112
"sync"
1213
)
1314

@@ -151,3 +152,25 @@ func (m *fakeRaplMeter) Name() string {
151152
func (m *fakeRaplMeter) Zones() ([]EnergyZone, error) {
152153
return m.zones, nil
153154
}
155+
156+
// PrimaryEnergyZone returns the zone with the highest energy coverage/priority
157+
func (m *fakeRaplMeter) PrimaryEnergyZone() (EnergyZone, error) {
158+
zones, err := m.Zones()
159+
if err != nil {
160+
return nil, err
161+
}
162+
163+
if len(zones) == 0 {
164+
return nil, fmt.Errorf("no zones available in fake meter")
165+
}
166+
167+
// For fake meter, prefer package if available, otherwise first zone
168+
for _, zone := range zones {
169+
if strings.Contains(strings.ToLower(zone.Name()), "package") {
170+
return zone, nil
171+
}
172+
}
173+
174+
// Fallback to first zone
175+
return zones[0], nil
176+
}

internal/device/rapl_sysfs_power_meter.go

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ type raplPowerMeter struct {
1717
cachedZones []EnergyZone
1818
logger *slog.Logger
1919
zoneFilter []string
20+
topZone EnergyZone
2021
}
2122

2223
type OptionFn func(*raplPowerMeter)
@@ -192,6 +193,43 @@ func (r *raplPowerMeter) zoneNames(zones []EnergyZone) []string {
192193
return names
193194
}
194195

196+
// PrimaryEnergyZone returns the zone with the highest energy coverage/priority
197+
func (r *raplPowerMeter) PrimaryEnergyZone() (EnergyZone, error) {
198+
// Return cached zone if already initialized
199+
if r.topZone != nil {
200+
return r.topZone, nil
201+
}
202+
203+
zones, err := r.Zones()
204+
if err != nil {
205+
return nil, err
206+
}
207+
208+
if len(zones) == 0 {
209+
return nil, fmt.Errorf("no energy zones available")
210+
}
211+
212+
zoneMap := map[string]EnergyZone{}
213+
for _, zone := range zones {
214+
zoneMap[strings.ToLower(zone.Name())] = zone
215+
}
216+
217+
// Priority hierarchy for RAPL zones (highest to lowest priority)
218+
priorityOrder := []string{"psys", "package", "core", "dram", "uncore"}
219+
220+
// Find highest priority zone available
221+
for _, p := range priorityOrder {
222+
if zone, exists := zoneMap[p]; exists {
223+
r.topZone = zone
224+
return zone, nil
225+
}
226+
}
227+
228+
// Fallback to first zone if none match our preferences
229+
r.topZone = zones[0]
230+
return zones[0], nil
231+
}
232+
195233
// isStandardRaplPath checks if a RAPL zone path is in the standard format
196234
func isStandardRaplPath(path string) bool {
197235
return strings.Contains(path, "/intel-rapl:")

internal/device/rapl_sysfs_power_meter_test.go

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -445,3 +445,143 @@ func TestCPUPowerMeter_InitNoZones(t *testing.T) {
445445
assert.Equal(t, "no RAPL zones found", err.Error(), "Start() should return a specific error message")
446446
mockReader.AssertExpectations(t)
447447
}
448+
449+
// TestPrimaryEnergyZone tests the PrimaryEnergyZone method
450+
func TestPrimaryEnergyZone(t *testing.T) {
451+
t.Run("Priority hierarchy", func(t *testing.T) {
452+
tests := []struct {
453+
name string
454+
zones []EnergyZone
455+
expected string
456+
}{{
457+
name: "psys has highest priority",
458+
zones: []EnergyZone{
459+
mockZone{name: "package", index: 0},
460+
mockZone{name: "psys", index: 0},
461+
mockZone{name: "core", index: 0},
462+
},
463+
expected: "psys",
464+
}, {
465+
name: "package has priority over core",
466+
zones: []EnergyZone{
467+
mockZone{name: "core", index: 0},
468+
mockZone{name: "package", index: 0},
469+
mockZone{name: "dram", index: 0},
470+
},
471+
expected: "package",
472+
}, {
473+
name: "core has priority over dram",
474+
zones: []EnergyZone{
475+
mockZone{name: "dram", index: 0},
476+
mockZone{name: "core", index: 0},
477+
mockZone{name: "uncore", index: 0},
478+
},
479+
expected: "core",
480+
}, {
481+
name: "dram has priority over uncore",
482+
zones: []EnergyZone{
483+
mockZone{name: "uncore", index: 0},
484+
mockZone{name: "dram", index: 0},
485+
},
486+
expected: "dram",
487+
}}
488+
489+
for _, tt := range tests {
490+
t.Run(tt.name, func(t *testing.T) {
491+
mockReader := &mockRaplReader{}
492+
mockReader.On("Zones").Return(tt.zones, nil)
493+
494+
meter := &raplPowerMeter{reader: mockReader, logger: slog.Default()}
495+
zone, err := meter.PrimaryEnergyZone()
496+
497+
assert.NoError(t, err)
498+
assert.Equal(t, tt.expected, zone.Name())
499+
mockReader.AssertExpectations(t)
500+
})
501+
}
502+
})
503+
504+
t.Run("Case insensitive matching", func(t *testing.T) {
505+
mockReader := &mockRaplReader{}
506+
mockReader.On("Zones").Return([]EnergyZone{
507+
mockZone{name: "PACKAGE", index: 0},
508+
mockZone{name: "Core", index: 0},
509+
}, nil)
510+
511+
meter := &raplPowerMeter{reader: mockReader, logger: slog.Default()}
512+
zone, err := meter.PrimaryEnergyZone()
513+
514+
assert.NoError(t, err)
515+
assert.Equal(t, "PACKAGE", zone.Name())
516+
mockReader.AssertExpectations(t)
517+
})
518+
519+
t.Run("Fallback to first zone", func(t *testing.T) {
520+
zones := []EnergyZone{
521+
mockZone{name: "unknown1", index: 0},
522+
mockZone{name: "unknown2", index: 1},
523+
}
524+
mockReader := &mockRaplReader{}
525+
mockReader.On("Zones").Return(zones, nil)
526+
527+
meter := &raplPowerMeter{reader: mockReader, logger: slog.Default()}
528+
zone, err := meter.PrimaryEnergyZone()
529+
530+
assert.NoError(t, err)
531+
// NOTE: since reader.Zones() does not guarantee the order after filtering,
532+
// we cannot assert zone.Name() == "unknown1", thus assert the zone returned
533+
// any of the zones passed as input
534+
zoneName := zone.Name()
535+
assert.Contains(t, []string{"unknown1", "unknown2"}, zoneName)
536+
mockReader.AssertExpectations(t)
537+
})
538+
539+
t.Run("Caching behavior", func(t *testing.T) {
540+
mockReader := &mockRaplReader{}
541+
mockReader.On("Zones").Return([]EnergyZone{
542+
mockZone{name: "package", index: 0},
543+
}, nil).Once()
544+
545+
meter := &raplPowerMeter{reader: mockReader, logger: slog.Default()}
546+
547+
// First call should read from zones and cache topZone
548+
zone1, err := meter.PrimaryEnergyZone()
549+
assert.NoError(t, err)
550+
assert.Equal(t, "package", zone1.Name())
551+
552+
// Second call should use cached topZone directly
553+
zone2, err := meter.PrimaryEnergyZone()
554+
assert.NoError(t, err)
555+
assert.Equal(t, "package", zone2.Name())
556+
557+
mockReader.AssertExpectations(t)
558+
})
559+
560+
t.Run("Error handling", func(t *testing.T) {
561+
t.Run("Zones() returns error", func(t *testing.T) {
562+
mockReader := &mockRaplReader{}
563+
mockReader.On("Zones").Return([]EnergyZone{}, errors.New("zones error"))
564+
565+
meter := &raplPowerMeter{reader: mockReader, logger: slog.Default()}
566+
zone, err := meter.PrimaryEnergyZone()
567+
568+
assert.Error(t, err)
569+
assert.Nil(t, zone)
570+
assert.Contains(t, err.Error(), "zones error")
571+
mockReader.AssertExpectations(t)
572+
})
573+
574+
t.Run("Empty zones list", func(t *testing.T) {
575+
mockReader := &mockRaplReader{}
576+
mockReader.On("Zones").Return([]EnergyZone{}, nil)
577+
578+
meter := &raplPowerMeter{reader: mockReader, logger: slog.Default()}
579+
zone, err := meter.PrimaryEnergyZone()
580+
581+
assert.Error(t, err)
582+
assert.Nil(t, zone)
583+
assert.Contains(t, err.Error(), "no RAPL zones found")
584+
mockReader.AssertExpectations(t)
585+
})
586+
})
587+
}

internal/exporter/prometheus/collector/power_collector.go

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -311,13 +311,12 @@ func (c *PowerCollector) collectProcessMetrics(ch chan<- prometheus.Metric, stat
311311

312312
// No need to lock, already done by the calling function
313313
for pid, proc := range processes {
314-
pidStr := fmt.Sprintf("%d", pid)
315314

316315
ch <- prometheus.MustNewConstMetric(
317316
c.processCPUTimeDescriptor,
318317
prometheus.CounterValue,
319318
proc.CPUTotalTime,
320-
pidStr, proc.Comm, proc.Exe, string(proc.Type),
319+
pid, proc.Comm, proc.Exe, string(proc.Type),
321320
proc.ContainerID, proc.VirtualMachineID,
322321
)
323322

@@ -327,7 +326,7 @@ func (c *PowerCollector) collectProcessMetrics(ch chan<- prometheus.Metric, stat
327326
c.processCPUJoulesDescriptor,
328327
prometheus.CounterValue,
329328
usage.EnergyTotal.Joules(),
330-
pidStr, proc.Comm, proc.Exe, string(proc.Type), state,
329+
pid, proc.Comm, proc.Exe, string(proc.Type), state,
331330
proc.ContainerID, proc.VirtualMachineID,
332331
zoneName,
333332
)
@@ -336,7 +335,7 @@ func (c *PowerCollector) collectProcessMetrics(ch chan<- prometheus.Metric, stat
336335
c.processCPUWattsDescriptor,
337336
prometheus.GaugeValue,
338337
usage.Power.Watts(),
339-
pidStr, proc.Comm, proc.Exe, string(proc.Type), state,
338+
pid, proc.Comm, proc.Exe, string(proc.Type), state,
340339
proc.ContainerID, proc.VirtualMachineID,
341340
zoneName,
342341
)

internal/exporter/prometheus/collector/power_collector_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -215,7 +215,7 @@ func TestPowerCollector(t *testing.T) {
215215
}
216216

217217
testProcesses := monitor.Processes{
218-
123: {
218+
"123": {
219219
PID: 123,
220220
Comm: "test-process",
221221
Exe: "/usr/bin/123",
@@ -527,7 +527,7 @@ func TestTerminatedProcessExport(t *testing.T) {
527527
},
528528
},
529529
Processes: monitor.Processes{
530-
123: &monitor.Process{
530+
"123": &monitor.Process{
531531
PID: 123,
532532
Comm: "running-proc",
533533
Exe: "/usr/bin/running-proc",
@@ -544,7 +544,7 @@ func TestTerminatedProcessExport(t *testing.T) {
544544
},
545545
},
546546
TerminatedProcesses: monitor.Processes{
547-
456: &monitor.Process{
547+
"456": &monitor.Process{
548548
PID: 456,
549549
Comm: "terminated-proc",
550550
Exe: "/usr/bin/terminated-proc",
@@ -633,7 +633,7 @@ func TestEnhancedErrorReporting(t *testing.T) {
633633
},
634634
},
635635
Processes: monitor.Processes{
636-
123: &monitor.Process{
636+
"123": &monitor.Process{
637637
PID: 123,
638638
Comm: "actual-proc",
639639
Exe: "/usr/bin/actual-proc",
@@ -650,7 +650,7 @@ func TestEnhancedErrorReporting(t *testing.T) {
650650
},
651651
},
652652
TerminatedProcesses: monitor.Processes{
653-
456: &monitor.Process{
653+
"456": &monitor.Process{
654654
PID: 456,
655655
Comm: "terminated-proc",
656656
Exe: "/usr/bin/terminated-proc",
@@ -771,7 +771,7 @@ func TestPowerCollector_MetricsLevelFiltering(t *testing.T) {
771771
UsageRatio: 0.5,
772772
},
773773
Processes: monitor.Processes{
774-
123: &monitor.Process{
774+
"123": &monitor.Process{
775775
PID: 123,
776776
Comm: "test-process",
777777
Exe: "/usr/bin/test-process",

0 commit comments

Comments
 (0)