Skip to content

Commit 3d594cc

Browse files
authored
fix: always sort bindings in the scheduler to ensure deterministic decision list output (#361)
1 parent 6862d17 commit 3d594cc

File tree

4 files changed

+295
-5
lines changed

4 files changed

+295
-5
lines changed

pkg/scheduler/framework/framework.go

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -607,7 +607,7 @@ type filteredClusterWithStatus struct {
607607
status *Status
608608
}
609609

610-
// helper type to pretty print a list of filteredClusterWithStatus
610+
// filteredClusterWithStatusList is a list of filteredClusterWithStatus.
611611
type filteredClusterWithStatusList []*filteredClusterWithStatus
612612

613613
func (cs filteredClusterWithStatusList) String() string {
@@ -621,6 +621,15 @@ func (cs filteredClusterWithStatusList) String() string {
621621
return fmt.Sprintf("filteredClusters[%s]", strings.Join(filteredClusters, ", "))
622622
}
623623

624+
// Implement sort.Interface for filteredClusterWithStatusList.
625+
func (f filteredClusterWithStatusList) Len() int { return len(f) }
626+
func (f filteredClusterWithStatusList) Less(i, j int) bool {
627+
return f[i].cluster.Name < f[j].cluster.Name
628+
}
629+
func (f filteredClusterWithStatusList) Swap(i, j int) {
630+
f[i], f[j] = f[j], f[i]
631+
}
632+
624633
// runFilterPlugins runs filter plugins on clusters in parallel.
625634
func (f *framework) runFilterPlugins(ctx context.Context, state *CycleState, policy placementv1beta1.PolicySnapshotObj, clusters []clusterv1beta1.MemberCluster) (passed []*clusterv1beta1.MemberCluster, filtered filteredClusterWithStatusList, err error) {
626635
// Create a child context.
@@ -787,6 +796,14 @@ func (f *framework) updatePolicySnapshotStatusFromBindings(
787796
return controller.NewUnexpectedBehaviorError(err)
788797
}
789798

799+
// Sort all filtered clusters.
800+
//
801+
// This step is needed to produce deterministic decision outputs. If there are enough slots,
802+
// the scheduler will try to explain why some clusters are filtered out in the decision list; to ensure
803+
// that the list will not change across scheduling cycles without actual scheduling policy
804+
// refreshes, the filtered clusters need to be sorted.
805+
sort.Sort(filteredClusterWithStatusList(filtered))
806+
790807
// Prepare new scheduling decisions.
791808
newDecisions := newSchedulingDecisionsFromBindings(f.maxUnselectedClusterDecisionCount, notPicked, filtered, existing...)
792809
// Prepare new scheduling condition.

pkg/scheduler/framework/framework_test.go

Lines changed: 177 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,9 +23,11 @@ import (
2323
"log"
2424
"os"
2525
"strings"
26+
"sync/atomic"
2627
"testing"
2728
"time"
2829

30+
crossplanetest "github.com/crossplane/crossplane-runtime/v2/pkg/test"
2931
"github.com/google/go-cmp/cmp"
3032
"github.com/google/go-cmp/cmp/cmpopts"
3133
k8serrors "k8s.io/apimachinery/pkg/api/errors"
@@ -2734,15 +2736,15 @@ func TestUpdatePolicySnapshotStatusFromBindings(t *testing.T) {
27342736
{
27352737
cluster: &clusterv1beta1.MemberCluster{
27362738
ObjectMeta: metav1.ObjectMeta{
2737-
Name: altClusterName,
2739+
Name: anotherClusterName,
27382740
},
27392741
},
27402742
status: filteredStatus,
27412743
},
27422744
{
27432745
cluster: &clusterv1beta1.MemberCluster{
27442746
ObjectMeta: metav1.ObjectMeta{
2745-
Name: anotherClusterName,
2747+
Name: altClusterName,
27462748
},
27472749
},
27482750
status: filteredStatus,
@@ -2760,7 +2762,7 @@ func TestUpdatePolicySnapshotStatusFromBindings(t *testing.T) {
27602762
Reason: fmt.Sprintf(resourceScheduleSucceededWithScoreMessageFormat, clusterName, affinityScore1, topologySpreadScore1),
27612763
},
27622764
{
2763-
ClusterName: altClusterName,
2765+
ClusterName: anotherClusterName,
27642766
Selected: false,
27652767
Reason: filteredStatus.String(),
27662768
},
@@ -6536,3 +6538,175 @@ func TestUpdatePolicySnapshotStatusForPickFixedPlacementType(t *testing.T) {
65366538
})
65376539
}
65386540
}
6541+
6542+
// TestRunSchedulingCycleForPickAllPlacementType_StableStatusOutputInLargeFleet tests the
6543+
// runSchedulingCycleForPickAllPlacementType method, specifically to ensure that the status output
6544+
// remains consistent when running the scheduling cycle in a large fleet (i.e., the scheduler
6545+
// will not constantly refresh the status across scheduling cycles).
6546+
func TestRunSchedulingCycleForPickAllPlacementType_StableStatusOutputInLargeFleet(t *testing.T) {
6547+
ctx := context.Background()
6548+
6549+
// Set up the scheduler profile with a label-based dummy filter plugin.
6550+
profile := NewProfile("TestOnly")
6551+
6552+
dummyLabelBasedFilterPluginName := fmt.Sprintf(dummyAllPurposePluginNameFormat, 0)
6553+
wantLabelKey := "pre-selected"
6554+
wantLabelValue := "true"
6555+
wantLabels := map[string]string{
6556+
wantLabelKey: wantLabelValue,
6557+
}
6558+
dummyLabelBasedFilterPlugin := &DummyAllPurposePlugin{
6559+
name: dummyLabelBasedFilterPluginName,
6560+
filterRunner: func(ctx context.Context, state CycleStatePluginReadWriter, policy placementv1beta1.PolicySnapshotObj, cluster *clusterv1beta1.MemberCluster) (status *Status) {
6561+
memberClusterLabels := cluster.GetLabels()
6562+
for wk, wv := range wantLabels {
6563+
if v, ok := memberClusterLabels[wk]; !ok || v != wv {
6564+
return NewNonErrorStatus(ClusterUnschedulable, dummyLabelBasedFilterPluginName)
6565+
}
6566+
}
6567+
return nil
6568+
},
6569+
}
6570+
profile.WithFilterPlugin(dummyLabelBasedFilterPlugin)
6571+
6572+
mockClientStatusUpdateCount := atomic.Int32{}
6573+
mockClient := crossplanetest.MockClient{
6574+
MockCreate: func(ctx context.Context, obj client.Object, opts ...client.CreateOption) error {
6575+
return nil
6576+
},
6577+
MockStatusUpdate: func(ctx context.Context, obj client.Object, opts ...client.SubResourceUpdateOption) error {
6578+
_ = mockClientStatusUpdateCount.Add(1)
6579+
return nil
6580+
},
6581+
}
6582+
6583+
f := &framework{
6584+
profile: profile,
6585+
client: &mockClient,
6586+
uncachedReader: &mockClient,
6587+
manager: nil,
6588+
eventRecorder: nil,
6589+
parallelizer: parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
6590+
maxUnselectedClusterDecisionCount: 3,
6591+
// The cluster eligibility checker is not invoked in this test spec.
6592+
clusterEligibilityChecker: clustereligibilitychecker.New(),
6593+
}
6594+
// No need to set up plugins with the framework.
6595+
6596+
clusters := []clusterv1beta1.MemberCluster{
6597+
{
6598+
ObjectMeta: metav1.ObjectMeta{
6599+
Name: fmt.Sprintf(clusterNameTemplate, 1),
6600+
},
6601+
},
6602+
{
6603+
ObjectMeta: metav1.ObjectMeta{
6604+
Name: fmt.Sprintf(clusterNameTemplate, 2),
6605+
},
6606+
},
6607+
{
6608+
ObjectMeta: metav1.ObjectMeta{
6609+
Name: fmt.Sprintf(clusterNameTemplate, 3),
6610+
},
6611+
},
6612+
{
6613+
ObjectMeta: metav1.ObjectMeta{
6614+
Name: fmt.Sprintf(clusterNameTemplate, 4),
6615+
},
6616+
},
6617+
{
6618+
ObjectMeta: metav1.ObjectMeta{
6619+
Name: fmt.Sprintf(clusterNameTemplate, 5),
6620+
},
6621+
},
6622+
{
6623+
ObjectMeta: metav1.ObjectMeta{
6624+
Name: fmt.Sprintf(clusterNameTemplate, 6),
6625+
},
6626+
},
6627+
{
6628+
ObjectMeta: metav1.ObjectMeta{
6629+
Name: fmt.Sprintf(clusterNameTemplate, 7),
6630+
},
6631+
},
6632+
{
6633+
ObjectMeta: metav1.ObjectMeta{
6634+
Name: fmt.Sprintf(clusterNameTemplate, 8),
6635+
},
6636+
},
6637+
{
6638+
ObjectMeta: metav1.ObjectMeta{
6639+
Name: fmt.Sprintf(clusterNameTemplate, 9),
6640+
},
6641+
},
6642+
}
6643+
state := NewCycleState(clusters, nil, nil)
6644+
placementKey := queue.PlacementKey(crpName)
6645+
wantClusterUnschedulableReason := "ClusterUnschedulable"
6646+
policy := &placementv1beta1.ClusterSchedulingPolicySnapshot{
6647+
ObjectMeta: metav1.ObjectMeta{
6648+
Name: policyName,
6649+
Annotations: map[string]string{
6650+
placementv1beta1.CRPGenerationAnnotation: "0",
6651+
},
6652+
},
6653+
Spec: placementv1beta1.SchedulingPolicySnapshotSpec{
6654+
Policy: &placementv1beta1.PlacementPolicy{
6655+
PlacementType: placementv1beta1.PickAllPlacementType,
6656+
Affinity: &placementv1beta1.Affinity{
6657+
ClusterAffinity: &placementv1beta1.ClusterAffinity{
6658+
RequiredDuringSchedulingIgnoredDuringExecution: &placementv1beta1.ClusterSelector{
6659+
ClusterSelectorTerms: []placementv1beta1.ClusterSelectorTerm{
6660+
{
6661+
LabelSelector: &metav1.LabelSelector{
6662+
MatchLabels: map[string]string{
6663+
wantLabelKey: wantLabelValue,
6664+
},
6665+
},
6666+
},
6667+
},
6668+
},
6669+
},
6670+
},
6671+
},
6672+
},
6673+
Status: placementv1beta1.SchedulingPolicySnapshotStatus{
6674+
Conditions: []metav1.Condition{
6675+
{
6676+
Type: string(placementv1beta1.PolicySnapshotScheduled),
6677+
Status: metav1.ConditionTrue,
6678+
Reason: FullyScheduledReason,
6679+
Message: fmt.Sprintf(fullyScheduledMessage, 1),
6680+
},
6681+
},
6682+
ObservedCRPGeneration: 0,
6683+
ClusterDecisions: []placementv1beta1.ClusterDecision{
6684+
{
6685+
ClusterName: fmt.Sprintf(clusterNameTemplate, 1),
6686+
Reason: wantClusterUnschedulableReason,
6687+
},
6688+
{
6689+
ClusterName: fmt.Sprintf(clusterNameTemplate, 2),
6690+
Reason: wantClusterUnschedulableReason,
6691+
},
6692+
{
6693+
ClusterName: fmt.Sprintf(clusterNameTemplate, 3),
6694+
Reason: wantClusterUnschedulableReason,
6695+
},
6696+
},
6697+
},
6698+
}
6699+
6700+
// Simulate 100 consecutive scheduling cycles.
6701+
for i := 0; i < 100; i++ {
6702+
_, err := f.runSchedulingCycleForPickAllPlacementType(ctx, state, placementKey, policy, clusters, nil, nil, nil, nil)
6703+
if err != nil {
6704+
t.Fatalf("runSchedulingCycleForPickAllPlacementType() = %v, want no error", err)
6705+
}
6706+
}
6707+
6708+
// Check if any status update was attempted; all should be skipped as there is no status change.
6709+
if mockClientStatusUpdateCount.Load() != 0 {
6710+
t.Errorf("runSchedulingCycleForPickAllPlacementType() status update attempt count = %d, want 0", mockClientStatusUpdateCount.Load())
6711+
}
6712+
}

pkg/utils/controller/binding_resolver.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ package controller
1818

1919
import (
2020
"context"
21+
"sort"
2122

2223
"k8s.io/apimachinery/pkg/types"
2324
"sigs.k8s.io/controller-runtime/pkg/client"
@@ -67,7 +68,19 @@ func ListBindingsFromKey(ctx context.Context, c client.Reader, placementKey type
6768
return nil, NewAPIServerError(false, err)
6869
}
6970

70-
return bindingList.GetBindingObjs(), nil
71+
bindingObjs := bindingList.GetBindingObjs()
72+
73+
// Sort the list of bindings.
74+
//
75+
// This is needed to ensure deterministic decision output from the scheduler.
76+
sort.Slice(bindingObjs, func(i, j int) bool {
77+
A, B := bindingObjs[i], bindingObjs[j]
78+
// Sort the bindings only by their names; for ClusterResourceBindings, their namespaces are always empty;
79+
// for ResourceBindings, in this case they all come from the same namespace.
80+
return A.GetName() < B.GetName()
81+
})
82+
83+
return bindingObjs, nil
7184
}
7285

7386
// ConvertCRBObjsToBindingObjs converts a slice of ClusterResourceBinding items to BindingObj array.

pkg/utils/controller/binding_resolver_test.go

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,13 @@ import (
2020
"context"
2121
"errors"
2222
"fmt"
23+
"sync/atomic"
2324
"testing"
2425

26+
crossplanetest "github.com/crossplane/crossplane-runtime/v2/pkg/test"
2527
"github.com/google/go-cmp/cmp"
2628
"github.com/google/go-cmp/cmp/cmpopts"
29+
"k8s.io/apimachinery/pkg/api/meta"
2730
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
2831
"k8s.io/apimachinery/pkg/runtime"
2932
"k8s.io/apimachinery/pkg/types"
@@ -33,6 +36,12 @@ import (
3336
placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
3437
)
3538

39+
// Names of the bindings that the mock client lists in TestListBindingsFromKey_Sorted.
const (
40+
bindingName1 = "binding-1"
41+
bindingName2 = "binding-2"
42+
bindingName3 = "binding-3"
43+
)
44+
3645
func TestListBindingsFromKey(t *testing.T) {
3746
ctx := context.Background()
3847

@@ -428,6 +437,83 @@ func TestListBindingsFromKey(t *testing.T) {
428437
}
429438
}
430439

440+
// TestListBindingsFromKey_Sorted verifies that the returned bindings are always sorted by their names.
441+
func TestListBindingsFromKey_Sorted(t *testing.T) {
442+
ctx := context.Background()
443+
444+
// Set a mode variable to control the behavior of list ops.
445+
mockMode := atomic.Int32{}
446+
mockMode.Store(0)
447+
448+
// Use the mock client from the crossplane package rather than the commonly used fake.Client to
449+
// better manipulate the list op results.
450+
mockClient := crossplanetest.MockClient{
451+
MockList: func(ctx context.Context, list client.ObjectList, opts ...client.ListOption) error {
452+
mode := mockMode.Load()
453+
switch mode {
454+
case 0:
455+
if err := meta.SetList(list, []runtime.Object{
456+
&placementv1beta1.ClusterResourceBinding{
457+
ObjectMeta: metav1.ObjectMeta{
458+
Name: bindingName1,
459+
},
460+
},
461+
&placementv1beta1.ClusterResourceBinding{
462+
ObjectMeta: metav1.ObjectMeta{
463+
Name: bindingName2,
464+
},
465+
},
466+
&placementv1beta1.ClusterResourceBinding{
467+
ObjectMeta: metav1.ObjectMeta{
468+
Name: bindingName3,
469+
},
470+
},
471+
}); err != nil {
472+
return fmt.Errorf("cannot set list results: %w", err)
473+
}
474+
case 1:
475+
if err := meta.SetList(list, []runtime.Object{
476+
&placementv1beta1.ClusterResourceBinding{
477+
ObjectMeta: metav1.ObjectMeta{
478+
Name: bindingName3,
479+
},
480+
},
481+
&placementv1beta1.ClusterResourceBinding{
482+
ObjectMeta: metav1.ObjectMeta{
483+
Name: bindingName2,
484+
},
485+
},
486+
&placementv1beta1.ClusterResourceBinding{
487+
ObjectMeta: metav1.ObjectMeta{
488+
Name: bindingName1,
489+
},
490+
},
491+
}); err != nil {
492+
return fmt.Errorf("cannot set list results: %w", err)
493+
}
494+
default:
495+
return fmt.Errorf("unexpected mock mode: %d", mode)
496+
}
497+
return nil
498+
},
499+
}
500+
501+
bindingsInMode0, err := ListBindingsFromKey(ctx, &mockClient, types.NamespacedName{Name: "placeholder"})
502+
if err != nil {
503+
t.Fatalf("ListBindingsFromKey() in mode 0 returned error: %v", err)
504+
}
505+
506+
mockMode.Store(1)
507+
bindingsInMode1, err := ListBindingsFromKey(ctx, &mockClient, types.NamespacedName{Name: "placeholder"})
508+
if err != nil {
509+
t.Fatalf("ListBindingsFromKey() in mode 1 returned error: %v", err)
510+
}
511+
512+
if diff := cmp.Diff(bindingsInMode0, bindingsInMode1); diff != "" {
513+
t.Errorf("ListBindingsFromKey() returned different results in different modes (-mode0, +mode1):\n%s", diff)
514+
}
515+
}
516+
431517
func TestListBindingsFromKey_ClientError(t *testing.T) {
432518
ctx := context.Background()
433519

0 commit comments

Comments
 (0)