Skip to content

Commit

Permalink
Merge pull request #153 from replicatedhq/ricardomaraschini/sc-58510/…
Browse files Browse the repository at this point in the history
…support-migrating-from-longhorn-to-rook-ceph

bug: checking if claim ref is set before changing reclaim policy.
  • Loading branch information
ricardomaraschini authored Feb 17, 2023
2 parents 65e2284 + 27cad1a commit 33bbb07
Show file tree
Hide file tree
Showing 2 changed files with 122 additions and 22 deletions.
37 changes: 25 additions & 12 deletions pkg/migrate/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -592,15 +592,24 @@ func newPvcName(originalName string) string {
}

// get a PV, apply the selected mutator to the PV, update the PV, use the supplied validator to wait for the update to show up
func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, mutator func(volume *corev1.PersistentVolume) *corev1.PersistentVolume, checker func(volume *corev1.PersistentVolume) bool) error {
func mutatePV(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, mutator func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error), checker func(volume *corev1.PersistentVolume) bool) error {
tries := 0
for {
pv, err := clientset.CoreV1().PersistentVolumes().Get(ctx, pvName, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("failed to get persistent volumes %s: %w", pvName, err)
}

pv = mutator(pv)
pv, err = mutator(pv)
if err != nil {
if tries > 5 {
return fmt.Errorf("failed to determine new PV %s: %w", pvName, err)
}
w.Printf("Failed to determine new PV %s, waiting 5s to retry\n", pvName)
time.Sleep(time.Second * 5)
tries++
continue
}

_, err = clientset.CoreV1().PersistentVolumes().Update(ctx, pv, metav1.UpdateOptions{})
if err != nil {
Expand Down Expand Up @@ -943,10 +952,10 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
// mark PVs used by both originalPVC and migratedPVC as to-be-retained
w.Printf("Marking original PV %s as to-be-retained\n", originalPVC.Spec.VolumeName)
var originalReclaim corev1.PersistentVolumeReclaimPolicy
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
originalReclaim = volume.Spec.PersistentVolumeReclaimPolicy
volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain
})
Expand All @@ -955,7 +964,7 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
}

w.Printf("Marking migrated-to PV %s as to-be-retained\n", migratedPVC.Spec.VolumeName)
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
// add annotations describing what PVC this data came from in case of a failure later
volume.Annotations[sourceNsAnnotation] = ns
volume.Annotations[sourcePVCAnnotation] = pvcName
Expand All @@ -964,7 +973,7 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,
volume.Annotations[desiredReclaimAnnotation] = string(originalReclaim)

volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain
})
Expand Down Expand Up @@ -998,19 +1007,19 @@ func swapPVs(ctx context.Context, w *log.Logger, clientset k8sclient.Interface,

// remove claimrefs from original and migrated-to PVs
w.Printf("Removing claimref from original PV %s\n", originalPVC.Spec.VolumeName)
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, originalPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.ClaimRef = nil
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.ClaimRef == nil
})
if err != nil {
return fmt.Errorf("failed to remove claimrefs from PV %s: %w", originalPVC.Spec.VolumeName, err)
}
w.Printf("Removing claimref from migrated-to PV %s\n", migratedPVC.Spec.VolumeName)
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err = mutatePV(ctx, w, clientset, migratedPVC.Spec.VolumeName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.ClaimRef = nil
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
return volume.Spec.ClaimRef == nil
})
Expand Down Expand Up @@ -1089,15 +1098,19 @@ func waitForDeletion(ctx context.Context, clientset k8sclient.Interface, pvcName
// If 'reclaim' is not specified and the annotation does not exist, the reclaim policy will not be updated.
// in either case, the annotation will be removed.
func resetReclaimPolicy(ctx context.Context, w *log.Logger, clientset k8sclient.Interface, pvName string, reclaim *corev1.PersistentVolumeReclaimPolicy) error {
err := mutatePV(ctx, w, clientset, pvName, func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
err := mutatePV(ctx, w, clientset, pvName, func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
if volume.Spec.ClaimRef == nil {
return nil, fmt.Errorf("PV claimRef not set")
}

if reclaim != nil {
volume.Spec.PersistentVolumeReclaimPolicy = *reclaim
} else {
if annotationVal, ok := volume.Annotations[desiredReclaimAnnotation]; ok {
volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimPolicy(annotationVal)
}
}
return volume
return volume, nil
}, func(volume *corev1.PersistentVolume) bool {
if reclaim != nil {
return volume.Spec.PersistentVolumeReclaimPolicy == *reclaim
Expand Down
107 changes: 97 additions & 10 deletions pkg/migrate/migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func TestMutatePV(t *testing.T) {
resources []runtime.Object
pvname string
wantErr bool
ttmutator func(volume *corev1.PersistentVolume) *corev1.PersistentVolume
ttmutator func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error)
ttchecker func(volume *corev1.PersistentVolume) bool
validate func(clientset k8sclient.Interface, t *testing.T) error
}{
Expand All @@ -236,9 +236,9 @@ func TestMutatePV(t *testing.T) {
},
},
},
ttmutator: func(volume *corev1.PersistentVolume) *corev1.PersistentVolume {
ttmutator: func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.PersistentVolumeReclaimPolicy = corev1.PersistentVolumeReclaimRetain
return volume
return volume, nil
},
ttchecker: func(volume *corev1.PersistentVolume) bool {
return volume.Spec.PersistentVolumeReclaimPolicy == corev1.PersistentVolumeReclaimRetain
Expand Down Expand Up @@ -1217,18 +1217,58 @@ func Test_swapPVs(t *testing.T) {
sourceScName := "sourceScName"
destScName := "destScName"
tests := []struct {
name string
resources []runtime.Object
wantPVs []corev1.PersistentVolume
wantPVCs []corev1.PersistentVolumeClaim
ns string
pvcName string
wantErr bool
name string
resources []runtime.Object
wantPVs []corev1.PersistentVolume
wantPVCs []corev1.PersistentVolumeClaim
ns string
pvcName string
wantErr bool
backgroundFunc func(context.Context, *log.Logger, k8sclient.Interface)
}{
{
name: "swap one PVC",
ns: "testns",
pvcName: "sourcepvc",
backgroundFunc: func(ctx context.Context, logger *log.Logger, k k8sclient.Interface) {
// watch for the statefulset to be scaled down, and then delete the pod
for {
select {
case <-time.After(time.Second / 100):
// check statefulset, maybe delete pod
pvcs, err := k.CoreV1().PersistentVolumeClaims("testns").List(ctx, metav1.ListOptions{})
if err != nil {
logger.Printf("got listing PVCs: %s", err.Error())
continue
}

for _, pvc := range pvcs.Items {
if pvc.Spec.VolumeName != "" {
logger.Printf("setting pv %s claim ref to pvc %s", pvc.Spec.VolumeName, pvc.Name)
err := mutatePV(ctx, logger, k, pvc.Spec.VolumeName,
func(volume *corev1.PersistentVolume) (*corev1.PersistentVolume, error) {
volume.Spec.ClaimRef = &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: pvc.Name,
}
return volume, nil
},
func(volume *corev1.PersistentVolume) bool {
return true
},
)
if err != nil {
logger.Printf("error mutating PV: %s", err)
}
}
}
case <-ctx.Done():
return
}
}
},
resources: []runtime.Object{
// two PVCs
&corev1.PersistentVolumeClaim{
Expand Down Expand Up @@ -1402,6 +1442,12 @@ func Test_swapPVs(t *testing.T) {
},
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimDelete,
StorageClassName: sourceScName,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
Status: corev1.PersistentVolumeStatus{
Phase: corev1.VolumeBound,
Expand Down Expand Up @@ -1442,6 +1488,11 @@ func Test_swapPVs(t *testing.T) {
req := require.New(t)
clientset := fake.NewSimpleClientset(tt.resources...)
testlog := log.New(testWriter{t: t}, "", 0)

if tt.backgroundFunc != nil {
go tt.backgroundFunc(context.Background(), testlog, clientset)
}

err := swapPVs(context.Background(), testlog, clientset, tt.ns, tt.pvcName)
if tt.wantErr {
req.Error(err)
Expand Down Expand Up @@ -1489,6 +1540,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1507,6 +1564,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimDelete,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1530,6 +1593,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1548,6 +1617,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRetain,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1569,6 +1644,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand All @@ -1586,6 +1667,12 @@ func Test_resetReclaimPolicy(t *testing.T) {
},
Spec: corev1.PersistentVolumeSpec{
PersistentVolumeReclaimPolicy: corev1.PersistentVolumeReclaimRecycle,
ClaimRef: &corev1.ObjectReference{
APIVersion: "v1",
Kind: "PersistentVolumeClaim",
Namespace: "testns",
Name: "sourcepvc",
},
},
},
},
Expand Down

0 comments on commit 33bbb07

Please sign in to comment.