Skip to content

Commit

Permalink
feat(cstorBackup, delete): support for snapshot deletion (#57)
Browse files Browse the repository at this point in the history
This commit adds support to cleanup backup resources generated by maya-apiserver, to execute backup of CStor volumes.

Velero-plugin will execute clean-up operation in following scenarios:
- In case of normal backup, Plugin will send delete request to api-server once the backup for the cstor volume completes
- In case of scheduled backup, Plugin will send delete request, for second-last completed backup, to api-server.
- If backup, normal or scheduled,  is failed then plugin will send the delete request to api-server.

- In case of scheduled backup, 
               A) If backup is last generated one then Maya-Apiserver will delete the relevant CStorBackup, CStorCompletedBackups and CStor snapshot, so next new backup will be full backup
               B) If backup is not the last one then maya-apiserver will delete the relevant CStorBackup and CStor snapshot


REST API query format:
`<APISERVER_ADDR>/latest/backups/<BACKUP_NAME>?volume=<VOL_NAME>&namespace=<PVC_NAMESPACE>&schedule=<SCHEDULE_NAME>`

Sample query URL:
`http://10.0.0.94:5656/latest/backups/emyqevym-20200401174441?volume=pvc-165d0f86-7441-11ea-9d2c-d89c67d5e4a1&namespace=app&schedule=emyqevym`

Covered test cases :
1. Test to verify clean-up of CStorBackup, CStorCompletedBackups and CStor snapshot in case of normal backup
2. Test to verify clean-up of CStorBackup, CStorCompletedBackups and CStor snapshot in case of scheduled backup


Signed-off-by: mayank <[email protected]>
  • Loading branch information
mynktl authored Apr 7, 2020
1 parent be88acc commit 1cd8f21
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 54 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ defaultbackup Completed 2019-05-09 17:08:41 +0530 IST 26d gcp
```
Once the backup is completed you should see the backup marked as `Completed`.

*Note:*
- _If backup name ends with "-20190513104034" format then it is considered as part of scheduled backup_


### Creating a restore from backup
#### Creating a restore from local backup/snapshot
Expand Down Expand Up @@ -174,6 +177,9 @@ newschedule Enabled 2019-05-13 15:15:39 +0530 IST */5 * * * * 720h0m0s

During the first backup iteration of a schedule, full data of the volume will be backed up. For later backup iterations of a schedule, only modified or new data from the previous iteration will be backed up.

*Note:*
- _If backup name ends with "-20190513104034" format then it is considered as part of scheduled backup_

### Restoring from a scheduled backup
#### Restoring from a scheduled local backup/snapshot
In case of local backups/snapshots, you can restore from any completed backup created by the schedule.
Expand Down
4 changes: 4 additions & 0 deletions pkg/clouduploader/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,6 +274,10 @@ func (c *Conn) Destroy(rw ReadWriter, opType ServerOperation) {
}
}

// getPartSize returns the multiPartChunkSize from the config
// - if multiPartChunkSize is not specified then it will return 0
// - if multiPartChunkSize is less then s3manager.MinUploadPartSize/5Mb then it will return an error
// - if multiPartChunkSize is invalid then it will return an error
func getPartSize(config map[string]string) (val int64, err error) {
partSize, ok := config[MultiPartChunkSize]
if !ok {
Expand Down
4 changes: 2 additions & 2 deletions pkg/clouduploader/operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ const (
// It will create a TCP server through which client can
// connect and upload data to cloud blob storage file
func (c *Conn) Upload(file string, fileSize int64) bool {
c.Log.Infof("Uploading snapshot to '%s' with provider{%s} to bucket{%s}", file, c.provider, c.bucketname)
c.Log.Infof("Uploading snapshot to '%s' with provider{%s} to bucket{%s}", file, c.provider, c.bucketname)

c.file = file
if c.partSize == 0 {
Expand Down Expand Up @@ -59,7 +59,7 @@ func (c *Conn) Upload(file string, fileSize int64) bool {

// Delete will delete file from cloud blob storage
func (c *Conn) Delete(file string) bool {
c.Log.Infof("Removing snapshot:'%s' from bucket{%s} provider{%s}", file, c.bucket, c.provider)
c.Log.Infof("Removing snapshot:'%s' from bucket{%s} provider{%s}", file, c.bucketname, c.provider)

if c.bucket.Delete(c.ctx, file) != nil {
c.Log.Errorf("Failed to remove snapshot{%s} from cloud", file)
Expand Down
54 changes: 45 additions & 9 deletions pkg/cstor/api_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (
"io/ioutil"
"net/http"
"strconv"
"strings"
"time"

v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
Expand Down Expand Up @@ -121,16 +120,10 @@ fetchip:
}

func (p *Plugin) sendBackupRequest(vol *Volume) (*v1alpha1.CStorBackup, error) {
bname := vol.backupName // This will be backup/schedule name

// If it is scheduled backup then we need to fetch schedule name
splitName := strings.Split(vol.backupName, "-")
if len(splitName) >= 2 {
bname = strings.Join(splitName[0:len(splitName)-1], "-")
}
scheduleName := p.getScheduleName(vol.backupName) // This will be backup/schedule name

bkpSpec := &v1alpha1.CStorBackupSpec{
BackupName: bname,
BackupName: scheduleName,
VolumeName: vol.volname,
SnapName: vol.backupName,
BackupDest: p.cstorServerAddr,
Expand Down Expand Up @@ -219,3 +212,46 @@ func isEmptyRestResponse(data []byte) (bool, error) {

return false, nil
}

func (p *Plugin) sendDeleteRequest(backup, volume, namespace, schedule string) error {
url := p.mayaAddr + backupEndpoint + backup

req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
return errors.Wrapf(err, "failed to create HTTP request")
}

q := req.URL.Query()
q.Add("volume", volume)
q.Add("namespace", namespace)
q.Add("schedule", schedule)

req.URL.RawQuery = q.Encode()

c := &http.Client{
Timeout: 60 * time.Second,
}

resp, err := c.Do(req)
if err != nil {
return errors.Wrapf(err, "failed to connect maya-apiserver")
}

defer func() {
if err := resp.Body.Close(); err != nil {
p.Log.Warnf("Failed to close response err=%s", err)
}
}()

_, err = ioutil.ReadAll(resp.Body)
if err != nil {
return errors.Wrapf(err, "failed to read response from maya-apiserver")
}

code := resp.StatusCode
if code != http.StatusOK {
return errors.Errorf("HTTP Status error{%v} from maya-apiserver", code)
}

return nil
}
70 changes: 30 additions & 40 deletions pkg/cstor/cstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ limitations under the License.
package cstor

import (
"io/ioutil"
"net"
"net/http"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -276,49 +274,22 @@ func (p *Plugin) DeleteSnapshot(snapshotID string) error {
snapInfo = p.snapshots[snapshotID]
}

if snapInfo.volID == "" || snapInfo.backupName == "" || snapInfo.namespace == "" {
return errors.Errorf("Got insufficient info vol:{%s} snap:{%s} ns:{%s}",
scheduleName := p.getScheduleName(snapInfo.backupName)

if snapInfo.volID == "" || snapInfo.backupName == "" || snapInfo.namespace == "" || scheduleName == "" {
return errors.Errorf("Got insufficient info vol:{%s} snap:{%s} ns:{%s} schedule:{%s}",
snapInfo.volID,
snapInfo.backupName,
snapInfo.namespace)
}

url := p.mayaAddr + backupEndpoint + snapInfo.backupName

req, err := http.NewRequest("DELETE", url, nil)
if err != nil {
p.Log.Errorf("Failed to create HTTP request")
return err
}

q := req.URL.Query()
q.Add("volume", snapInfo.volID)
q.Add("namespace", snapInfo.namespace)

req.URL.RawQuery = q.Encode()

c := &http.Client{
Timeout: 60 * time.Second,
}
resp, err := c.Do(req)
if err != nil {
return errors.Errorf("Error when connecting to maya-apiserver : %s", err.Error())
snapInfo.namespace,
scheduleName)
}

defer func() {
if err := resp.Body.Close(); err != nil {
p.Log.Warnf("Failed to close response : %s", err.Error())
}
}()

_, err = ioutil.ReadAll(resp.Body)
err = p.sendDeleteRequest(snapInfo.backupName,
snapInfo.volID,
snapInfo.namespace,
scheduleName)
if err != nil {
return errors.Errorf("Unable to read response from maya-apiserver : %s", err.Error())
}

code := resp.StatusCode
if code != http.StatusOK {
return errors.Errorf("HTTP Status error{%v} from maya-apiserver", code)
return errors.Wrapf(err, "failed to execute maya-apiserver DELETE API")
}

if p.local {
Expand Down Expand Up @@ -536,3 +507,22 @@ func (p *Plugin) getVolInfo(volumeID, snapName string) (*Volume, error) {

return vol, nil
}

// getScheduleName return the schedule name for the given backup
// It will check if backup name have 'bkp-20060102150405' format
func (p *Plugin) getScheduleName(backupName string) string {
// for non-scheduled backup, we are considering backup name as schedule name only
scheduleOrBackupName := backupName

// If it is scheduled backup then we need to get the schedule name
splitName := strings.Split(backupName, "-")
if len(splitName) >= 2 {
_, err := time.Parse("20060102150405", splitName[len(splitName)-1])
if err != nil {
// last substring is not timestamp, so it is not generated from schedule
return scheduleOrBackupName
}
scheduleOrBackupName = strings.Join(splitName[0:len(splitName)-1], "-")
}
return scheduleOrBackupName
}
56 changes: 56 additions & 0 deletions pkg/cstor/status.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ func (p *Plugin) checkBackupStatus(bkp *v1alpha1.CStorBackup) {
case v1alpha1.BKPCStorStatusDone, v1alpha1.BKPCStorStatusFailed, v1alpha1.BKPCStorStatusInvalid:
bkpDone = true
p.cl.ExitServer = true
if err = p.cleanupCompletedBackup(bs); err != nil {
p.Log.Warningf("failed to execute clean-up request for backup=%s err=%s", bs.Name, err)
}
}
}
}
Expand Down Expand Up @@ -107,3 +110,56 @@ func (p *Plugin) checkRestoreStatus(rst *v1alpha1.CStorRestore, vol *Volume) {
}
}
}

// cleanupCompletedBackup send the delete request to apiserver
// to cleanup backup resources
// If it is normal backup then it will delete the current backup, it can be failed or succeeded backup
// If it is scheduled backup then
// - if current backup is base backup, not incremental one, then it will not perform any clean-up
// - if current backup is incremental backup and failed one then it will delete that(current) backup
// - if current backup is incremental backup and completed successfully then it will delete the last completed or previous backup
func (p *Plugin) cleanupCompletedBackup(bkp v1alpha1.CStorBackup) error {
targetedSnapName := bkp.Spec.SnapName

// In case of scheduled backup we are using the last completed backup to send
// differential snapshot. So We don't need to delete the last completed backup.
if isScheduledBackup(bkp) && isBackupSucceeded(bkp) {
// For incremental backup We are using PrevSnapName to send the differential snapshot
// Since Given backup is completed successfully We can delete the 2nd last completed backup
if len(bkp.Spec.PrevSnapName) == 0 {
// PrevSnapName will be empty if the given backup is base backup
// clean-up is not required for base backup
return nil
}
targetedSnapName = bkp.Spec.PrevSnapName
}

p.Log.Infof("executing clean-up request.. snapshot=%s volume=%s ns=%s backup=%s",
targetedSnapName,
bkp.Spec.VolumeName,
bkp.Namespace,
bkp.Spec.BackupName,
)

return p.sendDeleteRequest(targetedSnapName,
bkp.Spec.VolumeName,
bkp.Namespace,
bkp.Spec.BackupName)
}

// return true if given backup is part of schedule
func isScheduledBackup(bkp v1alpha1.CStorBackup) bool {
// if backup is scheduled backup then snapshot name and backup name are different
if bkp.Spec.SnapName != bkp.Spec.BackupName {
return true
}
return false
}

// isBackupSucceeded returns true if backup completed successfully
func isBackupSucceeded(bkp v1alpha1.CStorBackup) bool {
if bkp.Status == v1alpha1.BKPCStorStatusDone {
return true
}
return false
}
7 changes: 5 additions & 2 deletions tests/k8s/log.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,11 +38,14 @@ func (k *KubeClient) DumpLogs(ns, podName, container string) error {

readCloser, err := req.Stream()
if err != nil {
fmt.Printf("DumpLogs: Error occured for %s/%s:%s.. %s", ns, podName, container, err)
fmt.Printf("DumpLogs: Error occurred for %s/%s:%s.. %s", ns, podName, container, err)
return err
}

defer readCloser.Close()
defer func() {
_ = readCloser.Close()
}()

_, err = io.Copy(os.Stdout, readCloser)
fmt.Println(err)
return err
Expand Down
74 changes: 74 additions & 0 deletions tests/openebs/storage_status.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package openebs

import (
"fmt"
"strings"
"time"

v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
Expand Down Expand Up @@ -170,3 +171,76 @@ func getPoolNameFromCVR(k v1alpha1.CStorVolumeReplica) string {
func getVolumeNameFromCVR(k v1alpha1.CStorVolumeReplica) string {
return k.Labels[cVRPVLabel]
}

// GetCStorBackups returns cstorbackup list for the given backup
func (c *ClientSet) GetCStorBackups(backup, ns string) (*v1alpha1.CStorBackupList, error) {
return c.OpenebsV1alpha1().
CStorBackups(ns).
List(metav1.ListOptions{
LabelSelector: "openebs.io/backup=" + backup,
})
}

// GetCStorCompletedBackups returns cstorcompletedbackup list for the given backup
func (c *ClientSet) GetCStorCompletedBackups(backup, ns string) (*v1alpha1.CStorCompletedBackupList, error) {
return c.OpenebsV1alpha1().
CStorCompletedBackups(ns).
List(metav1.ListOptions{
LabelSelector: "openebs.io/backup=" + backup,
})
}

// IsBackupResourcesExist checks if backupResources, for the given backup, exist or not
func (c *ClientSet) IsBackupResourcesExist(backup, pvc, ns string) (bool, error) {
isSchedule := false
isLastBackup := false

scheduleName := backup
splitName := strings.Split(backup, "-")
if len(splitName) >= 2 {
isSchedule = true
scheduleName = strings.Join(splitName[0:len(splitName)-1], "-")
}

blist, err := c.GetCStorBackups(scheduleName, ns)
if err != nil {
return false, errors.Wrapf(err, "failed to fetch cstorbackup list for backup %s/%s", ns, backup)
}

cblist, err := c.GetCStorCompletedBackups(scheduleName, ns)
if err != nil {
return false, errors.Wrapf(err, "failed to fetch cstorcompletedbackup list for backup %s/%s", ns, backup)
}

if isSchedule && len(cblist.Items) == 0 {
return true, errors.Errorf("for schedule cstorcompletedbackups should be present")
}

// for schedule cstorcompletedbackups is not deleted by apiserver to support incremental backup
if isSchedule && len(cblist.Items) == 1 {
cbkp := cblist.Items[0]
// if given backup is the last backup then relevant cstorbackup will not be deleted
for i, bkp := range blist.Items {
if bkp.Spec.SnapName == cbkp.Spec.PrevSnapName {
blist.Items = append(blist.Items[:i], blist.Items[i+1:]...)
}
if bkp.Spec.SnapName == backup {
isLastBackup = true
}
}
cblist.Items = cblist.Items[:0]
}

snapshotExist, err := c.CheckSnapshot(pvc, ns, backup)
if err != nil {
if !strings.Contains(err.Error(), "command terminated with exit code 1") {
return false, errors.Wrapf(err, "failed to verify snapshot for backup %s/%s", ns, backup)
}
}

if len(blist.Items) != 0 || len(cblist.Items) != 0 || (snapshotExist && !isLastBackup) {
return true, errors.Errorf("backup %s/%s backup:%d cbackup:%d snapshot:%v isLastBackup:%v", ns, backup, len(blist.Items), len(cblist.Items), snapshotExist, isLastBackup)
}

return false, nil
}
Loading

0 comments on commit 1cd8f21

Please sign in to comment.