Skip to content

Commit

Permalink
Merge pull request #343 from actiontech/optimize_plugin_ce
Browse files Browse the repository at this point in the history
Optimize plugin ce
  • Loading branch information
LordofAvernus authored Dec 27, 2024
2 parents e687b79 + 26e7c04 commit 33b2116
Show file tree
Hide file tree
Showing 10 changed files with 234 additions and 91 deletions.
12 changes: 2 additions & 10 deletions internal/apiserver/conf/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
dmsCommonConf "github.com/actiontech/dms/pkg/dms-common/conf"
utilConf "github.com/actiontech/dms/pkg/dms-common/pkg/config"
utilLog "github.com/actiontech/dms/pkg/dms-common/pkg/log"
pkgParams "github.com/actiontech/dms/pkg/params"
)

type Options struct {
Expand All @@ -21,9 +20,8 @@ type SQLEOptions struct {

type DMSOptions struct {
dmsCommonConf.BaseOptions `yaml:",inline"`
CloudbeaverOpts *CloudbeaverOpts `yaml:"cloudbeaver"`
ServiceOpts *ServiceOptions `yaml:"service"`
DatabaseDriverOptions []DatabaseDriverOption `yaml:"database_driver_options"`
CloudbeaverOpts *CloudbeaverOpts `yaml:"cloudbeaver"`
ServiceOpts *ServiceOptions `yaml:"service"`
}

type CloudbeaverOpts struct {
Expand Down Expand Up @@ -52,12 +50,6 @@ type ServiceOptions struct {
} `yaml:"log"`
}

type DatabaseDriverOption struct {
DbType string `yaml:"db_type"`
LogoPath string `yaml:"logo_path"`
Params pkgParams.Params `yaml:"params"`
}

var optimizationEnabled bool

func IsOptimizationEnabled() bool {
Expand Down
30 changes: 22 additions & 8 deletions internal/dms/biz/db_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"time"

dmsV1 "github.com/actiontech/dms/api/dms/service/v1"
"github.com/actiontech/dms/internal/apiserver/conf"
pkgConst "github.com/actiontech/dms/internal/dms/pkg/constant"
"github.com/actiontech/dms/internal/pkg/locale"
v1Base "github.com/actiontech/dms/pkg/dms-common/api/base/v1"
Expand Down Expand Up @@ -166,18 +165,16 @@ type DBServiceUsecase struct {
pluginUsecase *PluginUsecase
opPermissionVerifyUsecase *OpPermissionVerifyUsecase
projectUsecase *ProjectUsecase
databaseDriverOptions []conf.DatabaseDriverOption
log *utilLog.Helper
}

func NewDBServiceUsecase(log utilLog.Logger, repo DBServiceRepo, pluginUsecase *PluginUsecase, opPermissionVerifyUsecase *OpPermissionVerifyUsecase, projectUsecase *ProjectUsecase, proxyTargetRepo ProxyTargetRepo, databaseDriverOptions []conf.DatabaseDriverOption) *DBServiceUsecase {
func NewDBServiceUsecase(log utilLog.Logger, repo DBServiceRepo, pluginUsecase *PluginUsecase, opPermissionVerifyUsecase *OpPermissionVerifyUsecase, projectUsecase *ProjectUsecase, proxyTargetRepo ProxyTargetRepo) *DBServiceUsecase {
return &DBServiceUsecase{
repo: repo,
opPermissionVerifyUsecase: opPermissionVerifyUsecase,
pluginUsecase: pluginUsecase,
projectUsecase: projectUsecase,
dmsProxyTargetRepo: proxyTargetRepo,
databaseDriverOptions: databaseDriverOptions,
log: utilLog.NewHelper(log, utilLog.WithMessageKey("biz.dbService")),
}
}
Expand Down Expand Up @@ -450,8 +447,12 @@ func (d *DBServiceUsecase) ListDBServiceTips(ctx context.Context, req *dmsV1.Lis
return ret, nil
}

func (d *DBServiceUsecase) ListDBServiceDriverOption(ctx context.Context) ([]conf.DatabaseDriverOption, error) {
return d.databaseDriverOptions, nil
func (d *DBServiceUsecase) ListDBServiceDriverOption(ctx context.Context) ([]*dmsV1.DatabaseDriverOption, error) {
options, err := d.pluginUsecase.GetDatabaseDriverOptionsHandle(ctx)
if err != nil {
return nil, err
}
return options, nil
}

func (d *DBServiceUsecase) GetDriverParamsByDBType(ctx context.Context, dbType string) (pkgParams.Params, error) {
Expand All @@ -460,14 +461,27 @@ func (d *DBServiceUsecase) GetDriverParamsByDBType(ctx context.Context, dbType s
return nil, err
}
for _, driverOptions := range databaseOptions {
if driverOptions.DbType == dbType {
return driverOptions.Params, nil
if driverOptions.DBType == dbType {
return convertAdditionParamsToParams(driverOptions.Params), nil
}

}
return nil, fmt.Errorf("db type %v is not support", dbType)
}

func convertAdditionParamsToParams(additionalParam []*dmsV1.DatabaseDriverAdditionalParam) pkgParams.Params {
params := make(pkgParams.Params, len(additionalParam))
for i, item := range additionalParam {
params[i] = &pkgParams.Param{
Key: item.Name,
Value: item.Value,
Desc: item.Description,
Type: pkgParams.ParamType(item.Type),
}
}
return params
}

func (d *DBServiceUsecase) GetActiveDBServices(ctx context.Context, dbServiceIds []string) (dbServices []*DBService, err error) {
services, err := d.repo.GetDBServicesByIds(ctx, dbServiceIds)
if err != nil {
Expand Down
143 changes: 142 additions & 1 deletion internal/dms/biz/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ import (
"context"
"encoding/json"
"fmt"
"strings"
"sync"

v1 "github.com/actiontech/dms/api/dms/service/v1"
pkgConst "github.com/actiontech/dms/internal/dms/pkg/constant"

dmsV1 "github.com/actiontech/dms/pkg/dms-common/api/dms/v1"
_const "github.com/actiontech/dms/pkg/dms-common/pkg/const"
pkgHttp "github.com/actiontech/dms/pkg/dms-common/pkg/http"

utilLog "github.com/actiontech/dms/pkg/dms-common/pkg/log"
Expand All @@ -32,6 +34,7 @@ type Plugin struct {
// eg: 删除数据源前:
// 需要sqle服务中实现接口逻辑,判断该数据源上已经没有进行中的工单
OperateDataResourceHandleUrl string
GetDatabaseDriverOptionsUrl string
}

func (p *Plugin) String() string {
Expand Down Expand Up @@ -241,3 +244,141 @@ func (p *PluginUsecase) CallOperateDataResourceHandle(ctx context.Context, url s

return nil
}

const LogoPath = "/logo/"

var databaseDriverOptions []*v1.DatabaseDriverOption

func (p *PluginUsecase) GetDatabaseDriverOptionsCache() []*v1.DatabaseDriverOption {
return databaseDriverOptions
}

func (p *PluginUsecase) ClearDatabaseDriverOptionsCache() {
databaseDriverOptions = []*v1.DatabaseDriverOption{}
}

func (p *PluginUsecase) GetDatabaseDriverOptionsHandle(ctx context.Context) ([]*v1.DatabaseDriverOption, error) {
cacheOptions := p.GetDatabaseDriverOptionsCache()
if len(cacheOptions) != 0 {
return cacheOptions, nil
}
var (
mu sync.Mutex
errs []error
wg sync.WaitGroup
dbOptions []struct {
options []*v1.DatabaseDriverOption
source string
}
)

for _, plugin := range p.registeredPlugins {
if plugin.GetDatabaseDriverOptionsUrl != "" {
wg.Add(1)
go func(plugin *Plugin) {
defer wg.Done()
op, err := p.CallDatabaseDriverOptionsHandle(ctx, plugin.GetDatabaseDriverOptionsUrl)
if err != nil {
mu.Lock()
errs = append(errs, fmt.Errorf("call plugin %s get database driver options handle failed: %v", plugin.Name, err))
mu.Unlock()
return
}
mu.Lock()
dbOptions = append(dbOptions, struct {
options []*v1.DatabaseDriverOption
source string
}{
options: op,
source: plugin.Name,
})
mu.Unlock()
}(plugin)
}
}

wg.Wait()

if len(errs) > 0 {
return nil, fmt.Errorf("encountered errors: %v", errs)
}
databaseDriverOptions = append(databaseDriverOptions, aggregateOptions(dbOptions)...)
return databaseDriverOptions, nil
}

// 根据数据库类型合并各插件的options
func aggregateOptions(optionRes []struct {
options []*v1.DatabaseDriverOption
source string
}) []*v1.DatabaseDriverOption {
dbTypeMap := make(map[string]*v1.DatabaseDriverOption)

for _, res := range optionRes {
for _, opt := range res.options {
if aggOpt, exists := dbTypeMap[opt.DBType]; exists {
// 聚合Params, 合并时如有重复以sqle为主
aggOpt.Params = mergeParamsByName(aggOpt.Params, opt.Params, res.source == _const.SqleComponentName)
} else {
dbTypeMap[opt.DBType] = &v1.DatabaseDriverOption{
DBType: opt.DBType,
LogoPath: LogoPath + getLogoFileNameByDBType(opt.DBType),
Params: opt.Params,
}
}
}
}

// 转换为切片返回
result := make([]*v1.DatabaseDriverOption, 0, len(dbTypeMap))
for _, opt := range dbTypeMap {
result = append(result, opt)
}
return result
}

func getLogoFileNameByDBType(dbType string) string {
return strings.ToLower(strings.ReplaceAll(dbType, " ", "_")) + ".png"
}

// 根据参数名合并additional和params, overwriteExisting代表是不是要以新参数覆盖旧参数
func mergeParamsByName(existing, newParams []*v1.DatabaseDriverAdditionalParam, overwriteExisting bool) []*v1.DatabaseDriverAdditionalParam {
paramMap := make(map[string]*v1.DatabaseDriverAdditionalParam)

// 添加已有参数
for _, param := range existing {
paramMap[param.Name] = param
}

// 合并新参数
for _, param := range newParams {
if _, exists := paramMap[param.Name]; exists && overwriteExisting {
newAggParam := *param
paramMap[param.Name] = &newAggParam // 覆盖已有参数
} else if !exists {
paramMap[param.Name] = param
}
}

// 转换为切片返回
result := make([]*v1.DatabaseDriverAdditionalParam, 0, len(paramMap))
for _, param := range paramMap {
result = append(result, param)
}
return result
}

func (p *PluginUsecase) CallDatabaseDriverOptionsHandle(ctx context.Context, url string) ([]*v1.DatabaseDriverOption, error) {
header := map[string]string{
"Authorization": pkgHttp.DefaultDMSToken,
}
reply := &v1.ListDBServiceDriverOptionReply{}

if err := pkgHttp.Get(ctx, url, header, nil, reply); err != nil {
return nil, err
}
if reply.Code != 0 {
return nil, fmt.Errorf("reply code(%v) error: %v", reply.Code, reply.Message)
}

return reply.Data, nil
}
2 changes: 1 addition & 1 deletion internal/dms/service/cloudbeaver.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ func NewAndInitCloudbeaverService(logger utilLog.Logger, opts *conf.DMSOptions)
projectRepo := storage.NewProjectRepo(logger, st)
projectUsecase := biz.NewProjectUsecase(logger, tx, projectRepo, memberUsecase, opPermissionVerifyUsecase, pluginUseCase)
dbServiceRepo := storage.NewDBServiceRepo(logger, st)
dbServiceUseCase := biz.NewDBServiceUsecase(logger, dbServiceRepo, pluginUseCase, opPermissionVerifyUsecase, projectUsecase, dmsProxyTargetRepo, opts.DatabaseDriverOptions)
dbServiceUseCase := biz.NewDBServiceUsecase(logger, dbServiceRepo, pluginUseCase, opPermissionVerifyUsecase, projectUsecase, dmsProxyTargetRepo)

ldapConfigurationRepo := storage.NewLDAPConfigurationRepo(logger, st)
ldapConfigurationUsecase := biz.NewLDAPConfigurationUsecase(logger, tx, ldapConfigurationRepo)
Expand Down
14 changes: 2 additions & 12 deletions internal/dms/service/db_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,20 +610,10 @@ func (d *DMSService) ListDBServiceDriverOption(ctx context.Context) (reply *dmsV

ret := make([]*dmsV1.DatabaseDriverOption, 0, len(options))
for _, item := range options {
additionalParams := make([]*dmsV1.DatabaseDriverAdditionalParam, 0, len(item.Params))
for _, param := range item.Params {
additionalParams = append(additionalParams, &dmsV1.DatabaseDriverAdditionalParam{
Name: param.Key,
Value: param.Value,
Type: string(param.Type),
Description: param.Desc,
})
}

ret = append(ret, &dmsV1.DatabaseDriverOption{
DBType: item.DbType,
DBType: item.DBType,
LogoPath: item.LogoPath,
Params: additionalParams,
Params: item.Params,
})
}

Expand Down
4 changes: 3 additions & 1 deletion internal/dms/service/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ func (d *DMSService) RegisterDMSPlugin(ctx context.Context, currentUserUid strin
if err := d.PluginUsecase.RegisterPlugin(ctx, &biz.Plugin{
Name: req.Plugin.Name,
OperateDataResourceHandleUrl: req.Plugin.OperateDataResourceHandleUrl,
GetDatabaseDriverOptionsUrl: req.Plugin.GetDatabaseDriverOptionsUrl,
}, currentUserUid); err != nil {
return fmt.Errorf("register dms plugin failed: %v", err)
}

// 当有plugin注册时,初始化切片,重新调用接口获取数据库选项
d.PluginUsecase.ClearDatabaseDriverOptionsCache()
return nil
}
Loading

0 comments on commit 33b2116

Please sign in to comment.