Skip to content

Commit

Permalink
Merge pull request #332 from actiontech/operate_data_resource
Browse files Browse the repository at this point in the history
remove unused plugin model
  • Loading branch information
winfredLIN authored Dec 5, 2024
2 parents 5a2493d + 8f16bed commit b1e07f1
Show file tree
Hide file tree
Showing 8 changed files with 146 additions and 253 deletions.
37 changes: 22 additions & 15 deletions internal/dms/biz/db_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,21 +218,28 @@ func (d *DBServiceUsecase) CreateDBService(ctx context.Context, args *BizDBServi
return "", fmt.Errorf("new db service failed: %w", err)
}

err = d.createDBService(ctx, ds)
if err != nil {
return "", err
}
return ds.UID, nil
}

func (d *DBServiceUsecase) createDBService(ctx context.Context, dbService *DBService) error {
// 调用其他服务对数据源进行预检查
if err = d.pluginUsecase.AddDBServicePreCheck(ctx, ds); err != nil {
return "", fmt.Errorf("precheck db service failed: %w", err)
if err := d.pluginUsecase.AddDBServicePreCheck(ctx, dbService); err != nil {
return fmt.Errorf("precheck db service failed: %w", err)
}

if err = d.repo.SaveDBServices(ctx, []*DBService{ds}); err != nil {
return "", err
if err := d.repo.SaveDBServices(ctx, []*DBService{dbService}); err != nil {
return err
}

err = d.pluginUsecase.OperateDataResourceHandle(ctx, ds.UID, dmsCommonV1.DataResourceTypeDBService, dmsCommonV1.OperationTypeCreate, dmsCommonV1.OperationTimingAfter)
err := d.pluginUsecase.AddDBServiceAfterHandle(ctx, dbService.UID)
if err != nil {
return "", fmt.Errorf("plugin handle after craete db_service err: %v", err)
return fmt.Errorf("plugin handle after craete db_service err: %v", err)
}

return ds.UID, nil
return nil
}

type ListDBServicesOption struct {
Expand Down Expand Up @@ -300,7 +307,7 @@ func (d *DBServiceUsecase) TestDbServiceConnections(ctx context.Context, DBServi
dbService.LastConnectionTime = &connectionResult.TestConnectionTime
dbService.LastConnectionErrorMsg = &connectionResult.ConnectErrorMessage

err = d.UpdateDBServiceByBiz(ctx, dbService, currentUserUid)
err = d.UpdateDBService(ctx, dbService, currentUserUid)
if err != nil {
d.log.Errorf("dbService name: %v,UpdateDBServiceByBiz err: %v", dbService.Name, err)
}
Expand Down Expand Up @@ -495,7 +502,7 @@ func (d *DBServiceUsecase) DelDBService(ctx context.Context, dbServiceUid, curre
return fmt.Errorf("user is not project admin or golobal op permission user")
}

err = d.pluginUsecase.OperateDataResourceHandle(ctx, ds.UID, dmsCommonV1.DataResourceTypeDBService, dmsCommonV1.OperationTypeDelete, dmsCommonV1.OperationTimingTypeBefore)
err = d.pluginUsecase.DelDBServicePreCheck(ctx, ds.UID)
if err != nil {
return fmt.Errorf("plugin handle before delete db_service err: %v", err)
}
Expand All @@ -504,7 +511,7 @@ func (d *DBServiceUsecase) DelDBService(ctx context.Context, dbServiceUid, curre
return fmt.Errorf("delete data service error: %v", err)
}

err = d.pluginUsecase.OperateDataResourceHandle(ctx, ds.UID, dmsCommonV1.DataResourceTypeDBService, dmsCommonV1.OperationTypeDelete, dmsCommonV1.OperationTimingAfter)
err = d.pluginUsecase.DelDBServiceAfterHandle(ctx, ds.UID)
if err != nil {
return fmt.Errorf("plugin handle after delete db_service err: %v", err)
}
Expand Down Expand Up @@ -574,7 +581,7 @@ func (d *DBServiceUsecase) TestDbServiceConnection(ctx context.Context, dbServic
return connectionResult, nil
}

func (d *DBServiceUsecase) UpdateDBServiceByBiz(ctx context.Context, ds *DBService, currentUserUid string) (err error) {
func (d *DBServiceUsecase) UpdateDBService(ctx context.Context, ds *DBService, currentUserUid string) (err error) {
// 检查项目是否归档/删除
if err := d.projectUsecase.isProjectActive(ctx, ds.ProjectUID); err != nil {
return fmt.Errorf("update db service error: %v", err)
Expand All @@ -591,15 +598,15 @@ func (d *DBServiceUsecase) UpdateDBServiceByBiz(ctx context.Context, ds *DBServi
return fmt.Errorf("update db service error: %v", err)
}

err = d.pluginUsecase.OperateDataResourceHandle(ctx, ds.UID, dmsCommonV1.DataResourceTypeDBService, dmsCommonV1.OperationTypeUpdate, dmsCommonV1.OperationTimingAfter)
err = d.pluginUsecase.UpdateDBServiceAfterHandle(ctx, ds.UID)
if err != nil {
return fmt.Errorf("plugin handle after update db_service err: %v", err)
}

return nil
}

func (d *DBServiceUsecase) UpdateDBService(ctx context.Context, dbServiceUid string, updateDBService *BizDBServiceArgs, currentUserUid string) (err error) {
func (d *DBServiceUsecase) UpdateDBServiceByArgs(ctx context.Context, dbServiceUid string, updateDBService *BizDBServiceArgs, currentUserUid string) (err error) {
ds, err := d.repo.GetDBService(ctx, dbServiceUid)
if err != nil {
return fmt.Errorf("get db service failed: %v", err)
Expand Down Expand Up @@ -662,7 +669,7 @@ func (d *DBServiceUsecase) UpdateDBService(ctx context.Context, dbServiceUid str
return fmt.Errorf("update db service error: %v", err)
}

err = d.pluginUsecase.OperateDataResourceHandle(ctx, ds.UID, dmsCommonV1.DataResourceTypeDBService, dmsCommonV1.OperationTypeUpdate, dmsCommonV1.OperationTimingAfter)
err = d.pluginUsecase.UpdateDBServiceAfterHandle(ctx, ds.UID)
if err != nil {
return fmt.Errorf("plugin handle after update db_service err: %v", err)
}
Expand Down
207 changes: 94 additions & 113 deletions internal/dms/biz/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package biz

import (
"context"
"encoding/json"
"fmt"
"sync"

pkgConst "github.com/actiontech/dms/internal/dms/pkg/constant"

Expand All @@ -25,11 +27,10 @@ type PluginUsecase struct {
}

type Plugin struct {
Name string
AddDBServicePreCheckUrl string
DelDBServicePreCheckUrl string
DelUserPreCheckUrl string
DelUserGroupPreCheckUrl string
Name string
// 该地址目的是统一调用其他服务 数据资源变更前后校验/更新数据的 接口
// eg: 删除数据源前:
// 需要sqle服务中实现接口逻辑,判断该数据源上已经没有进行中的工单
OperateDataResourceHandleUrl string
}

Expand Down Expand Up @@ -80,174 +81,154 @@ func (p *PluginUsecase) RegisterPlugin(ctx context.Context, plugin *Plugin, curr
return nil
}

func (p *PluginUsecase) AddDBServicePreCheck(ctx context.Context, ds *DBService) error {
dbService := &dmsV1.IPluginDBService{
Name: ds.Name,
DBType: ds.DBType,
Host: ds.Host,
Port: ds.Port,
User: ds.User,
Business: ds.Business,
}
if ds.SQLEConfig != nil {
dbService.SQLERuleTemplateName = ds.SQLEConfig.RuleTemplateName
dbService.SQLERuleTemplateId = ds.SQLEConfig.RuleTemplateID
}
for _, plugin := range p.registeredPlugins {
if plugin.AddDBServicePreCheckUrl != "" {
if err := p.CallAddDBServicePreCheck(ctx, plugin.AddDBServicePreCheckUrl, dbService); err != nil {
return fmt.Errorf("plugin %s add db service pre check failed: %v", plugin.Name, err)
}
}
func (p *PluginUsecase) AddProjectAfterHandle(ctx context.Context, ProjectUid string) error {
if err := p.OperateDataResourceHandle(ctx, ProjectUid, nil, dmsV1.DataResourceTypeProject, dmsV1.OperationTypeCreate, dmsV1.OperationTimingTypeAfter); err != nil {
return fmt.Errorf("add project handle failed: %v", err)
}
return nil
}

func (p *PluginUsecase) DelDBServicePreCheck(ctx context.Context, dbServiceUid string) error {
for _, plugin := range p.registeredPlugins {
if plugin.DelDBServicePreCheckUrl != "" {
if err := p.CallDelDBServicePreCheck(ctx, plugin.DelDBServicePreCheckUrl, dbServiceUid); err != nil {
return fmt.Errorf("plugin %s del db service pre check failed: %v", plugin.Name, err)
}
}
func (p *PluginUsecase) UpdateProjectPreCheck(ctx context.Context, project *Project) error {
// 项目归档
if err := p.OperateDataResourceHandle(ctx, project.UID, dmsV1.IPluginProject{
Name: project.Name,
Archived: project.Status == ProjectStatusArchived,
Desc: project.Desc,
}, dmsV1.DataResourceTypeProject, dmsV1.OperationTypeUpdate, dmsV1.OperationTimingTypeBefore); err != nil {
return fmt.Errorf("update project handle failed: %v", err)
}
return nil
}

func (p *PluginUsecase) DelUserPreCheck(ctx context.Context, userUid string) error {
for _, plugin := range p.registeredPlugins {
if plugin.DelUserPreCheckUrl != "" {
if err := p.CallDelUserPreCheck(ctx, plugin.DelUserPreCheckUrl, userUid); err != nil {
return fmt.Errorf("plugin %s del user pre check failed: %v", plugin.Name, err)
}
}
}
func (p *PluginUsecase) UpdateProjectAfterHandle(ctx context.Context, projectUid string) error {
return nil
}

func (p *PluginUsecase) DelUserGroupPreCheck(ctx context.Context, groupUid string) error {
for _, plugin := range p.registeredPlugins {
if plugin.DelUserGroupPreCheckUrl != "" {
if err := p.CallDelUserGroupPreCheck(ctx, plugin.DelUserGroupPreCheckUrl, groupUid); err != nil {
return fmt.Errorf("plugin %s del user group pre check failed: %v", plugin.Name, err)
}
}
func (p *PluginUsecase) DelProjectPreCheck(ctx context.Context, projectUid string) error {
if err := p.OperateDataResourceHandle(ctx, projectUid, nil, dmsV1.DataResourceTypeProject, dmsV1.OperationTypeDelete, dmsV1.OperationTimingTypeBefore); err != nil {
return fmt.Errorf("del project pre check failed: %v", err)
}
return nil
}

func (p *PluginUsecase) OperateDataResourceHandle(ctx context.Context, uid string, dateResourceType dmsV1.DataResourceType,
operationType dmsV1.OperationType, operationTiming dmsV1.OperationTimingType) error {
for _, plugin := range p.registeredPlugins {
if plugin.OperateDataResourceHandleUrl != "" {
if err := p.CallOperateDataResourceHandle(ctx, plugin.OperateDataResourceHandleUrl, uid, dateResourceType, operationType, operationTiming); err != nil {
return fmt.Errorf("call plugin %s operate data resource handle failed: %v", plugin.Name, err)
}
}
func (p *PluginUsecase) DelProjectAfterHandle(ctx context.Context, projectUid string) error {
if err := p.OperateDataResourceHandle(ctx, projectUid, nil, dmsV1.DataResourceTypeProject, dmsV1.OperationTypeDelete, dmsV1.OperationTimingTypeAfter); err != nil {
return fmt.Errorf("del project handle failed: %v", err)
}
return nil
}

func (p *PluginUsecase) CallAddDBServicePreCheck(ctx context.Context, url string, ds *dmsV1.IPluginDBService) error {
header := map[string]string{
"Authorization": pkgHttp.DefaultDMSToken,
func (p *PluginUsecase) AddDBServicePreCheck(ctx context.Context, ds *DBService) error {
dbService := &dmsV1.IPluginDBService{
Name: ds.Name,
DBType: ds.DBType,
Host: ds.Host,
Port: ds.Port,
User: ds.User,
Business: ds.Business,
AdditionalParams: ds.AdditionalParams,
}

reqBody := struct {
DBService *dmsV1.IPluginDBService `json:"db_service"`
}{
DBService: ds,
if ds.SQLEConfig != nil {
dbService.SQLERuleTemplateName = ds.SQLEConfig.RuleTemplateName
dbService.SQLERuleTemplateId = ds.SQLEConfig.RuleTemplateID
}

reply := &dmsV1.AddDBServicePreCheckReply{}

if err := pkgHttp.Get(ctx, url, header, reqBody, reply); err != nil {
return err
}
if reply.Code != 0 {
return fmt.Errorf("reply code(%v) error: %v", reply.Code, reply.Message)
if err := p.OperateDataResourceHandle(ctx, "", dbService, dmsV1.DataResourceTypeDBService, dmsV1.OperationTypeCreate, dmsV1.OperationTimingTypeBefore); err != nil {
return fmt.Errorf("add db service pre check failed: %v", err)
}

return nil
}

func (p *PluginUsecase) CallDelDBServicePreCheck(ctx context.Context, url string, dbServiceUid string) error {
header := map[string]string{
"Authorization": pkgHttp.DefaultDMSToken,
func (p *PluginUsecase) AddDBServiceAfterHandle(ctx context.Context, dbServiceUid string) error {
if err := p.OperateDataResourceHandle(ctx, dbServiceUid, nil, dmsV1.DataResourceTypeDBService, dmsV1.OperationTypeCreate, dmsV1.OperationTimingTypeAfter); err != nil {
return fmt.Errorf("add db service handle failed: %v", err)
}

reqBody := struct {
DBServiceUid string `json:"db_service_uid"`
}{
DBServiceUid: dbServiceUid,
}
return nil
}

reply := &dmsV1.DelDBServicePreCheckReply{}
func (p *PluginUsecase) UpdateDBServicePreCheck(ctx context.Context, ds *DBService) error {
return nil
}

if err := pkgHttp.Get(ctx, url, header, reqBody, reply); err != nil {
return err
func (p *PluginUsecase) UpdateDBServiceAfterHandle(ctx context.Context, dbServiceUid string) error {
if err := p.OperateDataResourceHandle(ctx, dbServiceUid, nil, dmsV1.DataResourceTypeDBService, dmsV1.OperationTypeUpdate, dmsV1.OperationTimingTypeAfter); err != nil {
return fmt.Errorf("update db service handle failed: %v", err)
}
if reply.Code != 0 {
return fmt.Errorf("reply code(%v) error: %v", reply.Code, reply.Message)
}

return nil
}

func (p *PluginUsecase) CallDelUserPreCheck(ctx context.Context, url string, userUid string) error {
header := map[string]string{
"Authorization": pkgHttp.DefaultDMSToken,
func (p *PluginUsecase) DelDBServicePreCheck(ctx context.Context, dbServiceUid string) error {
if err := p.OperateDataResourceHandle(ctx, dbServiceUid, nil, dmsV1.DataResourceTypeDBService, dmsV1.OperationTypeDelete, dmsV1.OperationTimingTypeBefore); err != nil {
return fmt.Errorf("del db service pre check failed: %v", err)
}
return nil
}

reqBody := struct {
UserUid string `json:"user_uid"`
}{
UserUid: userUid,
func (p *PluginUsecase) DelDBServiceAfterHandle(ctx context.Context, dbServiceUid string) error {
if err := p.OperateDataResourceHandle(ctx, dbServiceUid, nil, dmsV1.DataResourceTypeDBService, dmsV1.OperationTypeDelete, dmsV1.OperationTimingTypeAfter); err != nil {
return fmt.Errorf("del db service handle failed: %v", err)
}
return nil
}

reply := &dmsV1.DelUserPreCheckReply{}

if err := pkgHttp.Get(ctx, url, header, reqBody, reply); err != nil {
return err
}
if reply.Code != 0 {
return fmt.Errorf("reply code(%v) error: %v", reply.Code, reply.Message)
func (p *PluginUsecase) DelUserPreCheck(ctx context.Context, userUid string) error {
if err := p.OperateDataResourceHandle(ctx, userUid, nil, dmsV1.DataResourceTypeUser, dmsV1.OperationTypeDelete, dmsV1.OperationTimingTypeBefore); err != nil {
return fmt.Errorf("del user pre check failed: %v", err)
}
return nil
}

func (p *PluginUsecase) DelUserGroupPreCheck(ctx context.Context, groupUid string) error {
return nil
}

func (p *PluginUsecase) CallDelUserGroupPreCheck(ctx context.Context, url string, userGroupUid string) error {
header := map[string]string{
"Authorization": pkgHttp.DefaultDMSToken,
}
func (p *PluginUsecase) OperateDataResourceHandle(ctx context.Context, uid string, resource interface{}, dateResourceType dmsV1.DataResourceType,
operationType dmsV1.OperationType, operationTiming dmsV1.OperationTimingType) error {
var (
mu sync.Mutex
errs []error
wg sync.WaitGroup
)

reqBody := struct {
UserGroupUid string `json:"user_group_uid"`
}{
UserGroupUid: userGroupUid,
for _, plugin := range p.registeredPlugins {
if plugin.OperateDataResourceHandleUrl != "" {
wg.Add(1)
go func(plugin *Plugin) {
defer wg.Done()
if err := p.CallOperateDataResourceHandle(ctx, plugin.OperateDataResourceHandleUrl, uid, resource, dateResourceType, operationType, operationTiming); err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("call plugin %s operate data resource handle failed: %v", plugin.Name, err))
mu.Unlock()
}
}(plugin)
}
}
reply := &dmsV1.DelUserGroupPreCheckReply{}

if err := pkgHttp.Get(ctx, url, header, reqBody, reply); err != nil {
return err
}
if reply.Code != 0 {
return fmt.Errorf("reply code(%v) error: %v", reply.Code, reply.Message)
wg.Wait()

if len(errs) > 0 {
return fmt.Errorf("encountered errors: %v", errs)
}

return nil
}

func (p *PluginUsecase) CallOperateDataResourceHandle(ctx context.Context, url string, dataResourceUid string, dataResourceType dmsV1.DataResourceType, operationType dmsV1.OperationType, operationTiming dmsV1.OperationTimingType) error {
func (p *PluginUsecase) CallOperateDataResourceHandle(ctx context.Context, url string, dataResourceUid string, resource interface{}, dataResourceType dmsV1.DataResourceType, operationType dmsV1.OperationType, operationTiming dmsV1.OperationTimingType) error {
header := map[string]string{
"Authorization": pkgHttp.DefaultDMSToken,
}
extraParams, err := json.Marshal(resource)
if err != nil {
return fmt.Errorf("marshal resource failed: %v", err)
}
operateDataResourceHandleReq := dmsV1.OperateDataResourceHandleReq{
DataResourceUid: dataResourceUid,
DataResourceType: dataResourceType,
OperationType: operationType,
OperationTiming: operationTiming,
ExtraParams: string(extraParams),
}
reply := &dmsV1.OperateDataResourceHandleReply{}

Expand Down
Loading

0 comments on commit b1e07f1

Please sign in to comment.