Skip to content

Commit 878fa65

Browse files
committed
stopping update run implementation
Signed-off-by: Britania Rodriguez Reyes <britaniar@microsoft.com>
1 parent 0552f2e commit 878fa65

File tree

9 files changed

+1131
-46
lines changed

9 files changed

+1131
-46
lines changed

apis/placement/v1beta1/stageupdate_types.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -489,7 +489,8 @@ const (
489489
// StageUpdatingConditionProgressing indicates whether the stage updating is making progress.
490490
// Its condition status can be one of the following:
491491
// - "True": The stage updating is making progress.
492-
// - "False": The stage updating is waiting/pausing.
492+
// - "False": The stage updating is waiting.
493+
// - "Unknown" means it is unknown.
493494
StageUpdatingConditionProgressing StageUpdatingConditionType = "Progressing"
494495

495496
// StageUpdatingConditionSucceeded indicates whether the stage updating is completed successfully.

pkg/controllers/updaterun/controller.go

Lines changed: 114 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
7777
klog.ErrorS(err, "Failed to get updateRun object", "updateRun", req.NamespacedName)
7878
return runtime.Result{}, client.IgnoreNotFound(err)
7979
}
80+
81+
// Update all existing conditions' ObservedGeneration to the current generation
82+
updateAllStatusConditionsGeneration(updateRun.GetUpdateRunStatus(), updateRun.GetGeneration())
83+
8084
runObjRef := klog.KObj(updateRun)
8185

8286
// Remove waitTime from the updateRun status for BeforeStageTask and AfterStageTask for type Approval.
@@ -110,12 +114,9 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
110114
var toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj
111115
updateRunStatus := updateRun.GetUpdateRunStatus()
112116
initCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
113-
// Check if initialized regardless of generation.
114-
// The updateRun spec fields are immutable except for the state field. When the state changes,
115-
// the update run generation increments, but we don't need to reinitialize since initialization is a one-time setup.
116-
if !(initCond != nil && initCond.Status == metav1.ConditionTrue) {
117+
if !condition.IsConditionStatusTrue(initCond, updateRun.GetGeneration()) {
117118
// Check if initialization failed for the current generation.
118-
if initCond != nil && initCond.Status == metav1.ConditionFalse {
119+
if condition.IsConditionStatusFalse(initCond, updateRun.GetGeneration()) {
119120
klog.V(2).InfoS("The updateRun has failed to initialize", "errorMsg", initCond.Message, "updateRun", runObjRef)
120121
return runtime.Result{}, nil
121122
}
@@ -158,9 +159,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
158159
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
159160
}
160161

161-
// Execute the updateRun.
162-
if state == placementv1beta1.StateRun {
163-
klog.V(2).InfoS("Continue to execute the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
162+
switch state {
163+
case placementv1beta1.StateInitialize:
164+
klog.V(2).InfoS("The updateRun is initialized but not executed, waiting to execute", "state", state, "updateRun", runObjRef)
165+
case placementv1beta1.StateRun:
166+
// Execute the updateRun.
167+
klog.InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
164168
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
165169
if errors.Is(execErr, errStagedUpdatedAborted) {
166170
// errStagedUpdatedAborted cannot be retried.
@@ -172,21 +176,41 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
172176
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
173177
}
174178

175-
// The execution is not finished yet or it encounters a retriable error.
176-
// We need to record the status and requeue.
177-
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
178-
return runtime.Result{}, updateErr
179+
return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, execErr, state, runObjRef)
180+
case placementv1beta1.StateStop:
181+
// Stop the updateRun.
182+
klog.V(2).InfoS("Stopping the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
183+
finished, waitTime, stopErr := r.stop(updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
184+
if errors.Is(stopErr, errStagedUpdatedAborted) {
185+
// errStagedUpdatedAborted cannot be retried.
186+
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, stopErr.Error())
179187
}
180-
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
181-
if execErr != nil {
182-
return runtime.Result{}, execErr
188+
189+
if finished {
190+
klog.V(2).InfoS("The updateRun is stopped", "updateRun", runObjRef)
191+
return runtime.Result{}, r.recordUpdateRunStopped(ctx, updateRun)
183192
}
184-
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
193+
194+
return r.handleIncompleteUpdateRun(ctx, updateRun, waitTime, stopErr, state, runObjRef)
185195
}
186-
klog.V(2).InfoS("The updateRun is initialized but not executed, waiting to execute", "state", state, "updateRun", runObjRef)
187196
return runtime.Result{}, nil
188197
}
189198

199+
func (r *Reconciler) handleIncompleteUpdateRun(ctx context.Context, updateRun placementv1beta1.UpdateRunObj, waitTime time.Duration, err error, state placementv1beta1.State, runObjRef klog.ObjectRef) (runtime.Result, error) {
200+
// The execution or stopping is not finished yet or it encounters a retriable error.
201+
// We need to record the status and requeue.
202+
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
203+
return runtime.Result{}, updateErr
204+
}
205+
206+
klog.V(2).InfoS("The updateRun is not finished yet", "state", state, "requeueWaitTime", waitTime, "err", err, "updateRun", runObjRef)
207+
208+
if err != nil {
209+
return runtime.Result{}, err
210+
}
211+
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
212+
}
213+
190214
// handleDelete handles the deletion of the updateRun object.
191215
// We delete all the dependent resources, including approvalRequest objects, of the updateRun object.
192216
func (r *Reconciler) handleDelete(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) (bool, time.Duration, error) {
@@ -277,6 +301,25 @@ func (r *Reconciler) recordUpdateRunFailed(ctx context.Context, updateRun placem
277301
return nil
278302
}
279303

304+
// recordUpdateRunStopped records the progressing condition as stopped in the updateRun status.
305+
func (r *Reconciler) recordUpdateRunStopped(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
306+
updateRunStatus := updateRun.GetUpdateRunStatus()
307+
meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{
308+
Type: string(placementv1beta1.StagedUpdateRunConditionProgressing),
309+
Status: metav1.ConditionFalse,
310+
ObservedGeneration: updateRun.GetGeneration(),
311+
Reason: condition.UpdateRunStoppedReason,
312+
Message: "The update run has been stopped",
313+
})
314+
315+
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
316+
klog.ErrorS(updateErr, "Failed to update the updateRun status as stopped", "updateRun", klog.KObj(updateRun))
317+
// updateErr can be retried.
318+
return controller.NewUpdateIgnoreConflictError(updateErr)
319+
}
320+
return nil
321+
}
322+
280323
// recordUpdateRunStatus records the updateRun status.
281324
func (r *Reconciler) recordUpdateRunStatus(ctx context.Context, updateRun placementv1beta1.UpdateRunObj) error {
282325
if updateErr := r.Client.Status().Update(ctx, updateRun); updateErr != nil {
@@ -484,3 +527,57 @@ func removeWaitTimeFromUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj)
484527
}
485528
}
486529
}
530+
531+
// updateAllStatusConditionsGeneration iterates through all existing conditions in the UpdateRun status
532+
// and updates their ObservedGeneration field to the current UpdateRun generation.
533+
func updateAllStatusConditionsGeneration(updateRunStatus *placementv1beta1.UpdateRunStatus, generation int64) {
534+
// Update main UpdateRun conditions
535+
for i := range updateRunStatus.Conditions {
536+
updateRunStatus.Conditions[i].ObservedGeneration = generation
537+
}
538+
539+
// Update stage-level conditions and nested task conditions if it exists
540+
for i := range updateRunStatus.StagesStatus {
541+
stageStatus := &updateRunStatus.StagesStatus[i]
542+
543+
// Update stage conditions
544+
updateAllStageStatusConditionsGeneration(stageStatus, generation)
545+
}
546+
547+
// Update deletion stage conditions and nested tasks if it exists
548+
if updateRunStatus.DeletionStageStatus != nil {
549+
deletionStageStatus := updateRunStatus.DeletionStageStatus
550+
551+
// Update deletion stage conditions
552+
updateAllStageStatusConditionsGeneration(deletionStageStatus, generation)
553+
}
554+
}
555+
556+
// updateAllStageStatusConditionsGeneration updates all conditions' ObservedGeneration in the given stage status.
557+
func updateAllStageStatusConditionsGeneration(stageStatus *placementv1beta1.StageUpdatingStatus, generation int64) {
558+
// Update stage conditions
559+
for j := range stageStatus.Conditions {
560+
stageStatus.Conditions[j].ObservedGeneration = generation
561+
}
562+
563+
// Update before stage task conditions
564+
for j := range stageStatus.BeforeStageTaskStatus {
565+
for k := range stageStatus.BeforeStageTaskStatus[j].Conditions {
566+
stageStatus.BeforeStageTaskStatus[j].Conditions[k].ObservedGeneration = generation
567+
}
568+
}
569+
570+
// Update after stage task conditions
571+
for j := range stageStatus.AfterStageTaskStatus {
572+
for k := range stageStatus.AfterStageTaskStatus[j].Conditions {
573+
stageStatus.AfterStageTaskStatus[j].Conditions[k].ObservedGeneration = generation
574+
}
575+
}
576+
577+
// Update cluster-level conditions
578+
for j := range stageStatus.Clusters {
579+
for k := range stageStatus.Clusters[j].Conditions {
580+
stageStatus.Clusters[j].Conditions[k].ObservedGeneration = generation
581+
}
582+
}
583+
}

pkg/controllers/updaterun/controller_integration_test.go

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import (
2626
"github.com/google/go-cmp/cmp"
2727
. "github.com/onsi/ginkgo/v2"
2828
. "github.com/onsi/gomega"
29+
"github.com/prometheus/client_golang/prometheus"
2930
prometheusclientmodel "github.com/prometheus/client_model/go"
3031

3132
corev1 "k8s.io/api/core/v1"
@@ -332,6 +333,26 @@ func generateFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *p
332333
}
333334
}
334335

336+
func generateStoppingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
337+
return &prometheusclientmodel.Metric{
338+
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
339+
string(metav1.ConditionUnknown), condition.UpdateRunStoppingReason),
340+
Gauge: &prometheusclientmodel.Gauge{
341+
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
342+
},
343+
}
344+
}
345+
346+
func generateStoppedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
347+
return &prometheusclientmodel.Metric{
348+
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
349+
string(metav1.ConditionFalse), condition.UpdateRunStoppedReason),
350+
Gauge: &prometheusclientmodel.Gauge{
351+
Value: ptr.To(float64(time.Now().UnixNano()) / 1e9),
352+
},
353+
}
354+
}
355+
335356
func generateSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
336357
return &prometheusclientmodel.Metric{
337358
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionSucceeded),
@@ -342,6 +363,24 @@ func generateSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun)
342363
}
343364
}
344365

366+
func labelPairsToMap(pairs []*prometheusclientmodel.LabelPair) prometheus.Labels {
367+
m := prometheus.Labels{}
368+
for _, p := range pairs {
369+
m[p.GetName()] = p.GetValue()
370+
}
371+
return m
372+
}
373+
374+
func removeMetricFromMetricList(metricList []*prometheusclientmodel.Metric, metricToRemove *prometheusclientmodel.Metric) []*prometheusclientmodel.Metric {
375+
var result []*prometheusclientmodel.Metric
376+
for _, metric := range metricList {
377+
if !cmp.Equal(labelPairsToMap(metric.Label), labelPairsToMap(metricToRemove.Label)) {
378+
result = append(result, metric)
379+
}
380+
}
381+
return result
382+
}
383+
345384
func generateTestClusterStagedUpdateRun() *placementv1beta1.ClusterStagedUpdateRun {
346385
return &placementv1beta1.ClusterStagedUpdateRun{
347386
ObjectMeta: metav1.ObjectMeta{
@@ -823,3 +862,18 @@ func generateFalseProgressingCondition(obj client.Object, condType any, reason s
823862
falseCond.Reason = reason
824863
return falseCond
825864
}
865+
866+
func generateFalseConditionWithReason(obj client.Object, condType any, reason string) metav1.Condition {
867+
falseCond := generateFalseCondition(obj, condType)
868+
falseCond.Reason = reason
869+
return falseCond
870+
}
871+
872+
func generateProgressingUnknownConditionWithReason(obj client.Object, reason string) metav1.Condition {
873+
return metav1.Condition{
874+
Status: metav1.ConditionUnknown,
875+
Type: string(placementv1beta1.StageUpdatingConditionProgressing),
876+
ObservedGeneration: obj.GetGeneration(),
877+
Reason: reason,
878+
}
879+
}

pkg/controllers/updaterun/execution.go

Lines changed: 14 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ package updaterun
1818

1919
import (
2020
"context"
21-
"errors"
2221
"fmt"
2322
"reflect"
2423
"strconv"
@@ -62,20 +61,13 @@ func (r *Reconciler) execute(
6261
updateRun placementv1beta1.UpdateRunObj,
6362
updatingStageIndex int,
6463
toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj,
65-
) (finished bool, waitTime time.Duration, err error) {
64+
) (finished bool, waitTime time.Duration, execErr error) {
6665
updateRunStatus := updateRun.GetUpdateRunStatus()
6766
var updatingStageStatus *placementv1beta1.StageUpdatingStatus
6867

6968
// Set up defer function to handle errStagedUpdatedAborted.
7069
defer func() {
71-
if errors.Is(err, errStagedUpdatedAborted) {
72-
if updatingStageStatus != nil {
73-
markStageUpdatingFailed(updatingStageStatus, updateRun.GetGeneration(), err.Error())
74-
} else {
75-
// Handle deletion stage case.
76-
markStageUpdatingFailed(updateRunStatus.DeletionStageStatus, updateRun.GetGeneration(), err.Error())
77-
}
78-
}
70+
checkIfErrorStagedUpdateAborted(execErr, updateRun, updatingStageStatus)
7971
}()
8072

8173
// Mark updateRun as progressing if it's not already marked as waiting or stuck.
@@ -84,9 +76,9 @@ func (r *Reconciler) execute(
8476
markUpdateRunProgressingIfNotWaitingOrStuck(updateRun)
8577
if updatingStageIndex < len(updateRunStatus.StagesStatus) {
8678
updatingStageStatus = &updateRunStatus.StagesStatus[updatingStageIndex]
87-
approved, err := r.checkBeforeStageTasksStatus(ctx, updatingStageIndex, updateRun)
88-
if err != nil {
89-
return false, 0, err
79+
approved, execErr := r.checkBeforeStageTasksStatus(ctx, updatingStageIndex, updateRun)
80+
if execErr != nil {
81+
return false, 0, execErr
9082
}
9183
if !approved {
9284
markStageUpdatingWaiting(updatingStageStatus, updateRun.GetGeneration(), "Not all before-stage tasks are completed, waiting for approval")
@@ -97,13 +89,13 @@ func (r *Reconciler) execute(
9789
if err != nil {
9890
return false, 0, err
9991
}
100-
waitTime, err = r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency)
92+
waitTime, execErr = r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency)
10193
// The execution has not finished yet.
102-
return false, waitTime, err
94+
return false, waitTime, execErr
10395
}
10496
// All the stages have finished, now start the delete stage.
105-
finished, err = r.executeDeleteStage(ctx, updateRun, toBeDeletedBindings)
106-
return finished, clusterUpdatingWaitTime, err
97+
finished, execErr = r.executeDeleteStage(ctx, updateRun, toBeDeletedBindings)
98+
return finished, clusterUpdatingWaitTime, execErr
10799
}
108100

109101
// checkBeforeStageTasksStatus checks if the before stage tasks have finished.
@@ -232,9 +224,7 @@ func (r *Reconciler) executeUpdatingStage(
232224
}
233225
}
234226
markClusterUpdatingStarted(clusterStatus, updateRun.GetGeneration())
235-
if finishedClusterCount == 0 {
236-
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
237-
}
227+
markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
238228
// Need to continue as we need to process at most maxConcurrency number of clusters in parallel.
239229
continue
240230
}
@@ -564,6 +554,9 @@ func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stag
564554
func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string) {
565555
if len(stuckClusterNames) > 0 {
566556
markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", "))
557+
} else if updateRun.GetUpdateRunSpec().State == placementv1beta1.StateStop {
558+
// If there is no stuck cluster and the update run is stopping, mark the update run as stopped.
559+
markUpdateRunStopping(updateRun)
567560
} else {
568561
// If there is no stuck cluster but some progress has been made, mark the update run as progressing.
569562
markUpdateRunProgressing(updateRun)
@@ -672,7 +665,7 @@ func markUpdateRunProgressing(updateRun placementv1beta1.UpdateRunObj) {
672665
})
673666
}
674667

675-
// markUpdateRunProgressingIfNotWaitingOrStuck marks the update run as proegressing in memory if it's not marked as waiting or stuck already.
668+
// markUpdateRunProgressingIfNotWaitingOrStuck marks the update run as progressing in memory if it's not marked as waiting or stuck already.
676669
func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun placementv1beta1.UpdateRunObj) {
677670
updateRunStatus := updateRun.GetUpdateRunStatus()
678671
progressingCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionProgressing))

0 commit comments

Comments
 (0)