Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions apis/placement/v1beta1/stageupdate_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,7 @@ const (
// Its condition status can be one of the following:
// - "True": The staged update run is making progress.
// - "False": The staged update run is waiting/paused/abandoned.
// - "Unknown" means it is unknown.
// - "Unknown": The staged update run is in a transitioning state.
StagedUpdateRunConditionProgressing StagedUpdateRunConditionType = "Progressing"

// StagedUpdateRunConditionSucceeded indicates whether the staged update run is completed successfully.
Expand Down Expand Up @@ -489,7 +489,8 @@ const (
// StageUpdatingConditionProgressing indicates whether the stage updating is making progress.
// Its condition status can be one of the following:
// - "True": The stage updating is making progress.
// - "False": The stage updating is waiting/pausing.
// - "False": The stage updating is waiting.
// - "Unknown": The stage updating is in a transitioning state.
StageUpdatingConditionProgressing StageUpdatingConditionType = "Progressing"

// StageUpdatingConditionSucceeded indicates whether the stage updating is completed successfully.
Expand Down
46 changes: 31 additions & 15 deletions pkg/controllers/updaterun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
klog.V(2).InfoS("The updateRun is initialized but not executed, waiting to execute", "state", state, "updateRun", runObjRef)
case placementv1beta1.StateRun:
// Execute the updateRun.
klog.InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
klog.V(2).InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
Expand All @@ -176,23 +176,23 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
}

// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
}
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, execErr, state, runObjRef)
case placementv1beta1.StateStop:
// Stop the updateRun.
klog.InfoS("Stopping the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
// TODO(britaniar): Implement the stopping logic for in-progress stages.
klog.V(2).InfoS("Stopping the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, stopErr := r.stop(updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(stopErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, stopErr.Error())
}

if finished {
klog.V(2).InfoS("The updateRun is stopped", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunStopped(ctx, updateRun)
}

return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, stopErr, state, runObjRef)

klog.V(2).InfoS("The updateRun is stopped", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunStopped(ctx, updateRun)
default:
// Initialize, Run, or Stop are the only supported states.
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("found unsupported updateRun state: %s", state))
Expand All @@ -202,6 +202,22 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
return runtime.Result{}, nil
}

// handleIncompleteUpdateRun records the latest updateRun status and decides how to
// retry an execution or stop pass that has not finished yet. It returns the retriable
// error when one occurred; otherwise it requeues the reconcile after waitTime.
func (r *Reconciler) handleIncompleteUpdateRun(ctx context.Context, updateRun placementv1beta1.UpdateRunObj, waitTime time.Duration, err error, state placementv1beta1.State, runObjRef klog.ObjectRef) (runtime.Result, error) {
	// Persist the in-progress status first so the recorded state is fresh before any retry.
	updateErr := r.recordUpdateRunStatus(ctx, updateRun)
	if updateErr != nil {
		return runtime.Result{}, updateErr
	}

	klog.V(2).InfoS("The updateRun is not finished yet", "state", state, "requeueWaitTime", waitTime, "err", err, "updateRun", runObjRef)

	// A non-nil err here is retriable (aborts were already handled by the caller);
	// surface it so the controller runtime applies its backoff.
	if err != nil {
		return runtime.Result{}, err
	}

	// No error: schedule the next pass after the computed wait time.
	return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
}

// handleDelete handles the deletion of the updateRun object.
// We delete all the dependent resources, including approvalRequest objects, of the updateRun object.
func (r *Reconciler) handleDelete(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) (bool, time.Duration, error) {
Expand Down
19 changes: 19 additions & 0 deletions pkg/controllers/updaterun/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,16 @@ func generateFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *p
}
}

// generateStoppingMetric builds the expected Prometheus metric for an updateRun in the
// stopping state: the Progressing condition with Unknown status and the
// UpdateRunStopping reason, with the gauge set to the current unix time in seconds.
func generateStoppingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	labels := generateMetricsLabels(
		updateRun,
		string(placementv1beta1.StagedUpdateRunConditionProgressing),
		string(metav1.ConditionUnknown),
		condition.UpdateRunStoppingReason,
	)
	// Gauge value is the wall-clock timestamp, converted from nanoseconds to seconds.
	nowSeconds := float64(time.Now().UnixNano()) / 1e9
	return &prometheusclientmodel.Metric{
		Label: labels,
		Gauge: &prometheusclientmodel.Gauge{
			Value: ptr.To(nowSeconds),
		},
	}
}

func generateStoppedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
Expand Down Expand Up @@ -858,3 +868,12 @@ func generateFalseConditionWithReason(obj client.Object, condType any, reason st
falseCond.Reason = reason
return falseCond
}

// generateProgressingUnknownConditionWithReason returns a StageUpdating Progressing
// condition with Unknown status, the supplied reason, and the object's current
// generation as the observed generation.
func generateProgressingUnknownConditionWithReason(obj client.Object, reason string) metav1.Condition {
	cond := metav1.Condition{
		Type:               string(placementv1beta1.StageUpdatingConditionProgressing),
		Status:             metav1.ConditionUnknown,
		Reason:             reason,
		ObservedGeneration: obj.GetGeneration(),
	}
	return cond
}
22 changes: 6 additions & 16 deletions pkg/controllers/updaterun/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package updaterun

import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
Expand Down Expand Up @@ -68,14 +67,7 @@ func (r *Reconciler) execute(

// Set up defer function to handle errStagedUpdatedAborted.
defer func() {
if errors.Is(err, errStagedUpdatedAborted) {
if updatingStageStatus != nil {
markStageUpdatingFailed(updatingStageStatus, updateRun.GetGeneration(), err.Error())
} else {
// Handle deletion stage case.
markStageUpdatingFailed(updateRunStatus.DeletionStageStatus, updateRun.GetGeneration(), err.Error())
}
}
checkIfErrorStagedUpdateAborted(err, updateRun, updatingStageStatus)
}()

// Mark updateRun as progressing if it's not already marked as waiting or stuck.
Expand Down Expand Up @@ -232,9 +224,7 @@ func (r *Reconciler) executeUpdatingStage(
}
}
markClusterUpdatingStarted(clusterStatus, updateRun.GetGeneration())
if finishedClusterCount == 0 {
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
}
markStageUpdatingProgressStarted(updatingStageStatus, updateRun.GetGeneration())
// Need to continue as we need to process at most maxConcurrency number of clusters in parallel.
continue
}
Expand Down Expand Up @@ -338,7 +328,7 @@ func (r *Reconciler) executeDeleteStage(
existingDeleteStageClusterMap[existingDeleteStageStatus.Clusters[i].ClusterName] = &existingDeleteStageStatus.Clusters[i]
}
// Mark the delete stage as started in case it's not.
markStageUpdatingStarted(updateRunStatus.DeletionStageStatus, updateRun.GetGeneration())
markStageUpdatingProgressStarted(updateRunStatus.DeletionStageStatus, updateRun.GetGeneration())
for _, binding := range toBeDeletedBindings {
bindingSpec := binding.GetBindingSpec()
curCluster, exist := existingDeleteStageClusterMap[bindingSpec.TargetCluster]
Expand Down Expand Up @@ -564,7 +554,7 @@ func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stag
func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string) {
if len(stuckClusterNames) > 0 {
markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", "))
} else {
} else if updateRun.GetUpdateRunSpec().State == placementv1beta1.StateRun {
// If there is no stuck cluster but some progress has been made, mark the update run as progressing.
markUpdateRunProgressing(updateRun)
}
Expand Down Expand Up @@ -708,8 +698,8 @@ func markUpdateRunWaiting(updateRun placementv1beta1.UpdateRunObj, message strin
})
}

// markStageUpdatingStarted marks the stage updating status as started in memory.
func markStageUpdatingStarted(stageUpdatingStatus *placementv1beta1.StageUpdatingStatus, generation int64) {
// markStageUpdatingProgressStarted marks the stage updating status as started in memory.
func markStageUpdatingProgressStarted(stageUpdatingStatus *placementv1beta1.StageUpdatingStatus, generation int64) {
if stageUpdatingStatus.StartTime == nil {
stageUpdatingStatus.StartTime = &metav1.Time{Time: time.Now()}
}
Expand Down
Loading
Loading