Skip to content

Commit f7243fd

Browse files
authored
Merge pull request #2209 from sthaha/fix-terminated-min-power
feat: allow setting energy threshold for terminated workload tracking
2 parents d7a013f + 700ea62 commit f7243fd

File tree

20 files changed

+554
-152
lines changed

20 files changed

+554
-152
lines changed

cmd/kepler/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ func createServices(logger *slog.Logger, cfg *config.Config) ([]service.Service,
154154
monitor.WithInterval(cfg.Monitor.Interval),
155155
monitor.WithMaxStaleness(cfg.Monitor.Staleness),
156156
monitor.WithMaxTerminated(cfg.Monitor.MaxTerminated),
157+
monitor.WithMinTerminatedEnergyThreshold(monitor.Energy(cfg.Monitor.MinTerminatedEnergyThreshold)*monitor.Joule),
157158
)
158159

159160
apiServer := server.NewAPIServer(

compose/dev/kepler-dev/etc/kepler/config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ monitor:
2424
# to be kept in memory until the data is exported; 0 disables the limit
2525
maxTerminated: 500
2626

27+
# minimum energy threshold (in joules) for terminated workloads
28+
# terminated workloads with energy consumption below this threshold will be filtered out
29+
minTerminatedEnergyThreshold: 10
30+
2731
host:
2832
sysfs: /host/sys # Path to sysfs filesystem (default: /sys)
2933
procfs: /host/proc # Path to procfs filesystem (default: /proc)

config/config.go

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,12 @@ type (
5454
// =0: Disable terminated workload tracking completely
5555
// >0: Track top N terminated workloads by energy consumption
5656
MaxTerminated int `yaml:"maxTerminated"`
57+
58+
// MinTerminatedEnergyThreshold sets the minimum energy consumption threshold for terminated workloads
59+
// Only terminated workloads with energy consumption above this threshold will be tracked
60+
// Value is in joules (e.g., 10 = 10 joules)
61+
// TODO: Add support for parsing energy units like "10J", "500mJ", "2kJ"
62+
MinTerminatedEnergyThreshold int64 `yaml:"minTerminatedEnergyThreshold"`
5763
}
5864

5965
// Exporter configuration
@@ -201,7 +207,8 @@ func DefaultConfig() *Config {
201207
Interval: 5 * time.Second,
202208
Staleness: 500 * time.Millisecond,
203209

204-
MaxTerminated: 500,
210+
MaxTerminated: 500,
211+
MinTerminatedEnergyThreshold: 10, // 10 Joules
205212
},
206213
Exporter: Exporter{
207214
Stdout: StdoutExporter{
@@ -476,8 +483,9 @@ func (c *Config) Validate(skips ...SkipValidation) error {
476483
if c.Monitor.Staleness < 0 {
477484
errs = append(errs, fmt.Sprintf("invalid monitor staleness: %s can't be negative", c.Monitor.Staleness))
478485
}
479-
if c.Monitor.MaxTerminated < 0 {
480-
errs = append(errs, fmt.Sprintf("invalid monitor max terminated: %d can't be negative", c.Monitor.MaxTerminated))
486+
487+
if c.Monitor.MinTerminatedEnergyThreshold < 0 {
488+
errs = append(errs, fmt.Sprintf("invalid monitor min terminated energy threshold: %d can't be negative", c.Monitor.MinTerminatedEnergyThreshold))
481489
}
482490
}
483491
{ // Kubernetes

config/config_test.go

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -633,14 +633,29 @@ func TestMonitorConfig(t *testing.T) {
633633
assert.NoError(t, cfg.Validate())
634634

635635
cfg.Monitor.MaxTerminated = -10
636-
assert.ErrorContains(t, cfg.Validate(), "invalid configuration: invalid monitor max terminated")
636+
assert.NoError(t, cfg.Validate(), "invalid configuration: invalid monitor max terminated")
637637

638638
cfg.Monitor.MaxTerminated = 0
639639
assert.NoError(t, cfg.Validate(), "maxTerminated=0 should be valid (unlimited)")
640640

641641
cfg.Monitor.MaxTerminated = 1000
642642
assert.NoError(t, cfg.Validate())
643643
})
644+
645+
t.Run("minTerminatedEnergyThreshold", func(t *testing.T) {
646+
cfg := DefaultConfig()
647+
assert.Equal(t, int64(10), cfg.Monitor.MinTerminatedEnergyThreshold, "default minTerminatedEnergyThreshold should be 10")
648+
assert.NoError(t, cfg.Validate())
649+
650+
cfg.Monitor.MinTerminatedEnergyThreshold = -10
651+
assert.ErrorContains(t, cfg.Validate(), "invalid configuration: invalid monitor min terminated energy threshold")
652+
653+
cfg.Monitor.MinTerminatedEnergyThreshold = 0
654+
assert.NoError(t, cfg.Validate(), "minTerminatedEnergyThreshold=0 should be valid (no filtering)")
655+
656+
cfg.Monitor.MinTerminatedEnergyThreshold = 1000
657+
assert.NoError(t, cfg.Validate())
658+
})
644659
}
645660

646661
func TestMonitorConfigFlags(t *testing.T) {
@@ -676,9 +691,9 @@ func TestMonitorConfigFlags(t *testing.T) {
676691
args: []string{"--monitor.max-terminated=0"},
677692
expected: expect{interval: 5 * time.Second, staleness: 500 * time.Millisecond, maxTerminated: 0, parseError: nil},
678693
}, {
679-
name: "invalid-max-terminated",
694+
name: "negative-max-terminated",
680695
args: []string{"--monitor.max-terminated=-10"},
681-
expected: expect{cfgErr: fmt.Errorf("invalid configuration: invalid monitor max terminated")},
696+
expected: expect{interval: 5 * time.Second, staleness: 500 * time.Millisecond, maxTerminated: -10, parseError: nil},
682697
}}
683698

684699
for _, tc := range tt {
@@ -731,15 +746,50 @@ monitor:
731746
assert.Equal(t, 0, cfg.Monitor.MaxTerminated)
732747
})
733748

734-
t.Run("yaml-config-maxTerminated-invalid", func(t *testing.T) {
749+
t.Run("yaml-config-maxTerminated-negative", func(t *testing.T) {
735750
yamlData := `
736751
monitor:
737752
maxTerminated: -100
753+
`
754+
reader := strings.NewReader(yamlData)
755+
cfg, err := Load(reader)
756+
assert.NoError(t, err)
757+
assert.Equal(t, -100, cfg.Monitor.MaxTerminated)
758+
})
759+
}
760+
761+
func TestMonitorMinTerminatedEnergyThresholdYAML(t *testing.T) {
762+
t.Run("yaml-config-minTerminatedEnergyThreshold", func(t *testing.T) {
763+
yamlData := `
764+
monitor:
765+
minTerminatedEnergyThreshold: 50
766+
`
767+
reader := strings.NewReader(yamlData)
768+
cfg, err := Load(reader)
769+
assert.NoError(t, err)
770+
assert.Equal(t, int64(50), cfg.Monitor.MinTerminatedEnergyThreshold)
771+
})
772+
773+
t.Run("yaml-config-minTerminatedEnergyThreshold-zero", func(t *testing.T) {
774+
yamlData := `
775+
monitor:
776+
minTerminatedEnergyThreshold: 0
777+
`
778+
reader := strings.NewReader(yamlData)
779+
cfg, err := Load(reader)
780+
assert.NoError(t, err)
781+
assert.Equal(t, int64(0), cfg.Monitor.MinTerminatedEnergyThreshold)
782+
})
783+
784+
t.Run("yaml-config-minTerminatedEnergyThreshold-invalid", func(t *testing.T) {
785+
yamlData := `
786+
monitor:
787+
minTerminatedEnergyThreshold: -100
738788
`
739789
reader := strings.NewReader(yamlData)
740790
_, err := Load(reader)
741791
assert.Error(t, err)
742-
assert.Contains(t, err.Error(), "invalid monitor max terminated")
792+
assert.Contains(t, err.Error(), "invalid monitor min terminated energy threshold")
743793
})
744794
}
745795

docs/configuration/configuration.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ monitor:
8383
interval: 5s # Monitor refresh interval (default: 5s)
8484
staleness: 1000ms # Duration after which data is considered stale (default: 500ms)
8585
maxTerminated: 500 # Maximum number of terminated workloads to keep in memory (default: 500)
86+
minTerminatedEnergyThreshold: 10 # Minimum energy threshold in joules for terminated workloads (default: 10)
8687

8788
host:
8889
sysfs: /sys # Path to sysfs filesystem (default: /sys)
@@ -154,6 +155,7 @@ monitor:
154155
interval: 5s
155156
staleness: 1000ms
156157
maxTerminated: 500
158+
minTerminatedEnergyThreshold: 10
157159
```
158160

159161
- **interval**: The monitor's refresh interval. All processes with a lifetime less than this interval will be ignored. Setting to 0s disables monitor refreshes.
@@ -162,6 +164,8 @@ monitor:
162164

163165
- **maxTerminated**: Maximum number of terminated workloads (processes, containers, VMs, pods) to keep in memory until the data is exported. This prevents unbounded memory growth in high-churn environments. Set to 0 for unlimited (no limit). When the limit is reached, the least power consuming terminated workloads are removed first.
164166

167+
- **minTerminatedEnergyThreshold**: Minimum energy consumption threshold (in joules) for terminated workloads to be tracked. Only terminated workloads with energy consumption above this threshold are tracked, which filters out short-lived processes that consume minimal energy. Set to 0 to disable filtering. Negative values are rejected as invalid. Default is 10 joules.
168+
165169
### 🗄️ Host Configuration
166170

167171
```yaml

hack/config.yaml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,10 @@ monitor:
2424
# to be kept in memory until the data is exported; 0 disables the limit
2525
maxTerminated: 500
2626

27+
# minimum energy threshold (in joules) for terminated workloads
28+
# terminated workloads with energy consumption below this threshold will be filtered out
29+
minTerminatedEnergyThreshold: 10
30+
2731
host:
2832
sysfs: /sys # Path to sysfs filesystem (default: /sys)
2933
procfs: /proc # Path to procfs filesystem (default: /proc)

internal/monitor/container.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -85,17 +85,9 @@ func (pm *PowerMonitor) calculateContainerPower(prev, newSnapshot *Snapshot) err
8585
continue
8686
}
8787

88-
// Only include terminated containers that have consumed energy
89-
if prevContainer.Zones.HasZeroEnergy() {
90-
pm.logger.Debug("Filtering out terminated container with zero energy", "id", id)
91-
continue
92-
}
93-
pm.logger.Debug("Including terminated container with non-zero energy", "id", id)
94-
9588
// Add to internal tracker (which will handle priority-based retention)
9689
// NOTE: Each terminated container is only added once since a container cannot be terminated twice
97-
terminatedContainer := prevContainer.Clone()
98-
pm.terminatedContainersTracker.Add(terminatedContainer)
90+
pm.terminatedContainersTracker.Add(prevContainer.Clone())
9991
}
10092

10193
// process running containers

internal/monitor/container_power_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -655,11 +655,12 @@ func TestTerminatedContainerTracking(t *testing.T) {
655655
resInformer := &MockResourceInformer{}
656656

657657
monitor := &PowerMonitor{
658-
logger: logger,
659-
cpu: mockMeter,
660-
clock: fakeClock,
661-
resources: resInformer,
662-
maxTerminated: 500,
658+
logger: logger,
659+
cpu: mockMeter,
660+
clock: fakeClock,
661+
resources: resInformer,
662+
maxTerminated: 500,
663+
minTerminatedEnergyThreshold: 1 * Joule, // Set threshold to filter zero-energy containers
663664
}
664665

665666
err := monitor.Init()

internal/monitor/monitor.go

Lines changed: 36 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,17 @@ type PowerMonitor struct {
4040
logger *slog.Logger
4141
cpu device.CPUPowerMeter
4242

43-
interval time.Duration
44-
clock clock.WithTicker
45-
maxStaleness time.Duration
46-
maxTerminated int
47-
resources resource.Informer
43+
interval time.Duration
44+
clock clock.WithTicker
45+
46+
// related to snapshots
47+
maxStaleness time.Duration
48+
49+
// related to terminated resource tracking
50+
maxTerminated int
51+
minTerminatedEnergyThreshold Energy
52+
53+
resources resource.Informer
4854

4955
// signals when a snapshot has been updated
5056
dataCh chan struct{}
@@ -86,14 +92,18 @@ func NewPowerMonitor(meter device.CPUPowerMeter, applyOpts ...OptionFn) *PowerMo
8692
ctx, cancel := context.WithCancel(context.Background())
8793

8894
monitor := &PowerMonitor{
89-
logger: opts.logger.With("service", "monitor"),
90-
cpu: meter,
91-
clock: opts.clock,
92-
interval: opts.interval,
93-
resources: opts.resources,
94-
dataCh: make(chan struct{}, 1),
95-
maxStaleness: opts.maxStaleness,
96-
maxTerminated: opts.maxTerminated,
95+
logger: opts.logger.With("service", "monitor"),
96+
cpu: meter,
97+
clock: opts.clock,
98+
interval: opts.interval,
99+
resources: opts.resources,
100+
dataCh: make(chan struct{}, 1),
101+
102+
maxStaleness: opts.maxStaleness,
103+
104+
maxTerminated: opts.maxTerminated,
105+
minTerminatedEnergyThreshold: opts.minTerminatedEnergyThreshold,
106+
97107
collectionCtx: ctx,
98108
collectionCancel: cancel,
99109
}
@@ -119,11 +129,19 @@ func (pm *PowerMonitor) Init() error {
119129
pm.logger.Info("Using primary energy zone for terminated workload tracking",
120130
"zone", primaryEnergyZone.Name())
121131

122-
// Initialize terminated workload trackers with the primary energy zone
123-
pm.terminatedProcessesTracker = NewTerminatedResourceTracker[*Process](primaryEnergyZone, pm.maxTerminated, pm.logger)
124-
pm.terminatedContainersTracker = NewTerminatedResourceTracker[*Container](primaryEnergyZone, pm.maxTerminated, pm.logger)
125-
pm.terminatedVMsTracker = NewTerminatedResourceTracker[*VirtualMachine](primaryEnergyZone, pm.maxTerminated, pm.logger)
126-
pm.terminatedPodsTracker = NewTerminatedResourceTracker[*Pod](primaryEnergyZone, pm.maxTerminated, pm.logger)
132+
// Initialize terminated workload trackers with the primary energy zone and minimum energy threshold
133+
pm.terminatedProcessesTracker = NewTerminatedResourceTracker[*Process](
134+
primaryEnergyZone, pm.maxTerminated,
135+
pm.minTerminatedEnergyThreshold, pm.logger)
136+
pm.terminatedContainersTracker = NewTerminatedResourceTracker[*Container](
137+
primaryEnergyZone, pm.maxTerminated,
138+
pm.minTerminatedEnergyThreshold, pm.logger)
139+
pm.terminatedVMsTracker = NewTerminatedResourceTracker[*VirtualMachine](
140+
primaryEnergyZone, pm.maxTerminated,
141+
pm.minTerminatedEnergyThreshold, pm.logger)
142+
pm.terminatedPodsTracker = NewTerminatedResourceTracker[*Pod](
143+
primaryEnergyZone, pm.maxTerminated,
144+
pm.minTerminatedEnergyThreshold, pm.logger)
127145

128146
// signal now so that exporters can construct descriptors
129147
pm.signalNewData()

internal/monitor/options.go

Lines changed: 21 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -12,23 +12,25 @@ import (
1212
)
1313

1414
type Opts struct {
15-
logger *slog.Logger
16-
interval time.Duration
17-
clock clock.WithTicker
18-
resources resource.Informer
19-
maxStaleness time.Duration
20-
maxTerminated int
15+
logger *slog.Logger
16+
interval time.Duration
17+
clock clock.WithTicker
18+
resources resource.Informer
19+
maxStaleness time.Duration
20+
maxTerminated int
21+
minTerminatedEnergyThreshold Energy
2122
}
2223

2324
// DefaultOpts returns a new Opts with defaults set
2425
func DefaultOpts() Opts {
2526
return Opts{
26-
logger: slog.Default(),
27-
interval: 5 * time.Second,
28-
clock: clock.RealClock{},
29-
maxStaleness: 500 * time.Millisecond,
30-
resources: nil,
31-
maxTerminated: 500,
27+
logger: slog.Default(),
28+
interval: 5 * time.Second,
29+
clock: clock.RealClock{},
30+
maxStaleness: 500 * time.Millisecond,
31+
resources: nil,
32+
maxTerminated: 500,
33+
minTerminatedEnergyThreshold: 10 * Joule,
3234
}
3335
}
3436

@@ -76,3 +78,10 @@ func WithMaxTerminated(max int) OptionFn {
7678
o.maxTerminated = max
7779
}
7880
}
81+
82+
// WithMinTerminatedEnergyThreshold sets the minimum energy threshold for terminated workloads
83+
func WithMinTerminatedEnergyThreshold(threshold Energy) OptionFn {
84+
return func(o *Opts) {
85+
o.minTerminatedEnergyThreshold = threshold
86+
}
87+
}

0 commit comments

Comments
 (0)