5 changes: 3 additions & 2 deletions apis/placement/v1beta1/stageupdate_types.go
@@ -427,7 +427,7 @@ const (
// Its condition status can be one of the following:
// - "True": The staged update run is making progress.
// - "False": The staged update run is waiting/paused/abandoned.
// - "Unknown" means it is unknown.
// - "Unknown": The staged update run is in the process of stopping.
StagedUpdateRunConditionProgressing StagedUpdateRunConditionType = "Progressing"

// StagedUpdateRunConditionSucceeded indicates whether the staged update run is completed successfully.
@@ -489,7 +489,8 @@ const (
// StageUpdatingConditionProgressing indicates whether the stage updating is making progress.
// Its condition status can be one of the following:
// - "True": The stage updating is making progress.
// - "False": The stage updating is waiting/pausing.
// - "False": The stage updating is waiting.
// - "Unknown": The staged update run is in the process of stopping.
StageUpdatingConditionProgressing StageUpdatingConditionType = "Progressing"

// StageUpdatingConditionSucceeded indicates whether the stage updating is completed successfully.
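The comment changes above give the Progressing condition a three-way meaning: `True` means running, `False` means waiting/paused/abandoned, and the new `Unknown` means the run is stopping. As a consumer-side illustration only (not code from this PR), here is a minimal sketch of branching on those statuses; it assumes nothing beyond a standard `[]metav1.Condition` slice and uses the condition type's string value directly.

```go
package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// describeProgress maps the Progressing condition's status to the meaning
// documented in stageupdate_types.go above.
func describeProgress(conditions []metav1.Condition) string {
	// "Progressing" is the string value of StagedUpdateRunConditionProgressing.
	cond := meta.FindStatusCondition(conditions, "Progressing")
	if cond == nil {
		return "progressing condition not reported yet"
	}
	switch cond.Status {
	case metav1.ConditionTrue:
		return "the staged update run is making progress"
	case metav1.ConditionFalse:
		return "the staged update run is waiting, paused, or abandoned"
	default: // metav1.ConditionUnknown, the new stopping state
		return "the staged update run is in the process of stopping"
	}
}

func main() {
	conds := []metav1.Condition{{Type: "Progressing", Status: metav1.ConditionUnknown, Reason: "UpdateRunStopping"}}
	fmt.Println(describeProgress(conds))
}
```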
45 changes: 30 additions & 15 deletions pkg/controllers/updaterun/controller.go
@@ -164,7 +164,7 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
klog.V(2).InfoS("The updateRun is initialized but not executed, waiting to execute", "state", state, "updateRun", runObjRef)
case placementv1beta1.StateRun:
// Execute the updateRun.
klog.InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
klog.V(2).InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
@@ -176,23 +176,23 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
}

// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
}
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, execErr, state, runObjRef)
case placementv1beta1.StateStop:
// Stop the updateRun.
klog.InfoS("Stopping the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
// TODO(britaniar): Implement the stopping logic for in-progress stages.
klog.V(2).InfoS("Stopping the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, stopErr := r.stop(updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(stopErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, stopErr.Error())
}

if finished {
klog.V(2).InfoS("The updateRun is stopped", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunStopped(ctx, updateRun)
}

return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, stopErr, state, runObjRef)

klog.V(2).InfoS("The updateRun is stopped", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunStopped(ctx, updateRun)
default:
// Initialize, Run, or Stop are the only supported states.
unexpectedErr := controller.NewUnexpectedBehaviorError(fmt.Errorf("found unsupported updateRun state: %s", state))
@@ -202,6 +202,21 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
return runtime.Result{}, nil
}

func (r *Reconciler) handleIncompleteUpdateRun(ctx context.Context, updateRun placementv1beta1.UpdateRunObj, waitTime time.Duration, err error, state placementv1beta1.State, runObjRef klog.ObjectRef) (runtime.Result, error) {
// The execution or stopping is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}

klog.V(2).InfoS("The updateRun is not finished yet", "state", state, "requeueWaitTime", waitTime, "err", err, "updateRun", runObjRef)

if err != nil {
return runtime.Result{}, err
}
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
}

// handleDelete handles the deletion of the updateRun object.
// We delete all the dependent resources, including approvalRequest objects, of the updateRun object.
func (r *Reconciler) handleDelete(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) (bool, time.Duration, error) {
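The Stop branch above is driven by the spec-level state, and the new handleIncompleteUpdateRun helper consolidates status recording and requeueing for both the Run and Stop paths. Purely to illustrate how this path gets exercised (not code from this PR), here is a hedged sketch of flipping an existing run into the Stop state with controller-runtime. The `Spec.State` field and the `StateStop` constant come from the call sites in this diff; the module import path and the cluster-scoped Get are assumptions.

```go
package example

import (
	"context"

	"sigs.k8s.io/controller-runtime/pkg/client"

	// Module path is an assumption; adjust to the repository's actual module.
	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
)

// stopUpdateRun patches an existing ClusterStagedUpdateRun so that its spec
// state becomes Stop, which makes Reconcile take the stopping branch above.
func stopUpdateRun(ctx context.Context, c client.Client, name string) error {
	var run placementv1beta1.ClusterStagedUpdateRun
	if err := c.Get(ctx, client.ObjectKey{Name: name}, &run); err != nil {
		return err
	}
	patch := client.MergeFrom(run.DeepCopy())
	run.Spec.State = placementv1beta1.StateStop
	return c.Patch(ctx, &run, patch)
}
```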
19 changes: 19 additions & 0 deletions pkg/controllers/updaterun/controller_integration_test.go
@@ -333,6 +333,16 @@ func generateFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *p
}
}

func generateStoppingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
string(metav1.ConditionUnknown), condition.UpdateRunStoppingReason),
Gauge: &prometheusclientmodel.Gauge{
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
},
}
}

func generateStoppedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
@@ -858,3 +868,12 @@ func generateFalseConditionWithReason(obj client.Object, condType any, reason st
falseCond.Reason = reason
return falseCond
}

func generateProgressingUnknownConditionWithReason(obj client.Object, reason string) metav1.Condition {
return metav1.Condition{
Status: metav1.ConditionUnknown,
Type: string(placementv1beta1.StageUpdatingConditionProgressing),
ObservedGeneration: obj.GetGeneration(),
Reason: reason,
}
}
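The new helpers above build the expected stopping metric and the Progressing=Unknown condition for test assertions. As a rough illustration of how such a condition is typically compared (not code from this PR), here is a small helper that ignores the fields the controller fills in on transition; the reason literal stands in for condition.UpdateRunStoppingReason and is an assumption.

```go
package updaterun

import (
	"testing"

	"github.com/google/go-cmp/cmp"
	"github.com/google/go-cmp/cmp/cmpopts"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// assertStoppingCondition checks that a stage (or run) reports the new
// Progressing=Unknown condition while stopping, ignoring timestamp and message.
func assertStoppingCondition(t *testing.T, got metav1.Condition, generation int64) {
	t.Helper()
	want := metav1.Condition{
		Type:               "Progressing", // string(placementv1beta1.StageUpdatingConditionProgressing)
		Status:             metav1.ConditionUnknown,
		ObservedGeneration: generation,
		Reason:             "UpdateRunStopping", // assumed literal behind condition.UpdateRunStoppingReason
	}
	if diff := cmp.Diff(want, got, cmpopts.IgnoreFields(metav1.Condition{}, "LastTransitionTime", "Message")); diff != "" {
		t.Errorf("stopping condition mismatch (-want +got):\n%s", diff)
	}
}
```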
17 changes: 5 additions & 12 deletions pkg/controllers/updaterun/execution.go
@@ -18,7 +18,6 @@ package updaterun

import (
"context"
"errors"
"fmt"
"reflect"
"strconv"
@@ -68,14 +67,7 @@ func (r *Reconciler) execute(

// Set up defer function to handle errStagedUpdatedAborted.
defer func() {
if errors.Is(err, errStagedUpdatedAborted) {
if updatingStageStatus != nil {
markStageUpdatingFailed(updatingStageStatus, updateRun.GetGeneration(), err.Error())
} else {
// Handle deletion stage case.
markStageUpdatingFailed(updateRunStatus.DeletionStageStatus, updateRun.GetGeneration(), err.Error())
}
}
checkIfErrorStagedUpdateAborted(err, updateRun, updatingStageStatus)
}()

// Mark updateRun as progressing if it's not already marked as waiting or stuck.
@@ -232,9 +224,7 @@ func (r *Reconciler) executeUpdatingStage(
}
}
markClusterUpdatingStarted(clusterStatus, updateRun.GetGeneration())
if finishedClusterCount == 0 {
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
}
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
// Need to continue as we need to process at most maxConcurrency number of clusters in parallel.
continue
}
@@ -564,6 +554,9 @@ func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stag
func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string) {
if len(stuckClusterNames) > 0 {
markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", "))
} else if updateRun.GetUpdateRunSpec().State == placementv1beta1.StateStop {
// If there is no stuck cluster and the update run state is stop, mark the update run as stopping.
markUpdateRunStopping(updateRun)
} else {
// If there is no stuck cluster but some progress has been made, mark the update run as progressing.
markUpdateRunProgressing(updateRun)
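Two helpers referenced in this file, markUpdateRunStopping and checkIfErrorStagedUpdateAborted, are not shown in the hunks above. The following is a hedged sketch of what they plausibly look like, inferred from their call sites and from the inline defer block that checkIfErrorStagedUpdateAborted replaces; the accessor names, the StageUpdatingStatus type, the import path, and the reason literal are assumptions, and the real implementations in this PR may differ.

```go
// Sketch only; assumes it lives in the updaterun package next to the
// markStageUpdatingFailed helper and errStagedUpdatedAborted sentinel shown
// elsewhere in this diff.
package updaterun

import (
	"errors"

	"k8s.io/apimachinery/pkg/api/meta"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

	// Import path is an assumption about the repository's module layout.
	placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
)

// markUpdateRunStopping records Progressing=Unknown on the update run while it
// winds down, matching the API comments and the stopping metric in the tests.
func markUpdateRunStopping(updateRun placementv1beta1.UpdateRunObj) {
	meta.SetStatusCondition(&updateRun.GetUpdateRunStatus().Conditions, metav1.Condition{
		Type:               string(placementv1beta1.StagedUpdateRunConditionProgressing),
		Status:             metav1.ConditionUnknown,
		ObservedGeneration: updateRun.GetGeneration(),
		Reason:             "UpdateRunStopping", // assumed value of condition.UpdateRunStoppingReason
		Message:            "The staged update run is in the process of stopping",
	})
}

// checkIfErrorStagedUpdateAborted re-packages the defer logic that used to live
// inline in execute: on an aborted run, fail the in-flight stage, or the
// deletion stage when no regular stage is updating.
func checkIfErrorStagedUpdateAborted(err error, updateRun placementv1beta1.UpdateRunObj, updatingStageStatus *placementv1beta1.StageUpdatingStatus) {
	if !errors.Is(err, errStagedUpdatedAborted) {
		return
	}
	if updatingStageStatus != nil {
		markStageUpdatingFailed(updatingStageStatus, updateRun.GetGeneration(), err.Error())
		return
	}
	markStageUpdatingFailed(updateRun.GetUpdateRunStatus().DeletionStageStatus, updateRun.GetGeneration(), err.Error())
}
```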