Skip to content

Commit ce3f7a9

Browse files
committed
feat(connectionpool): made username optional
1 parent 94b9b35 commit ce3f7a9

File tree

11 files changed

+365
-83
lines changed

11 files changed

+365
-83
lines changed

CHANGELOG.md

+1
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
## [MAJOR.MINOR.PATCH] - YYYY-MM-DD
44

5+
- Made `ConnectionPool` username field optional, allowing connection pools to use the credentials of the connecting client instead of a fixed service user
56
- Add `Kafka` field `userConfig.kafka_rest_config.consumer_idle_disconnect_timeout`, type `integer`:
67
Specifies the maximum duration (in seconds) a client can remain idle before it is deleted
78
- Change `ServiceIntegration` field `clickhouseKafka.tables`: maxItems ~~`100`~~`400`

Makefile

+4
Original file line numberDiff line numberDiff line change
@@ -324,3 +324,7 @@ endef
324324
PHONY: sweep
325325
sweep: ## Run sweep to remove all resources created by e2e tests.
326326
go run ./sweeper/...
327+
328+
PHONY: fumpt
329+
fumpt: ## Run gofumpt on all go files.
330+
find . -name '*.go' -type f -exec gofumpt -w {} +

api/v1alpha1/connectionpool_types.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type ConnectionPoolSpec struct {
1717

1818
// +kubebuilder:validation:MaxLength=64
1919
// Name of the service user used to connect to the database
20-
Username string `json:"username"`
20+
Username string `json:"username,omitempty"`
2121

2222
// +kubebuilder:validation:Min=1
2323
// +kubebuilder:validation:Max=1000

charts/aiven-operator-crds/templates/aiven.io_connectionpools.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ spec:
156156
- databaseName
157157
- project
158158
- serviceName
159-
- username
160159
type: object
161160
x-kubernetes-validations:
162161
- message:

config/crd/bases/aiven.io_connectionpools.yaml

-1
Original file line numberDiff line numberDiff line change
@@ -156,7 +156,6 @@ spec:
156156
- databaseName
157157
- project
158158
- serviceName
159-
- username
160159
type: object
161160
x-kubernetes-validations:
162161
- message:

controllers/common.go

+10
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,16 @@ func optionalStringPointer(u string) *string {
131131
return &u
132132
}
133133

134+
// NilIfZero returns a pointer to the value, or nil if the value equals its zero value
135+
func NilIfZero[T comparable](v T) *T {
136+
var zero T
137+
if v == zero {
138+
return nil
139+
}
140+
141+
return &v
142+
}
143+
134144
func isAivenServerError(err error) bool {
135145
var status int
136146
var old aiven.Error

controllers/connectionpool_controller.go

+116-64
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010

1111
"github.com/aiven/aiven-go-client/v2"
1212
avngen "github.com/aiven/go-client-codegen"
13+
"github.com/aiven/go-client-codegen/handler/postgresql"
1314
"github.com/aiven/go-client-codegen/handler/service"
1415
corev1 "k8s.io/api/core/v1"
1516
"k8s.io/apimachinery/pkg/api/meta"
@@ -47,85 +48,101 @@ func (r *ConnectionPoolReconciler) SetupWithManager(mgr ctrl.Manager) error {
4748
Complete(r)
4849
}
4950

50-
func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object, refs []client.Object) error {
51-
cp, err := h.convert(obj)
51+
func (h ConnectionPoolHandler) createOrUpdate(ctx context.Context, _ *aiven.Client, avnGen avngen.Client, obj client.Object, _ []client.Object) error {
52+
connPool, err := h.convert(obj)
5253
if err != nil {
5354
return err
5455
}
5556

56-
exists, err := h.exists(ctx, avn, cp)
57+
var (
58+
reason string
59+
exists bool
60+
)
61+
62+
// check if the connection pool already exists
63+
s, err := avnGen.ServiceGet(ctx, connPool.Spec.Project, connPool.Spec.ServiceName)
5764
if err != nil {
58-
return err
65+
return fmt.Errorf("cannot get service: %w", err)
5966
}
60-
var reason string
61-
if !exists {
62-
_, err := avn.ConnectionPools.Create(ctx, cp.Spec.Project, cp.Spec.ServiceName,
63-
aiven.CreateConnectionPoolRequest{
64-
Database: cp.Spec.DatabaseName,
65-
PoolMode: cp.Spec.PoolMode,
66-
PoolName: cp.Name,
67-
PoolSize: cp.Spec.PoolSize,
68-
Username: optionalStringPointer(cp.Spec.Username),
69-
})
70-
if err != nil && !isAlreadyExists(err) {
71-
return err
67+
68+
for _, connP := range s.ConnectionPools {
69+
if connP.PoolName == connPool.Name {
70+
exists = true
71+
break
7272
}
73+
}
74+
75+
if !exists {
7376
reason = "Created"
74-
} else {
75-
_, err := avn.ConnectionPools.Update(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name,
76-
aiven.UpdateConnectionPoolRequest{
77-
Database: cp.Spec.DatabaseName,
78-
PoolMode: cp.Spec.PoolMode,
79-
PoolSize: cp.Spec.PoolSize,
80-
Username: optionalStringPointer(cp.Spec.Username),
81-
})
82-
if err != nil {
83-
return err
77+
req := postgresql.ServicePgbouncerCreateIn{
78+
Database: connPool.Spec.DatabaseName,
79+
PoolMode: postgresql.PoolModeType(connPool.Spec.PoolMode),
80+
PoolName: connPool.Name,
81+
PoolSize: NilIfZero(connPool.Spec.PoolSize),
82+
Username: NilIfZero(connPool.Spec.Username),
8483
}
84+
85+
if err = avnGen.ServicePGBouncerCreate(ctx, connPool.Spec.Project, connPool.Spec.ServiceName, &req); err != nil && !isAlreadyExists(err) {
86+
return fmt.Errorf("cannot create connection pool: %w", err)
87+
}
88+
} else {
8589
reason = "Updated"
90+
req := postgresql.ServicePgbouncerUpdateIn{
91+
Database: optionalStringPointer(connPool.Spec.DatabaseName),
92+
PoolMode: postgresql.PoolModeType(connPool.Spec.PoolMode),
93+
PoolSize: NilIfZero(connPool.Spec.PoolSize),
94+
Username: NilIfZero(connPool.Spec.Username),
95+
}
96+
97+
if err = avnGen.ServicePGBouncerUpdate(ctx, connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Name, &req); err != nil {
98+
return fmt.Errorf("cannot update connection pool: %w", err)
99+
}
86100
}
87101

88-
meta.SetStatusCondition(&cp.Status.Conditions,
102+
meta.SetStatusCondition(&connPool.Status.Conditions,
89103
getInitializedCondition(reason,
90104
"Successfully created or updated the instance in Aiven"))
91105

92-
metav1.SetMetaDataAnnotation(&cp.ObjectMeta,
93-
processedGenerationAnnotation, strconv.FormatInt(cp.GetGeneration(), formatIntBaseDecimal))
106+
metav1.SetMetaDataAnnotation(&connPool.ObjectMeta,
107+
processedGenerationAnnotation, strconv.FormatInt(connPool.GetGeneration(), formatIntBaseDecimal))
94108

95109
return nil
96110
}
97111

98-
func (h ConnectionPoolHandler) delete(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
112+
func (h ConnectionPoolHandler) delete(ctx context.Context, _ *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
99113
cp, err := h.convert(obj)
100114
if err != nil {
101115
return false, err
102116
}
103117

104-
err = avn.ConnectionPools.Delete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name)
105-
if err != nil && !isNotFound(err) {
118+
if err = avnGen.ServicePGBouncerDelete(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name); err != nil && !isNotFound(err) {
106119
return false, err
107120
}
108121

109122
return true, nil
110123
}
111124

112-
func (h ConnectionPoolHandler) exists(ctx context.Context, avn *aiven.Client, cp *v1alpha1.ConnectionPool) (bool, error) {
113-
conPool, err := avn.ConnectionPools.Get(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Name)
114-
if isNotFound(err) {
115-
return false, nil
116-
}
117-
return conPool != nil, err
118-
}
119-
120-
func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
125+
func (h ConnectionPoolHandler) get(ctx context.Context, _ *aiven.Client, avnGen avngen.Client, obj client.Object) (*corev1.Secret, error) {
121126
connPool, err := h.convert(obj)
122127
if err != nil {
123128
return nil, err
124129
}
125130

126-
cp, err := avn.ConnectionPools.Get(ctx, connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Name)
131+
// Search the connection pool
132+
var cp *service.ConnectionPoolOut
133+
serviceList, err := avnGen.ServiceGet(ctx, connPool.Spec.Project, connPool.Spec.ServiceName)
127134
if err != nil {
128-
return nil, fmt.Errorf("cannot get ConnectionPool: %w", err)
135+
return nil, fmt.Errorf("cannot get service: %w", err)
136+
}
137+
138+
for _, connP := range serviceList.ConnectionPools {
139+
if connP.PoolName == connPool.Name {
140+
cp = &connP
141+
}
142+
}
143+
144+
if cp == nil {
145+
return nil, fmt.Errorf("connection pool %q not found", connPool.Name)
129146
}
130147

131148
cert, err := avnGen.ProjectKmsGetCA(ctx, connPool.Spec.Project)
@@ -134,7 +151,7 @@ func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, avnGe
134151
}
135152

136153
// The pool comes with its own port
137-
poolURI, err := url.Parse(cp.ConnectionURI)
154+
poolURI, err := url.Parse(cp.ConnectionUri)
138155
if err != nil {
139156
return nil, fmt.Errorf("can't parse ConnectionPool URI: %w", err)
140157
}
@@ -160,7 +177,7 @@ func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, avnGe
160177
prefix + "USER": s.ServiceUriParams["user"],
161178
prefix + "PASSWORD": s.ServiceUriParams["password"],
162179
prefix + "SSLMODE": s.ServiceUriParams["sslmode"],
163-
prefix + "DATABASE_URI": cp.ConnectionURI,
180+
prefix + "DATABASE_URI": cp.ConnectionUri,
164181
prefix + "CA_CERT": cert,
165182
// todo: remove in future releases
166183
"PGHOST": s.ServiceUriParams["host"],
@@ -169,41 +186,57 @@ func (h ConnectionPoolHandler) get(ctx context.Context, avn *aiven.Client, avnGe
169186
"PGUSER": s.ServiceUriParams["user"],
170187
"PGPASSWORD": s.ServiceUriParams["password"],
171188
"PGSSLMODE": s.ServiceUriParams["sslmode"],
172-
"DATABASE_URI": cp.ConnectionURI,
189+
"DATABASE_URI": cp.ConnectionUri,
173190
}
174191

175192
return newSecret(connPool, stringData, false), nil
176193
}
177194

178-
u, err := avn.ServiceUsers.Get(ctx, connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Spec.Username)
195+
u, err := avnGen.ServiceUserGet(ctx, connPool.Spec.Project, connPool.Spec.ServiceName, connPool.Spec.Username)
179196
if err != nil {
180197
return nil, fmt.Errorf("cannot get user: %w", err)
181198
}
182199

183200
prefix := getSecretPrefix(connPool)
184201
stringData := map[string]string{
185-
prefix + "NAME": connPool.Name,
186-
prefix + "HOST": s.ServiceUriParams["host"],
187-
prefix + "PORT": poolURI.Port(),
188-
prefix + "DATABASE": cp.Database,
189-
prefix + "USER": cp.Username,
202+
prefix + "NAME": connPool.Name,
203+
prefix + "HOST": s.ServiceUriParams["host"],
204+
prefix + "PORT": poolURI.Port(),
205+
prefix + "DATABASE": cp.Database,
206+
prefix + "USER": func() string {
207+
if cp.Username != nil {
208+
return *cp.Username
209+
}
210+
211+
// this should never happen, but we have to handle this case anyway
212+
// this behaviour compatible with the previous implementation with aiven.Client
213+
return ""
214+
}(),
190215
prefix + "PASSWORD": u.Password,
191216
prefix + "SSLMODE": s.ServiceUriParams["sslmode"],
192-
prefix + "DATABASE_URI": cp.ConnectionURI,
217+
prefix + "DATABASE_URI": cp.ConnectionUri,
193218
prefix + "CA_CERT": cert,
194219
// todo: remove in future releases
195-
"PGHOST": s.ServiceUriParams["host"],
196-
"PGPORT": poolURI.Port(),
197-
"PGDATABASE": cp.Database,
198-
"PGUSER": cp.Username,
220+
"PGHOST": s.ServiceUriParams["host"],
221+
"PGPORT": poolURI.Port(),
222+
"PGDATABASE": cp.Database,
223+
"PGUSER": func() string {
224+
if cp.Username != nil {
225+
return *cp.Username
226+
}
227+
228+
// this should never happen, but we have to handle this case anyway
229+
// this behaviour compatible with the previous implementation with aiven.Client
230+
return ""
231+
}(),
199232
"PGPASSWORD": u.Password,
200233
"PGSSLMODE": s.ServiceUriParams["sslmode"],
201-
"DATABASE_URI": cp.ConnectionURI,
234+
"DATABASE_URI": cp.ConnectionUri,
202235
}
203236
return newSecret(connPool, stringData, false), nil
204237
}
205238

206-
func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
239+
func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, _ *aiven.Client, avnGen avngen.Client, obj client.Object) (bool, error) {
207240
cp, err := h.convert(obj)
208241
if err != nil {
209242
return false, err
@@ -221,16 +254,35 @@ func (h ConnectionPoolHandler) checkPreconditions(ctx context.Context, avn *aive
221254
return false, nil
222255
}
223256

224-
_, err = avn.Databases.Get(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Spec.DatabaseName)
225-
if err == nil {
226-
_, err = avnGen.ServiceUserGet(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Spec.Username)
257+
// check if the database exists
258+
var exists bool
259+
dbList, err := avnGen.ServiceGet(ctx, cp.Spec.Project, cp.Spec.ServiceName)
260+
if err != nil {
261+
return false, err
227262
}
228263

229-
if isNotFound(err) {
264+
for _, db := range dbList.Databases {
265+
if db == cp.Spec.DatabaseName {
266+
exists = true
267+
break
268+
}
269+
}
270+
271+
if !exists {
230272
return false, nil
231273
}
232274

233-
return err == nil, err
275+
if cp.Spec.Username != "" {
276+
_, err = avnGen.ServiceUserGet(ctx, cp.Spec.Project, cp.Spec.ServiceName, cp.Spec.Username)
277+
if isNotFound(err) {
278+
return false, nil
279+
}
280+
if err != nil {
281+
return false, err
282+
}
283+
}
284+
285+
return true, nil
234286
}
235287

236288
func (h ConnectionPoolHandler) convert(i client.Object) (*v1alpha1.ConnectionPool, error) {

docs/docs/resources/connectionpool.md

+1-1
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,6 @@ ConnectionPoolSpec defines the desired state of ConnectionPool.
115115
- [`databaseName`](#spec.databaseName-property){: name='spec.databaseName-property'} (string, MaxLength: 40). Name of the database the pool connects to.
116116
- [`project`](#spec.project-property){: name='spec.project-property'} (string, Immutable, Pattern: `^[a-zA-Z0-9_-]+$`, MaxLength: 63). Identifies the project this resource belongs to.
117117
- [`serviceName`](#spec.serviceName-property){: name='spec.serviceName-property'} (string, Immutable, Pattern: `^[a-z][-a-z0-9]+$`, MaxLength: 63). Specifies the name of the service that this resource belongs to.
118-
- [`username`](#spec.username-property){: name='spec.username-property'} (string, MaxLength: 64). Name of the service user used to connect to the database.
119118

120119
**Optional**
121120

@@ -124,6 +123,7 @@ ConnectionPoolSpec defines the desired state of ConnectionPool.
124123
- [`connInfoSecretTargetDisabled`](#spec.connInfoSecretTargetDisabled-property){: name='spec.connInfoSecretTargetDisabled-property'} (boolean, Immutable). When true, the secret containing connection information will not be created, defaults to false. This field cannot be changed after resource creation.
125124
- [`poolMode`](#spec.poolMode-property){: name='spec.poolMode-property'} (string, Enum: `session`, `transaction`, `statement`). Mode the pool operates in (session, transaction, statement).
126125
- [`poolSize`](#spec.poolSize-property){: name='spec.poolSize-property'} (integer). Number of connections the pool may create towards the backend server.
126+
- [`username`](#spec.username-property){: name='spec.username-property'} (string, MaxLength: 64). Name of the service user used to connect to the database.
127127

128128
## authSecretRef {: #spec.authSecretRef }
129129

0 commit comments

Comments
 (0)