Skip to content

chore: add an actionset api to delay postReady job after cluster running #9399

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 39 additions & 3 deletions controllers/dataprotection/restore_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package dataprotection

import (
"context"
"errors"
"fmt"
"reflect"
"time"
Expand All @@ -41,6 +42,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
Expand Down Expand Up @@ -316,8 +318,13 @@ func (r *RestoreReconciler) handleRunningPhase(reqCtx intctrlutil.RequestCtx, re
err = r.Client.Status().Patch(reqCtx.Ctx, restoreMgr.Restore, client.MergeFrom(restoreMgr.OriginalRestore))
}
if err != nil {
r.Recorder.Event(restore, corev1.EventTypeWarning, corev1.EventTypeWarning, err.Error())
return intctrlutil.RequeueWithError(err, reqCtx.Log, "")
if errors.Is(err, ErrWaitClusterRunning) {
r.Recorder.Event(restore, corev1.EventTypeWarning, dprestore.ReasonWaitForClusterRunning, err.Error())
return intctrlutil.Requeue(reqCtx.Log, err.Error())
} else {
r.Recorder.Event(restore, corev1.EventTypeWarning, corev1.EventTypeWarning, err.Error())
return intctrlutil.RequeueWithError(err, reqCtx.Log, "")
}
}
return intctrlutil.Reconciled()
}
Expand Down Expand Up @@ -435,6 +442,8 @@ func (r *RestoreReconciler) postReady(reqCtx intctrlutil.RequestCtx, restoreMgr
return true, nil
}

var ErrWaitClusterRunning = errors.New("wait for cluster entering running phase")

func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx,
restoreMgr *dprestore.RestoreManager,
backupSet dprestore.BackupActionSet,
Expand Down Expand Up @@ -468,6 +477,27 @@ func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx,
return isCompleted, err
}

checkClusterRunning := func() (pass bool, err error) {
v, ok := backupSet.ActionSet.Annotations[constant.DoReadyRestoreAfterClusterRunningAnnotationKey]
if !ok || v != "true" {
return true, nil
}
cluster := &kbappsv1.Cluster{}
clusterName, ok := restoreMgr.Restore.Labels[constant.AppInstanceLabelKey]
if !ok {
reqCtx.Log.V(2).Info("restore cr missing AppInstanceLabel")
return true, nil
}
if err := r.Client.Get(reqCtx.Ctx, client.ObjectKey{Name: clusterName, Namespace: restoreMgr.Restore.Namespace}, cluster); err != nil {
return false, err
}
if cluster.Status.Phase != kbappsv1.RunningClusterPhase {
return false, ErrWaitClusterRunning
}
return true, nil
}

// 2. build jobs
var jobs []*batchv1.Job
switch stage {
case dpv1alpha1.PrepareData:
Expand All @@ -478,7 +508,13 @@ func (r *RestoreReconciler) handleBackupActionSet(reqCtx intctrlutil.RequestCtx,
}
jobs, err = restoreMgr.BuildPrepareDataJobs(reqCtx, r.Client, backupSet, target, actionName)
case dpv1alpha1.PostReady:
// 2. build jobs for postReady action
// check if need to delay job creation until cluster running
var pass bool
pass, err = checkClusterRunning()
if err != nil || !pass {
return false, err
}

jobs, err = restoreMgr.BuildPostReadyActionJobs(reqCtx, r.Client, backupSet, target, step)
}
if err != nil {
Expand Down
56 changes: 54 additions & 2 deletions controllers/dataprotection/restore_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
testclocks "k8s.io/utils/clock/testing"
"sigs.k8s.io/controller-runtime/pkg/client"

kbappsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
"github.com/apecloud/kubeblocks/pkg/constant"
dprestore "github.com/apecloud/kubeblocks/pkg/dataprotection/restore"
Expand Down Expand Up @@ -483,10 +484,10 @@ var _ = Describe("Restore Controller test", func() {
})

Context("test postReady stage", func() {
var _ *testdp.BackupClusterInfo
var backupClusterInfo *testdp.BackupClusterInfo
BeforeEach(func() {
By("fake a new cluster")
_ = testdp.NewFakeCluster(&testCtx)
backupClusterInfo = testdp.NewFakeCluster(&testCtx)
})

It("test post ready actions", func() {
Expand Down Expand Up @@ -580,6 +581,7 @@ var _ = Describe("Restore Controller test", func() {
})).Should(Succeed())

})

It("test parameters env", func() {
By("set schema and parameters in actionSet")
testdp.MockActionSetWithSchema(&testCtx, actionSet)
Expand All @@ -602,6 +604,56 @@ var _ = Describe("Restore Controller test", func() {
By("expect parameters env in restore jobs")
checkJobParametersEnv(restore)
})

It("respects DoReadyRestoreAfterClusterRunning annotation", func() {
By("set annotation for actionset")
Expect(testapps.ChangeObj(&testCtx, actionSet, func(set *dpv1alpha1.ActionSet) {
set.Spec.Restore.PrepareData = nil
if set.Annotations == nil {
set.Annotations = make(map[string]string)
}
set.Annotations[constant.DoReadyRestoreAfterClusterRunningAnnotationKey] = "true"
})).Should(Succeed())

By("create restore cr")
matchLabels := map[string]string{
constant.AppInstanceLabelKey: testdp.ClusterName,
}
restore := initResourcesAndWaitRestore(true, false, false, "", dpv1alpha1.RestorePhaseRunning,
func(f *testdp.MockRestoreFactory) {
f.
SetConnectCredential(testdp.ClusterName).
SetJobActionConfig(matchLabels).
SetExecActionConfig(matchLabels).
SetLabels(matchLabels)
}, nil)

By("check event fired")
Eventually(func() bool {
eventList := &corev1.EventList{}
err := k8sClient.List(ctx, eventList, client.InNamespace(backupClusterInfo.Cluster.Namespace))
if err != nil {
return false
}
for _, e := range eventList.Items {
if e.Reason == dprestore.ReasonWaitForClusterRunning && e.InvolvedObject.Name == restore.Name {
return true
}
}
return false

}).Should(BeTrue())

By("mock cluster running")
Expect(testapps.ChangeObjStatus(&testCtx, backupClusterInfo.Cluster, func() {
backupClusterInfo.Cluster.Status.Phase = kbappsv1.RunningClusterPhase
})).Should(Succeed())

By("check job created")
Eventually(testapps.List(&testCtx, generics.JobSignature,
client.MatchingLabels{dprestore.DataProtectionRestoreLabelKey: restore.Name},
client.InNamespace(testCtx.DefaultNamespace))).Should(HaveLen(2))
})
})

Context("test cross namespace", func() {
Expand Down
1 change: 1 addition & 0 deletions controllers/dataprotection/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ var fakeClock *testclocks.FakeClock

func init() {
viper.AutomaticEnv()
// viper.Set("ENABLE_DEBUG_LOG", "true")
}

func TestAPIs(t *testing.T) {
Expand Down
8 changes: 8 additions & 0 deletions pkg/constant/annotations.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,14 @@ const (
MultiClusterServicePlacementKey = "apps.kubeblocks.io/multi-cluster-service-placement"
)

// annotations for data protection
const (
// DoReadyRestoreAfterClusterRunningAnnotationKey is an experimental api to delay postReady restore job after cluster is running
// It should be set to "true" in actionset cr.
// This api may later added to action spec and replace the old api which is in cluster restore annotaion (kubeblocks.io/restore-from-backup)
DoReadyRestoreAfterClusterRunningAnnotationKey = "dataprotection.kubeblocks.io/do-ready-restore-after-cluster-running"
)

func InheritedAnnotations() []string {
return []string{
RestoreFromBackupAnnotationKey,
Expand Down
1 change: 1 addition & 0 deletions pkg/dataprotection/restore/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
ReasonProcessing = "Processing"
ReasonFailed = "Failed"
ReasonSucceed = "Succeed"
ReasonWaitForClusterRunning = "WaitForClusterRunning"
reasonCreateRestoreJob = "CreateRestoreJob"
reasonCreateRestorePVC = "CreateRestorePVC"
)
Expand Down