13 changes: 7 additions & 6 deletions cmd/memberagent/main.go
@@ -373,7 +373,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
klog.ErrorS(err, "unable to find the required CRD", "GVK", gvk)
return err
}
// create the work controller, so we can pass it to the internal member cluster reconciler
// Set up the work applier. Note that it is referenced by the InternalMemberCluster controller.

// Set up the requeue rate limiter for the work applier.
//
@@ -413,7 +413,8 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
*workApplierRequeueRateLimiterSkipToFastBackoffForAvailableOrDiffReportedWorkObjs,
)

workController := workapplier.NewReconciler(
workObjAgeForPrioritizedProcessing := time.Minute * time.Duration(*watchWorkReconcileAgeMinutes)
workApplier := workapplier.NewReconciler(
hubMgr.GetClient(),
targetNS,
spokeDynamicClient,
@@ -426,12 +427,12 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
// Use the default worker count (4) for parallelized manifest processing.
parallelizer.NewParallelizer(parallelizer.DefaultNumOfWorkers),
time.Minute*time.Duration(*deletionWaitTime),
*watchWorkWithPriorityQueue,
*watchWorkReconcileAgeMinutes,
requeueRateLimiter,
*watchWorkWithPriorityQueue,
workObjAgeForPrioritizedProcessing,
)

if err = workController.SetupWithManager(hubMgr); err != nil {
if err = workApplier.SetupWithManager(hubMgr); err != nil {
klog.ErrorS(err, "Failed to create v1beta1 controller", "controller", "work")
return err
}
Expand Down Expand Up @@ -459,7 +460,7 @@ func Start(ctx context.Context, hubCfg, memberConfig *rest.Config, hubOpts, memb
ctx,
hubMgr.GetClient(),
memberMgr.GetConfig(), memberMgr.GetClient(),
workController,
workApplier,
pp)
if err != nil {
klog.ErrorS(err, "Failed to create InternalMemberCluster v1beta1 reconciler")
@@ -379,7 +379,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
workApplier1 = workapplier.NewReconciler(hubClient, member1ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0)

propertyProvider1 = &manuallyUpdatedProvider{}
member1Reconciler, err := NewReconciler(ctx, hubClient, member1Cfg, member1Client, workApplier1, propertyProvider1)
@@ -402,7 +402,7 @@ var _ = BeforeSuite(func() {

// This controller is created for testing purposes only; no reconciliation loop is actually
// run.
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, false, 60, nil)
workApplier2 = workapplier.NewReconciler(hubClient, member2ReservedNSName, nil, nil, nil, nil, 0, nil, time.Minute, nil, false, 0)

member2Reconciler, err := NewReconciler(ctx, hubClient, member2Cfg, member2Client, workApplier2, nil)
Expect(err).NotTo(HaveOccurred())
227 changes: 67 additions & 160 deletions pkg/controllers/workapplier/controller.go
@@ -19,10 +19,10 @@ package workapplier
import (
"context"
"fmt"
"sync"
"time"

"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -41,149 +41,30 @@ import (
ctrloption "sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/controller/priorityqueue"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

fleetv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
"github.com/kubefleet-dev/kubefleet/pkg/utils/defaulter"
parallelizerutil "github.com/kubefleet-dev/kubefleet/pkg/utils/parallelizer"
)

const (
patchDetailPerObjLimit = 100

minWorkObjAgeForPrioritizedQueueing = time.Minute * 30
)

const (
workFieldManagerName = "work-api-agent"
)

var (
workAgeToReconcile = 1 * time.Hour
)

// Custom type to hold a reconcile.Request and a priority value
type priorityQueueItem struct {
reconcile.Request
Priority int
}

// PriorityQueueEventHandler is a custom event handler for adding objects to the priority queue.
type PriorityQueueEventHandler struct {
Queue priorityqueue.PriorityQueue[priorityQueueItem] // The priority queue to manage events
Client client.Client // store the client to make API calls
}

// Implement priorityqueue.Item interface for priorityQueueItem
func (i priorityQueueItem) GetPriority() int {
return i.Priority
}

func (h *PriorityQueueEventHandler) WorkPendingApply(ctx context.Context, obj client.Object) bool {
var work fleetv1beta1.Work
ns := obj.GetNamespace()
name := obj.GetName()
err := h.Client.Get(ctx, client.ObjectKey{
Namespace: ns,
Name: name,
}, &work)
if err != nil {
// Log and return
klog.ErrorS(err, "Failed to get the work", "name", name, "ns", ns)
return true
}
availCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeAvailable)
appliedCond := meta.FindStatusCondition(work.Status.Conditions, fleetv1beta1.WorkConditionTypeApplied)

if availCond != nil && appliedCond != nil {
// check if the object has been recently modified
availCondLastUpdatedTime := availCond.LastTransitionTime.Time
appliedCondLastUpdatedTime := appliedCond.LastTransitionTime.Time
if time.Since(availCondLastUpdatedTime) < workAgeToReconcile || time.Since(appliedCondLastUpdatedTime) < workAgeToReconcile {
return true
}
}

if condition.IsConditionStatusTrue(availCond, work.GetGeneration()) &&
condition.IsConditionStatusTrue(appliedCond, work.GetGeneration()) {
return false
}

// Work not yet applied
return true
}

func (h *PriorityQueueEventHandler) AddToPriorityQueue(ctx context.Context, obj client.Object, alwaysAdd bool) {
req := reconcile.Request{
NamespacedName: types.NamespacedName{
Namespace: obj.GetNamespace(),
Name: obj.GetName(),
},
}

objAge := time.Since(obj.GetCreationTimestamp().Time)

var objPriority int
if alwaysAdd || objAge < workAgeToReconcile || h.WorkPendingApply(ctx, obj) {
// Newer or pending objects get higher priority
// Negate the Unix timestamp to give higher priority to newer timestamps
objPriority = -int(time.Now().Unix())
} else {
// skip adding older objects with no changes
klog.V(2).InfoS("adding old item to priorityQueueItem", "obj", req.Name, "age", objAge)
objPriority = int(obj.GetCreationTimestamp().Unix())
}

// Create the custom priorityQueueItem with the request and priority
item := priorityQueueItem{
Request: req,
Priority: objPriority,
}

h.Queue.Add(item)
klog.V(2).InfoS("Created PriorityQueueItem", "priority", objPriority, "obj", req.Name, "queue size", h.Queue.Len())
}

func (h *PriorityQueueEventHandler) Create(ctx context.Context, evt event.TypedCreateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.AddToPriorityQueue(ctx, evt.Object, false)
}

func (h *PriorityQueueEventHandler) Delete(ctx context.Context, evt event.TypedDeleteEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.AddToPriorityQueue(ctx, evt.Object, true)
}

func (h *PriorityQueueEventHandler) Update(ctx context.Context, evt event.TypedUpdateEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
// Ignore updates where only the status changed
oldObj := evt.ObjectOld.DeepCopyObject()
newObj := evt.ObjectNew.DeepCopyObject()

// Zero out the status
if oldWork, ok := oldObj.(*fleetv1beta1.Work); ok {
oldWork.Status = fleetv1beta1.WorkStatus{}
}
if newWork, ok := newObj.(*fleetv1beta1.Work); ok {
newWork.Status = fleetv1beta1.WorkStatus{}
}

if !equality.Semantic.DeepEqual(oldObj, newObj) {
// ignore status changes to prevent noise
h.AddToPriorityQueue(ctx, evt.ObjectNew, true)
return
}
klog.V(4).InfoS("ignoring update event with only status change", "work", evt.ObjectNew.GetName())
}

func (h *PriorityQueueEventHandler) Generic(ctx context.Context, evt event.TypedGenericEvent[client.Object], queue workqueue.TypedRateLimitingInterface[reconcile.Request]) {
h.AddToPriorityQueue(ctx, evt.Object, false)
}

var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter = NewRequeueMultiStageWithExponentialBackoffRateLimiter(
// Allow 1 attempt of fixed delay; this helps give objects a bit of headroom to get available (or have
// diffs reported).
1,
// Use a fixed delay of 5 seconds for the first two attempts.
// Use a fixed delay of 5 seconds for the first attempt.
//
// Important (chenyu1): before the introduction of the requeue rate limiter, the work
// applier uses static requeue intervals, specifically 5 seconds (if the work object is unavailable),
@@ -216,19 +97,24 @@ var defaultRequeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimite

// Reconciler reconciles a Work object.
type Reconciler struct {
hubClient client.Client
workNameSpace string
spokeDynamicClient dynamic.Interface
spokeClient client.Client
restMapper meta.RESTMapper
recorder record.EventRecorder
concurrentReconciles int
watchWorkWithPriorityQueue bool
watchWorkReconcileAgeMinutes int
deletionWaitTime time.Duration
joined *atomic.Bool
parallelizer parallelizerutil.Parallelizer
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
hubClient client.Client
workNameSpace string
spokeDynamicClient dynamic.Interface
spokeClient client.Client
restMapper meta.RESTMapper
recorder record.EventRecorder
concurrentReconciles int
deletionWaitTime time.Duration
joined *atomic.Bool
parallelizer parallelizerutil.Parallelizer
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter
usePriorityQueue bool
workObjAgeForPrioritizedProcessing time.Duration
// The custom priority queue in use if the usePriorityQueue option is enabled.
//
// Note that this variable is set only after the controller starts.
pq priorityqueue.PriorityQueue[reconcile.Request]
pqSetupOnce sync.Once
}

// NewReconciler returns a new Work object reconciler for the work applier.
@@ -239,9 +125,9 @@ func NewReconciler(
concurrentReconciles int,
parallelizer parallelizerutil.Parallelizer,
deletionWaitTime time.Duration,
watchWorkWithPriorityQueue bool,
watchWorkReconcileAgeMinutes int,
requeueRateLimiter *RequeueMultiStageWithExponentialBackoffRateLimiter,
usePriorityQueue bool,
workObjAgeForPrioritizedProcessing time.Duration,
) *Reconciler {
if requeueRateLimiter == nil {
klog.V(2).InfoS("requeue rate limiter is not set; using the default rate limiter")
@@ -252,23 +138,37 @@ func NewReconciler(
parallelizer = parallelizerutil.NewParallelizer(1)
}

woAgeForPrioritizedProcessing := workObjAgeForPrioritizedProcessing
if usePriorityQueue && woAgeForPrioritizedProcessing < minWorkObjAgeForPrioritizedQueueing {
klog.V(2).InfoS("Work object age for prioritized processing is too short; set to the longer default", "workObjAgeForPrioritizedProcessing", woAgeForPrioritizedProcessing)

Review thread on the minimum-age check:

Collaborator: Can you comment on why 30 minutes is the recommended minimum? I would also suggest crashing the controller if the user input is invalid; users might not read the logs and will expect whatever they specify as workObjAgeForPrioritizedProcessing to be respected by the controller.

Collaborator: And this happens during startup, which is a good time to fail fast.

Collaborator (author): Ah, the value currently in use in the main branch is 1 hour, which was set by the user who originally proposed priority queues, as aforementioned (this PR does not change that default). The value has to strike a balance between responsiveness and fairness: if set too high, all requests are assigned high priority, which defeats the purpose of having a priority queue (this mostly affects users who run batch workloads on KubeFleet); if set too low, new Work objects might not get a fair chance to become ready. The proper value is largely user-dependent, which is why this PR only blocks values that are set too low.

Collaborator (author): That said, admittedly we do not have enough empirical data on what a good default value really is at the moment. As for the check itself, I do not have a strong preference (and I fully agree that many people do not actually read logs 😂); the current behavior follows the pattern of the work applier backoff rate limiter, which also overwrites invalid values with defaults and emits an error log.

Collaborator: Got it. I still find it strange that when a user provides an invalid input we quietly set it to the closest valid value rather than telling the user the input is invalid and asking them to try again with a valid one, because it gives the impression that their input is perfectly fine. But I wouldn't block the PR if the team agrees this is best practice.

Contributor: Wei has a point; we normally should not silently change a user input.

woAgeForPrioritizedProcessing = minWorkObjAgeForPrioritizedQueueing
}
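
A fail-fast variant of the check above, as suggested in the review thread, might look roughly like the sketch below. It is hypothetical: this PR keeps the clamp-and-log behavior, and the helper name and error wording here are illustrative only.

```go
// Hypothetical fail-fast validation for the prioritized-processing age.
// Returning an error at startup would surface an invalid flag value to the
// user instead of silently overriding it with the minimum.
func validateWorkObjAgeForPrioritizedProcessing(usePriorityQueue bool, age time.Duration) error {
	if usePriorityQueue && age < minWorkObjAgeForPrioritizedQueueing {
		return fmt.Errorf(
			"workObjAgeForPrioritizedProcessing (%v) is below the minimum allowed value (%v)",
			age, minWorkObjAgeForPrioritizedQueueing)
	}
	return nil
}
```

NewReconciler (or the flag-parsing code in cmd/memberagent/main.go) could then report this error and exit before the manager starts, rather than clamping the value.
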

return &Reconciler{
hubClient: hubClient,
spokeDynamicClient: spokeDynamicClient,
spokeClient: spokeClient,
restMapper: restMapper,
recorder: recorder,
concurrentReconciles: concurrentReconciles,
parallelizer: parallelizer,
watchWorkWithPriorityQueue: watchWorkWithPriorityQueue,
watchWorkReconcileAgeMinutes: watchWorkReconcileAgeMinutes,
workNameSpace: workNameSpace,
joined: atomic.NewBool(false),
deletionWaitTime: deletionWaitTime,
requeueRateLimiter: requeueRateLimiter,
hubClient: hubClient,
spokeDynamicClient: spokeDynamicClient,
spokeClient: spokeClient,
restMapper: restMapper,
recorder: recorder,
concurrentReconciles: concurrentReconciles,
parallelizer: parallelizer,
workNameSpace: workNameSpace,
joined: atomic.NewBool(false),
deletionWaitTime: deletionWaitTime,
requeueRateLimiter: requeueRateLimiter,
usePriorityQueue: usePriorityQueue,
workObjAgeForPrioritizedProcessing: woAgeForPrioritizedProcessing,
}
}

// PriorityQueue returns the priority queue (if any) in use by the reconciler.
//
// Note that the priority queue is only set after the reconciler starts (i.e., the work applier
// has been set up with the controller manager).
func (r *Reconciler) PriorityQueue() priorityqueue.PriorityQueue[reconcile.Request] {
return r.pq
}
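
As a usage illustration, a Gomega-style test could use this accessor to wait for the applier's queue to drain before making assertions. The sketch below is hypothetical: it assumes an applier (workApplier) that was actually set up with a started manager, unlike the test-only appliers above, which never call SetupWithManager and therefore return a nil queue.

```go
// Minimal sketch, Ginkgo/Gomega context assumed. PriorityQueue() returns nil
// until SetupWithManager has run and the controller has built its queue, so
// guard against that before calling Len (inherited from the standard
// workqueue interface that the priority queue embeds).
Eventually(func() int {
	pq := workApplier.PriorityQueue()
	if pq == nil {
		return 0
	}
	return pq.Len()
}).Should(Equal(0), "the work applier queue should eventually drain")
```
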

type ManifestProcessingApplyOrReportDiffResultType string

const (
@@ -728,22 +628,29 @@ func (r *Reconciler) Leave(ctx context.Context) error {

// SetupWithManager wires up the controller.
func (r *Reconciler) SetupWithManager(mgr ctrl.Manager) error {
// Create the priority queue using the rate limiter and a queue name
queue := priorityqueue.New[priorityQueueItem]("apply-work-queue")
if r.usePriorityQueue {
eventHandler := &priorityBasedWorkObjEventHandler{
qm: r,
workObjAgeForPrioritizedProcessing: r.workObjAgeForPrioritizedProcessing,
}

// Create the event handler that uses the priority queue
eventHandler := &PriorityQueueEventHandler{
Queue: queue, // Attach the priority queue to the event handler
Client: r.hubClient,
}
newPQ := func(controllerName string, rateLimiter workqueue.TypedRateLimiter[reconcile.Request]) workqueue.TypedRateLimitingInterface[reconcile.Request] {
withRateLimiterOpt := func(opts *priorityqueue.Opts[reconcile.Request]) {
opts.RateLimiter = rateLimiter
}
r.pqSetupOnce.Do(func() {
r.pq = priorityqueue.New(controllerName, withRateLimiterOpt)
})
return r.pq
}

if r.watchWorkWithPriorityQueue {
workAgeToReconcile = time.Duration(r.watchWorkReconcileAgeMinutes) * time.Minute
return ctrl.NewControllerManagedBy(mgr).Named("work-applier-controller").
WithOptions(ctrloption.Options{
MaxConcurrentReconciles: r.concurrentReconciles,
NewQueue: newPQ,
}).
For(&fleetv1beta1.Work{}).
// Use custom event handler to allow access to the priority queue interface.
Watches(&fleetv1beta1.Work{}, eventHandler).
Complete(r)
}