Skip to content

Commit e78597d

Browse files
authored
refactor affinity to use same strategy as flux operator (#59)
* refactor affinity to use same strategy as flux operator Signed-off-by: vsoch <vsoch@users.noreply.github.com>
1 parent 1a4fb50 commit e78597d

File tree

6 files changed

+63
-10
lines changed

6 files changed

+63
-10
lines changed

pkg/jobs/launcher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ func (m *LauncherWorker) AddWorkers(
188188
) (*jobset.ReplicatedJob, error) {
189189

190190
numWorkers := spec.Spec.Pods - 1
191-
workers, err := metrics.GetReplicatedJob(spec, false, numWorkers, numWorkers, m.WorkerLetter)
191+
workers, err := metrics.GetReplicatedJob(spec, false, numWorkers, numWorkers, m.WorkerLetter, m.SoleTenancy)
192192
if err != nil {
193193
return workers, err
194194
}
@@ -221,7 +221,7 @@ func (m *LauncherWorker) ReplicatedJobs(spec *api.MetricSet) ([]jobset.Replicate
221221
m.ensureDefaultNames()
222222

223223
// Generate a replicated job for the launcher (LauncherWorker) and workers
224-
launcher, err := metrics.GetReplicatedJob(spec, false, 1, 1, m.LauncherLetter)
224+
launcher, err := metrics.GetReplicatedJob(spec, false, 1, 1, m.LauncherLetter, m.SoleTenancy)
225225
if err != nil {
226226
return js, err
227227
}

pkg/jobs/storage.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func (m StorageGeneric) Description() string {
4747
return m.Summary
4848
}
4949

50+
// By default assume storage does not have sole tenancy
5051
func (m StorageGeneric) HasSoleTenancy() bool {
5152
return false
5253
}

pkg/metrics/application.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func GetApplicationReplicatedJobs(
5656
m := (*metric)
5757

5858
// This defaults to one replicated job, named "m", with no custom replicated job name; sole tenancy is taken from the metric
59-
job, err := GetReplicatedJob(spec, shareProcessNamespace, spec.Spec.Pods, spec.Spec.Completions, "")
59+
job, err := GetReplicatedJob(spec, shareProcessNamespace, spec.Spec.Pods, spec.Spec.Completions, "", m.HasSoleTenancy())
6060
if err != nil {
6161
return rjs, err
6262
}

pkg/metrics/jobset.go

Lines changed: 58 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@ var (
2828
soleTenancyValue = "sole-tenancy"
2929
)
3030

31+
const podLabelAppName = "app.kubernetes.io/name"
32+
3133
// GetJobSet is called by the controller to return some JobSet based
3234
// on the type: application, storage, or standalone
3335
func GetJobSet(
@@ -45,7 +47,7 @@ func GetJobSet(
4547
successJobs := getSuccessJobs(set.Metrics())
4648

4749
// A base JobSet can hold one or more replicated jobs
48-
js := getBaseJobSet(spec, successJobs, set.HasSoleTenancy())
50+
js := getBaseJobSet(spec, successJobs)
4951

5052
// Get one or more replicated jobs, depending on the type
5153
rjs, err := set.ReplicatedJobs(spec)
@@ -80,7 +82,7 @@ func getSuccessJobs(metrics []*Metric) []string {
8082
}
8183

8284
// getBaseJobSet shared for either an application or isolated jobset
83-
func getBaseJobSet(set *api.MetricSet, successSet []string, soleTenancy bool) *jobset.JobSet {
85+
func getBaseJobSet(set *api.MetricSet, successSet []string) *jobset.JobSet {
8486

8587
// When suspend is true we have a hard time debugging jobs, so keep false
8688
suspend := false
@@ -112,9 +114,6 @@ func getBaseJobSet(set *api.MetricSet, successSet []string, soleTenancy bool) *j
112114
}
113115

114116
// Do we want to assign 1 node: 1 pod? We can use Pod Anti-affinity for that
115-
if soleTenancy {
116-
js.ObjectMeta.Annotations = map[string]string{jobset.ExclusiveKey: "kubernetes.io/hostname"}
117-
}
118117
return &js
119118
}
120119

@@ -125,6 +124,7 @@ func GetReplicatedJob(
125124
pods int32,
126125
completions int32,
127126
jobname string,
127+
soleTenancy bool,
128128
) (*jobset.ReplicatedJob, error) {
129129

130130
// Default replicated job name, if not set
@@ -183,6 +183,11 @@ func GetReplicatedJob(
183183
},
184184
}
185185

186+
// Do we want sole tenancy?
187+
if soleTenancy {
188+
jobspec.Template.Spec.Affinity = getAffinity(set)
189+
}
190+
186191
// Do we have a pull secret for the application image?
187192
if set.Spec.Application.PullSecret != "" {
188193
jobspec.Template.Spec.ImagePullSecrets = []corev1.LocalObjectReference{
@@ -193,3 +198,51 @@ func GetReplicatedJob(
193198
job.Template.Spec = jobspec
194199
return &job, nil
195200
}
201+
202+
// getAffinity returns the pod affinity and anti-affinity preferences intended to place 1 pod (address) per node
203+
func getAffinity(set *api.MetricSet) *corev1.Affinity {
204+
return &corev1.Affinity{
205+
// Prefer to schedule pods in the same zone
206+
PodAffinity: &corev1.PodAffinity{
207+
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
208+
{
209+
Weight: 100,
210+
PodAffinityTerm: corev1.PodAffinityTerm{
211+
LabelSelector: &metav1.LabelSelector{
212+
MatchExpressions: []metav1.LabelSelectorRequirement{
213+
{
214+
// added in getPodLabels
215+
Key: podLabelAppName,
216+
Operator: metav1.LabelSelectorOpIn,
217+
Values: []string{set.Name},
218+
},
219+
},
220+
},
221+
TopologyKey: "topology.kubernetes.io/zone",
222+
},
223+
},
224+
},
225+
},
226+
// Prefer to schedule pods on different nodes
227+
PodAntiAffinity: &corev1.PodAntiAffinity{
228+
PreferredDuringSchedulingIgnoredDuringExecution: []corev1.WeightedPodAffinityTerm{
229+
{
230+
Weight: 100,
231+
PodAffinityTerm: corev1.PodAffinityTerm{
232+
LabelSelector: &metav1.LabelSelector{
233+
MatchExpressions: []metav1.LabelSelectorRequirement{
234+
{
235+
// added in getPodLabels
236+
Key: podLabelAppName,
237+
Operator: metav1.LabelSelectorOpIn,
238+
Values: []string{set.Name},
239+
},
240+
},
241+
},
242+
TopologyKey: "kubernetes.io/hostname",
243+
},
244+
},
245+
},
246+
},
247+
}
248+
}

pkg/metrics/metricset.go

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ type MetricSet interface {
4848
Metrics() []*Metric
4949
EntrypointScripts(*api.MetricSet) []EntrypointScript
5050
ReplicatedJobs(*api.MetricSet) ([]jobset.ReplicatedJob, error)
51-
HasSoleTenancy() bool
5251
}
5352

5453
// get an application default entrypoint, if not determined by metric

pkg/metrics/storage.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ func (m *StorageMetricSet) ReplicatedJobs(spec *api.MetricSet) ([]jobset.Replica
2222

2323
// Storage metrics do not need to share the process namespace
2424
// The empty jobname string will use the default (no custom replicated job name); sole tenancy comes from m.HasSoleTenancy()
25-
job, err := GetReplicatedJob(spec, false, spec.Spec.Pods, spec.Spec.Completions, "")
25+
job, err := GetReplicatedJob(spec, false, spec.Spec.Pods, spec.Spec.Completions, "", m.HasSoleTenancy())
2626
if err != nil {
2727
return rjs, err
2828
}

0 commit comments

Comments
 (0)