Skip to content

Commit

Permalink
Merge pull request #69 from actiontech/issuse-1024
Browse files Browse the repository at this point in the history
fix: cloudbeaver
  • Loading branch information
LordofAvernus authored Nov 3, 2023
2 parents 5edd4f1 + 1718ead commit db7eb46
Show file tree
Hide file tree
Showing 3 changed files with 85 additions and 22 deletions.
72 changes: 58 additions & 14 deletions internal/dms/biz/cloudbeaver.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,13 +58,14 @@ type CloudbeaverConnection struct {
type CloudbeaverRepo interface {
GetCloudbeaverUserByID(ctx context.Context, cloudbeaverUserId string) (*CloudbeaverUser, bool, error)
UpdateCloudbeaverUserCache(ctx context.Context, u *CloudbeaverUser) error
GetDbServiceIdByConnectionId(ctx context.Context, connectionId string) (string, error)
GetCloudbeaverConnectionByDMSDBServiceIds(ctx context.Context, dmsDBServiceIds []string) ([]*CloudbeaverConnection, error)
UpdateCloudbeaverConnectionCache(ctx context.Context, u *CloudbeaverConnection) error
}

type CloudbeaverUsecase struct {
graphQl cloudbeaver.GraphQLImpl
cloudbeaverCfg CloudbeaverCfg
cloudbeaverCfg *CloudbeaverCfg
log *utilLog.Helper
userUsecase *UserUsecase
dbServiceUsecase *DBServiceUsecase
Expand All @@ -73,7 +74,7 @@ type CloudbeaverUsecase struct {
proxyTargetRepo ProxyTargetRepo
}

func NewCloudbeaverUsecase(log utilLog.Logger, cfg CloudbeaverCfg, userUsecase *UserUsecase, dbServiceUsecase *DBServiceUsecase, opPermissionVerifyUsecase *OpPermissionVerifyUsecase, cloudbeaverRepo CloudbeaverRepo, proxyTargetRepo ProxyTargetRepo) (cu *CloudbeaverUsecase) {
func NewCloudbeaverUsecase(log utilLog.Logger, cfg *CloudbeaverCfg, userUsecase *UserUsecase, dbServiceUsecase *DBServiceUsecase, opPermissionVerifyUsecase *OpPermissionVerifyUsecase, cloudbeaverRepo CloudbeaverRepo, proxyTargetRepo ProxyTargetRepo) (cu *CloudbeaverUsecase) {
cu = &CloudbeaverUsecase{
repo: cloudbeaverRepo,
proxyTargetRepo: proxyTargetRepo,
Expand All @@ -92,24 +93,26 @@ func (cu *CloudbeaverUsecase) GetRootUri() string {
}

func (cu *CloudbeaverUsecase) IsCloudbeaverConfigured() bool {
if cu.cloudbeaverCfg == nil {
return false
}

return cu.cloudbeaverCfg.Host != "" && cu.cloudbeaverCfg.Port != "" && cu.cloudbeaverCfg.AdminUser != "" && cu.cloudbeaverCfg.AdminPassword != ""
}

var graphQLOnce = &sync.Once{}

func (cu *CloudbeaverUsecase) initialGraphQL() {
func (cu *CloudbeaverUsecase) initialGraphQL() error {
if cu.IsCloudbeaverConfigured() && cu.graphQl == nil {
graphQLOnce.Do(func() {
graphQl, err := cloudbeaver.NewGraphQL(cu.getGraphQLServerURI())
if err != nil {
cu.log.Errorf("NewGraphQL err: %v", err)
graphQl, graphQlErr := cloudbeaver.NewGraphQL(cu.getGraphQLServerURI())
if graphQlErr != nil {
cu.log.Errorf("NewGraphQL err: %v", graphQlErr)

return
}
return fmt.Errorf("initial graphql client err: %v", graphQlErr)
}

cu.graphQl = graphQl
})
cu.graphQl = graphQl
}

return nil
}

func (cu *CloudbeaverUsecase) getGraphQLServerURI() string {
Expand Down Expand Up @@ -142,7 +145,9 @@ func (cu *CloudbeaverUsecase) Login() echo.MiddlewareFunc {
return errors.New("get user name from token failed")
}

cu.initialGraphQL()
if err = cu.initialGraphQL(); err != nil {
return err
}

cloudbeaverSessionId := cu.getCloudbeaverSession(dmsUserId, dmsToken)
if cloudbeaverSessionId != "" {
Expand Down Expand Up @@ -281,6 +286,18 @@ func (cu *CloudbeaverUsecase) GraphQLDistributor() echo.MiddlewareFunc {
}

if cloudbeaverHandle.UseLocalHandler {
if params.OperationName == "asyncSqlExecuteQuery" {
isEnableSqlAudit, err := cu.isEnableSQLAudit(c.Request().Context(), params)
if err != nil {
cu.log.Error(err)
return err
}

if !isEnableSqlAudit {
return next(c)
}
}

params.ReadTime = graphql.TraceTiming{
Start: graphql.Now(),
End: graphql.Now(),
Expand Down Expand Up @@ -351,6 +368,33 @@ func (cu *CloudbeaverUsecase) GraphQLDistributor() echo.MiddlewareFunc {
}
}

func (cu *CloudbeaverUsecase) isEnableSQLAudit(ctx context.Context, params *graphql.RawParams) (bool, error) {
var connectionId interface{}
var connectionIdStr string
var ok bool

connectionId, ok = params.Variables["connectionId"]
if !ok {
return false, fmt.Errorf("missing connectionId in %s query", params.OperationName)
}

connectionIdStr, ok = connectionId.(string)
if !ok {
return false, fmt.Errorf("connectionId %s convert failed", connectionId)
}
dbServiceId, err := cu.repo.GetDbServiceIdByConnectionId(ctx, connectionIdStr)
if err != nil {
return false, err
}

dbService, err := cu.dbServiceUsecase.GetDBService(ctx, dbServiceId)
if err != nil {
return false, err
}

return dbService.SQLEConfig.SQLQueryConfig.AuditEnabled, nil
}

type ActiveUserQueryRes struct {
User interface{} `json:"user"`
}
Expand Down
19 changes: 11 additions & 8 deletions internal/dms/service/cloudbeaver.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,6 @@ type CloudbeaverService struct {
}

func NewAndInitCloudbeaverService(logger utilLog.Logger, opts *conf.DMSOptions) (*CloudbeaverService, error) {
cfg := biz.CloudbeaverCfg{
EnableHttps: opts.CloudbeaverOpts.EnableHttps,
Host: opts.CloudbeaverOpts.Host,
Port: opts.CloudbeaverOpts.Port,
AdminUser: opts.CloudbeaverOpts.AdminUser,
AdminPassword: opts.CloudbeaverOpts.AdminPassword,
}

// todo: because cloudbeaver required userUsecase, optimisation may be needed here
st, err := storage.NewStorage(logger, &storage.StorageConfig{
User: opts.ServiceOpts.Database.UserName,
Expand Down Expand Up @@ -64,6 +56,17 @@ func NewAndInitCloudbeaverService(logger utilLog.Logger, opts *conf.DMSOptions)
opPermissionUsecase := biz.NewOpPermissionUsecase(logger, tx, opPermissionRepo, pluginUseCase)
userUsecase := biz.NewUserUsecase(logger, tx, userRepo, userGroupRepo, pluginUseCase, opPermissionUsecase, opPermissionVerifyUsecase, ldapConfigurationUsecase)

var cfg *biz.CloudbeaverCfg
if opts.CloudbeaverOpts != nil {
cfg = &biz.CloudbeaverCfg{
EnableHttps: opts.CloudbeaverOpts.EnableHttps,
Host: opts.CloudbeaverOpts.Host,
Port: opts.CloudbeaverOpts.Port,
AdminUser: opts.CloudbeaverOpts.AdminUser,
AdminPassword: opts.CloudbeaverOpts.AdminPassword,
}
}

cloudbeaverRepo := storage.NewCloudbeaverRepo(logger, st)
cloudbeaverUsecase := biz.NewCloudbeaverUsecase(logger, cfg, userUsecase, dbServiceUseCase, opPermissionVerifyUsecase, cloudbeaverRepo, dmsProxyTargetRepo)
proxyUsecase := biz.NewCloudbeaverProxyUsecase(logger, cloudbeaverUsecase)
Expand Down
16 changes: 16 additions & 0 deletions internal/dms/storage/cloudbeaver.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ func (cr *CloudbeaverRepo) UpdateCloudbeaverUserCache(ctx context.Context, u *bi
})
}

func (cr *CloudbeaverRepo) GetDbServiceIdByConnectionId(ctx context.Context, connectionId string) (string, error) {
var cloudbeaverConnection model.CloudbeaverConnectionCache
err := transaction(cr.log, ctx, cr.db, func(tx *gorm.DB) error {
if err := tx.Where("cloudbeaver_connection_id = ?", connectionId).First(&cloudbeaverConnection).Error; err != nil {
return fmt.Errorf("failed to get cloudbeaver db_service_id: %v", err)
}
return nil
})

if err != nil {
return "", err
}

return cloudbeaverConnection.DMSDBServiceID, nil
}

func (cr *CloudbeaverRepo) GetCloudbeaverConnectionByDMSDBServiceIds(ctx context.Context, dmsDBServiceIds []string) ([]*biz.CloudbeaverConnection, error) {
var cloudbeaverConnections []*model.CloudbeaverConnectionCache
err := transaction(cr.log, ctx, cr.db, func(tx *gorm.DB) error {
Expand Down

0 comments on commit db7eb46

Please sign in to comment.