From d01e8ff7f0e0647bcfe2fc4e4b1d7d7dabe718e3 Mon Sep 17 00:00:00 2001 From: Harold Cheng Date: Thu, 29 May 2025 15:19:19 +0800 Subject: [PATCH 1/2] chore: add an actionset api to delay postReady job after cluster running --- .../dataprotection/restore_controller.go | 31 +++++++++- .../dataprotection/restore_controller_test.go | 56 ++++++++++++++++++- pkg/constant/annotations.go | 8 +++ pkg/dataprotection/restore/types.go | 1 + 4 files changed, 93 insertions(+), 3 deletions(-) diff --git a/controllers/dataprotection/restore_controller.go b/controllers/dataprotection/restore_controller.go index d137d6f43c9..82bcd7ab36d 100644 --- a/controllers/dataprotection/restore_controller.go +++ b/controllers/dataprotection/restore_controller.go @@ -41,6 +41,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/reconcile" + kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1" "github.com/apecloud/kubeblocks/pkg/constant" intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil" @@ -468,6 +469,28 @@ func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx, return isCompleted, err } + checkClusterRunning := func() (pass bool, err error) { + v, ok := backupSet.ActionSet.Annotations[constant.DoReadyRestoreAfterClusterRunningAnnotationKey] + if !ok || v != "true" { + return true, nil + } + cluster := &kbappsv1.Cluster{} + clusterName, ok := restoreMgr.Restore.Labels[constant.AppInstanceLabelKey] + if !ok { + reqCtx.Log.V(2).Info("restore cr missing AppInstanceLabel") + return true, nil + } + if err := r.Client.Get(reqCtx.Ctx, client.ObjectKey{Name: clusterName, Namespace: restoreMgr.Restore.Namespace}, cluster); err != nil { + return false, err + } + if cluster.Status.Phase != kbappsv1.RunningClusterPhase { + reqCtx.Recorder.Event(restoreMgr.Restore, corev1.EventTypeWarning, dprestore.ReasonWaitForClusterRunning, "wait for cluster entering 
running phase") + return false, nil + } + return true, nil + } + + // 2. build jobs var jobs []*batchv1.Job switch stage { case dpv1alpha1.PrepareData: @@ -478,7 +501,13 @@ func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx, } jobs, err = restoreMgr.BuildPrepareDataJobs(reqCtx, r.Client, backupSet, target, actionName) case dpv1alpha1.PostReady: - // 2. build jobs for postReady action + // check if need to delay job creation until cluster running + var pass bool + pass, err = checkClusterRunning() + if err != nil || !pass { + return false, err + } + jobs, err = restoreMgr.BuildPostReadyActionJobs(reqCtx, r.Client, backupSet, target, step) } if err != nil { diff --git a/controllers/dataprotection/restore_controller_test.go b/controllers/dataprotection/restore_controller_test.go index 9bfaf1fd553..015c05385ed 100644 --- a/controllers/dataprotection/restore_controller_test.go +++ b/controllers/dataprotection/restore_controller_test.go @@ -35,6 +35,7 @@ import ( testclocks "k8s.io/utils/clock/testing" "sigs.k8s.io/controller-runtime/pkg/client" + kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1" dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1" "github.com/apecloud/kubeblocks/pkg/constant" dprestore "github.com/apecloud/kubeblocks/pkg/dataprotection/restore" @@ -483,10 +484,10 @@ var _ = Describe("Restore Controller test", func() { }) Context("test postReady stage", func() { - var _ *testdp.BackupClusterInfo + var backupClusterInfo *testdp.BackupClusterInfo BeforeEach(func() { By("fake a new cluster") - _ = testdp.NewFakeCluster(&testCtx) + backupClusterInfo = testdp.NewFakeCluster(&testCtx) }) It("test post ready actions", func() { @@ -580,6 +581,7 @@ var _ = Describe("Restore Controller test", func() { })).Should(Succeed()) }) + It("test parameters env", func() { By("set schema and parameters in actionSet") testdp.MockActionSetWithSchema(&testCtx, actionSet) @@ -602,6 +604,56 @@ var _ = Describe("Restore 
Controller test", func() { By("expect parameters env in restore jobs") checkJobParametersEnv(restore) }) + + It("respects DoReadyRestoreAfterClusterRunning annotation", func() { + By("set annotation for actionset") + Expect(testapps.ChangeObj(&testCtx, actionSet, func(set *dpv1alpha1.ActionSet) { + set.Spec.Restore.PrepareData = nil + if set.Annotations == nil { + set.Annotations = make(map[string]string) + } + set.Annotations[constant.DoReadyRestoreAfterClusterRunningAnnotationKey] = "true" + })).Should(Succeed()) + + By("create restore cr") + matchLabels := map[string]string{ + constant.AppInstanceLabelKey: testdp.ClusterName, + } + restore := initResourcesAndWaitRestore(true, false, false, "", dpv1alpha1.RestorePhaseRunning, + func(f *testdp.MockRestoreFactory) { + f. + SetConnectCredential(testdp.ClusterName). + SetJobActionConfig(matchLabels). + SetExecActionConfig(matchLabels). + SetLabels(matchLabels) + }, nil) + + By("check event fired") + Eventually(func() bool { + eventList := &corev1.EventList{} + err := k8sClient.List(ctx, eventList, client.InNamespace(backupClusterInfo.Cluster.Namespace)) + if err != nil { + return false + } + for _, e := range eventList.Items { + if e.Reason == dprestore.ReasonWaitForClusterRunning && e.InvolvedObject.Name == restore.Name { + return true + } + } + return false + + }).Should(BeTrue()) + + By("mock cluster running") + Expect(testapps.ChangeObjStatus(&testCtx, backupClusterInfo.Cluster, func() { + backupClusterInfo.Cluster.Status.Phase = kbappsv1.RunningClusterPhase + })).Should(Succeed()) + + By("check job created") + Eventually(testapps.List(&testCtx, generics.JobSignature, + client.MatchingLabels{dprestore.DataProtectionRestoreLabelKey: restore.Name}, + client.InNamespace(testCtx.DefaultNamespace))).Should(HaveLen(2)) + }) }) Context("test cross namespace", func() { diff --git a/pkg/constant/annotations.go b/pkg/constant/annotations.go index 9f22ea3dfab..21dea07ae80 100644 --- a/pkg/constant/annotations.go +++ 
b/pkg/constant/annotations.go @@ -59,6 +59,14 @@ const ( MultiClusterServicePlacementKey = "apps.kubeblocks.io/multi-cluster-service-placement" ) +// annotations for data protection +const ( + // DoReadyRestoreAfterClusterRunningAnnotationKey is an experimental API to delay the postReady restore job until the cluster is running. + // It should be set to "true" in the ActionSet CR. + // This API may later be added to the action spec, replacing the old API carried in the cluster restore annotation (kubeblocks.io/restore-from-backup). + DoReadyRestoreAfterClusterRunningAnnotationKey = "dataprotection.kubeblocks.io/do-ready-restore-after-cluster-running" +) + func InheritedAnnotations() []string { return []string{ RestoreFromBackupAnnotationKey, diff --git a/pkg/dataprotection/restore/types.go b/pkg/dataprotection/restore/types.go index 46bc407e343..116a1c2ffdd 100644 --- a/pkg/dataprotection/restore/types.go +++ b/pkg/dataprotection/restore/types.go @@ -41,6 +41,7 @@ const ( ReasonProcessing = "Processing" ReasonFailed = "Failed" ReasonSucceed = "Succeed" + ReasonWaitForClusterRunning = "WaitForClusterRunning" reasonCreateRestoreJob = "CreateRestoreJob" reasonCreateRestorePVC = "CreateRestorePVC" ) From 217b0c3976dc343cc93d30d3768b6237ce623c21 Mon Sep 17 00:00:00 2001 From: Harold Cheng Date: Thu, 29 May 2025 15:46:04 +0800 Subject: [PATCH 2/2] requeue requests --- controllers/dataprotection/restore_controller.go | 15 +++++++++++---- controllers/dataprotection/suite_test.go | 1 + 2 files changed, 12 insertions(+), 4 deletions(-) diff --git a/controllers/dataprotection/restore_controller.go b/controllers/dataprotection/restore_controller.go index 82bcd7ab36d..4a54fab4d6b 100644 --- a/controllers/dataprotection/restore_controller.go +++ b/controllers/dataprotection/restore_controller.go @@ -21,6 +21,7 @@ package dataprotection import ( "context" + "errors" "fmt" "reflect" "time" @@ -317,8 +318,13 @@ func (r *RestoreReconciler) handleRunningPhase(reqCtx intctrlutil.RequestCtx, re err =
r.Client.Status().Patch(reqCtx.Ctx, restoreMgr.Restore, client.MergeFrom(restoreMgr.OriginalRestore)) } if err != nil { - r.Recorder.Event(restore, corev1.EventTypeWarning, corev1.EventTypeWarning, err.Error()) - return intctrlutil.RequeueWithError(err, reqCtx.Log, "") + if errors.Is(err, ErrWaitClusterRunning) { + r.Recorder.Event(restore, corev1.EventTypeWarning, dprestore.ReasonWaitForClusterRunning, err.Error()) + return intctrlutil.Requeue(reqCtx.Log, err.Error()) + } else { + r.Recorder.Event(restore, corev1.EventTypeWarning, corev1.EventTypeWarning, err.Error()) + return intctrlutil.RequeueWithError(err, reqCtx.Log, "") + } } return intctrlutil.Reconciled() } @@ -436,6 +442,8 @@ func (r *RestoreReconciler) postReady(reqCtx intctrlutil.RequestCtx, restoreMgr return true, nil } +var ErrWaitClusterRunning = errors.New("wait for cluster entering running phase") + func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx, restoreMgr *dprestore.RestoreManager, backupSet dprestore.BackupActionSet, @@ -484,8 +492,7 @@ func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx, return false, err } if cluster.Status.Phase != kbappsv1.RunningClusterPhase { - reqCtx.Recorder.Event(restoreMgr.Restore, corev1.EventTypeWarning, dprestore.ReasonWaitForClusterRunning, "wait for cluster entering running phase") - return false, nil + return false, ErrWaitClusterRunning } return true, nil } diff --git a/controllers/dataprotection/suite_test.go b/controllers/dataprotection/suite_test.go index 5062901be6b..73d71aed121 100644 --- a/controllers/dataprotection/suite_test.go +++ b/controllers/dataprotection/suite_test.go @@ -69,6 +69,7 @@ var fakeClock *testclocks.FakeClock func init() { viper.AutomaticEnv() + // viper.Set("ENABLE_DEBUG_LOG", "true") } func TestAPIs(t *testing.T) {