Skip to content

Commit 7b385d7

Browse files
authored
Merge pull request cybertec-postgresql#382 from cybertec-postgresql/381_active_chain_handling
Rewrite active chain handling
2 parents fe2f499 + 81b1f3b commit 7b385d7

14 files changed

+127
-150
lines changed

.github/workflows/docker.yml

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ jobs:
2424
- name: Set up Golang
2525
uses: actions/setup-go@v2
2626
with:
27-
go-version: '1.16'
27+
go-version: '1.17'
2828

2929
# despite the fact docker will build binary internally
3030
# we want to stop workflow in case of any error before pushing to registry

internal/pgengine/access.go

+13-37
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,11 @@ import (
66
"os"
77

88
"github.com/georgysavva/scany/pgxscan"
9-
"github.com/jackc/pgx/v4"
109
)
1110

1211
// InvalidOid specifies value for non-existent objects
1312
const InvalidOid = 0
1413

15-
/*FixSchedulerCrash make sure that task chains which are not complete due to a scheduler crash are "fixed"
16-
and marked as stopped at a certain point */
17-
func (pge *PgEngine) FixSchedulerCrash(ctx context.Context) {
18-
_, err := pge.ConfigDb.Exec(ctx, `SELECT timetable.health_check($1)`, pge.ClientName)
19-
if err != nil {
20-
pge.l.WithError(err).Error("Failed to perform health check")
21-
}
22-
}
23-
2414
// DeleteChainConfig delete chain configuration for self destructive chains
2515
func (pge *PgEngine) DeleteChainConfig(ctx context.Context, chainID int) bool {
2616
pge.l.WithField("chain", chainID).Info("Deleting self destructive chain configuration")
@@ -51,40 +41,26 @@ VALUES ($1, $2, $3, $4, clock_timestamp() - $5 :: interval, clock_timestamp(), $
5141
}
5242

5343
// InsertChainRunStatus inits the execution run log, which will be use to effectively control scheduler concurrency
54-
func (pge *PgEngine) InsertChainRunStatus(ctx context.Context, chainID int) int {
55-
const sqlInsertRunStatus = `INSERT INTO timetable.run_status
56-
(execution_status, chain_id, client_name)
57-
SELECT 'CHAIN_STARTED', c.chain_id, $2
58-
FROM timetable.chain c
59-
WHERE
60-
c.chain_id = $1 AND
44+
func (pge *PgEngine) InsertChainRunStatus(ctx context.Context, chainID int, maxInstances int) bool {
45+
const sqlInsertRunStatus = `INSERT INTO timetable.active_chain (chain_id, client_name)
46+
SELECT $1, $2 WHERE
6147
(
62-
SELECT COALESCE(count(*) < c.max_instances, TRUE)
63-
FROM timetable.get_chain_running_statuses(c.chain_id)
64-
)
65-
RETURNING run_status_id;`
66-
id := -1
67-
err := pge.ConfigDb.QueryRow(ctx, sqlInsertRunStatus, chainID, pge.ClientName).Scan(&id)
48+
SELECT COALESCE(count(*) < $3, TRUE)
49+
FROM timetable.active_chain ac WHERE ac.chain_id = $1
50+
)`
51+
res, err := pge.ConfigDb.Exec(ctx, sqlInsertRunStatus, chainID, pge.ClientName, maxInstances)
6852
if err != nil {
69-
if err == pgx.ErrNoRows {
70-
return -1
71-
}
7253
pge.l.WithError(err).Error("Cannot save information about the chain run status")
54+
return false
7355
}
74-
return id
56+
return res.RowsAffected() == 1
7557
}
7658

77-
// AddChainRunStatus inserts status information about running chain elements
78-
func (pge *PgEngine) AddChainRunStatus(ctx context.Context, task *ChainTask, runStatusID int, status string) {
79-
const sqlInsertFinishStatus = `INSERT INTO timetable.run_status
80-
(task_id, execution_status, start_status_id, chain_id, client_name)
81-
VALUES
82-
($1, $2, $3, $4, $5)`
83-
var err error
84-
_, err = pge.ConfigDb.Exec(ctx, sqlInsertFinishStatus,
85-
task.TaskID, status, runStatusID, task.ChainID, pge.ClientName)
59+
func (pge *PgEngine) RemoveChainRunStatus(ctx context.Context, chainID int) {
60+
const sqlRemoveRunStatus = `DELETE FROM timetable.active_chain WHERE chain_id = $1 and client_name = $2`
61+
_, err := pge.ConfigDb.Exec(ctx, sqlRemoveRunStatus, chainID, pge.ClientName)
8662
if err != nil {
87-
pge.l.WithError(err).Error("Update chain status failed")
63+
pge.l.WithError(err).Error("Cannot save information about the chain run status")
8864
}
8965
}
9066

internal/pgengine/access_test.go

+11-28
Original file line numberDiff line numberDiff line change
@@ -33,47 +33,30 @@ func TestDeleteChainConfig(t *testing.T) {
3333
assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
3434
}
3535

36-
func TestFixSchedulerCrash(t *testing.T) {
37-
initmockdb(t)
38-
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
39-
defer mockPool.Close()
40-
41-
t.Run("Check FixSchedulerCrash if sql fails", func(t *testing.T) {
42-
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime*time.Second+2)
43-
defer cancel()
44-
mockPool.ExpectExec(`SELECT timetable\.health_check`).WillReturnError(errors.New("error"))
45-
pge.FixSchedulerCrash(ctx)
46-
})
47-
48-
assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
49-
}
50-
5136
func TestInsertChainRunStatus(t *testing.T) {
5237
initmockdb(t)
5338
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
39+
pge.ClientName = "test_client"
5440
defer mockPool.Close()
5541

56-
t.Run("Check InsertChainRunStatus if sql fails", func(t *testing.T) {
57-
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime*time.Second+2)
58-
defer cancel()
59-
mockPool.ExpectQuery("INSERT INTO timetable\\.run_status").WillReturnError(errors.New("error"))
60-
pge.InsertChainRunStatus(ctx, 0)
61-
})
42+
mockPool.ExpectExec("INSERT INTO timetable\\.active_chain").
43+
WithArgs(0, pge.ClientName, 1).
44+
WillReturnError(errors.New("error"))
45+
pge.InsertChainRunStatus(context.Background(), 0, 1)
6246

6347
assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
6448
}
6549

66-
func TestUpdateChainRunStatus(t *testing.T) {
50+
func TestRemoveChainRunStatus(t *testing.T) {
6751
initmockdb(t)
6852
pge := pgengine.NewDB(mockPool, "pgengine_unit_test")
53+
pge.ClientName = "test_client"
6954
defer mockPool.Close()
7055

71-
t.Run("Check UpdateChainRunStatus if sql fails", func(t *testing.T) {
72-
ctx, cancel := context.WithTimeout(context.Background(), pgengine.WaitTime*time.Second+2)
73-
defer cancel()
74-
mockPool.ExpectExec("INSERT INTO timetable\\.run_status").WillReturnError(errors.New("error"))
75-
pge.AddChainRunStatus(ctx, &pgengine.ChainTask{}, 0, "STATUS")
76-
})
56+
mockPool.ExpectExec("DELETE FROM timetable\\.active_chain").
57+
WithArgs(0, pge.ClientName).
58+
WillReturnError(errors.New("error"))
59+
pge.RemoveChainRunStatus(context.Background(), 0)
7760

7861
assert.NoError(t, mockPool.ExpectationsWereMet(), "there were unfulfilled expectations")
7962
}

internal/pgengine/bootstrap.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -252,16 +252,18 @@ func (pge *PgEngine) ExecuteSchemaScripts(ctx context.Context) error {
252252
// Finalize closes session
253253
func (pge *PgEngine) Finalize() {
254254
pge.l.Info("Closing session")
255-
_, err := pge.ConfigDb.Exec(context.Background(), "DELETE FROM timetable.active_session WHERE client_pid = $1 AND client_name = $2", os.Getpid(), pge.ClientName)
255+
sql := `WITH del_ch AS (DELETE FROM timetable.active_chain WHERE client_name = $1)
256+
DELETE FROM timetable.active_session WHERE client_name = $1`
257+
_, err := pge.ConfigDb.Exec(context.Background(), sql, pge.ClientName)
256258
if err != nil {
257259
pge.l.WithError(err).Error("Cannot finalize database session")
258260
}
259261
pge.ConfigDb.Close()
260262
pge.ConfigDb = nil
261263
}
262264

263-
//ReconnectAndFixLeftovers keeps trying reconnecting every `waitTime` seconds till connection established
264-
func (pge *PgEngine) ReconnectAndFixLeftovers(ctx context.Context) bool {
265+
//Reconnect keeps trying reconnecting every `waitTime` seconds till connection established
266+
func (pge *PgEngine) Reconnect(ctx context.Context) bool {
265267
for pge.ConfigDb.Ping(ctx) != nil {
266268
pge.l.Info("Connection to the server was lost. Waiting for ", WaitTime, " sec...")
267269
select {
@@ -273,6 +275,5 @@ func (pge *PgEngine) ReconnectAndFixLeftovers(ctx context.Context) bool {
273275
}
274276
}
275277
pge.l.Info("Connection reestablished...")
276-
pge.FixSchedulerCrash(ctx)
277278
return true
278279
}

internal/pgengine/bootstrap_test.go

+2-3
Original file line numberDiff line numberDiff line change
@@ -74,16 +74,15 @@ func TestReconnectAndFixLeftovers(t *testing.T) {
7474
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
7575
defer cancel()
7676
mockPool.ExpectPing()
77-
mockPool.ExpectExec(`SELECT timetable\.health_check`).WillReturnResult(pgxmock.NewResult("EXECUTE", 0))
78-
assert.True(t, mockpge.ReconnectAndFixLeftovers(ctx))
77+
assert.True(t, mockpge.Reconnect(ctx))
7978
})
8079

8180
t.Run("Check ReconnectAndFixLeftovers if error returned", func(t *testing.T) {
8281
ctx, cancel := context.WithTimeout(context.Background(), (pgengine.WaitTime+2)*time.Second)
8382
defer cancel()
8483
mockPool.ExpectPing().WillReturnError(errors.New("expected"))
8584
mockPool.ExpectPing().WillDelayFor(pgengine.WaitTime * time.Second * 2)
86-
assert.False(t, mockpge.ReconnectAndFixLeftovers(ctx))
85+
assert.False(t, mockpge.Reconnect(ctx))
8786
})
8887
assert.NoError(t, mockPool.ExpectationsWereMet())
8988
}

internal/pgengine/migration.go

+6
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,12 @@ var Migrations func() migrator.Option = func() migrator.Option {
9090
return ExecuteMigrationScript(ctx, tx, "00334.sql")
9191
},
9292
},
93+
&migrator.Migration{
94+
Name: "00381 Rewrite active chain handling",
95+
Func: func(ctx context.Context, tx pgx.Tx) error {
96+
return ExecuteMigrationScript(ctx, tx, "00381.sql")
97+
},
98+
},
9399
// adding new migration here, update "timetable"."migration" in "sql/ddl.sql"!
94100

95101
// &migrator.Migration{

internal/pgengine/pgengine_test.go

+5-11
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
6868

6969
t.Run("Check timetable tables", func(t *testing.T) {
7070
var oid int
71-
tableNames := []string{"task", "chain", "parameter", "log", "execution_log", "run_status"}
71+
tableNames := []string{"task", "chain", "parameter", "log", "execution_log", "active_session", "active_chain"}
7272
for _, tableName := range tableNames {
7373
err := pge.ConfigDb.QueryRow(ctx, fmt.Sprintf("SELECT COALESCE(to_regclass('timetable.%s'), 0) :: int", tableName)).Scan(&oid)
7474
assert.NoError(t, err, fmt.Sprintf("Query for %s existence failed", tableName))
@@ -80,8 +80,6 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
8080
var oid int
8181
funcNames := []string{"_validate_json_schema_type(text, jsonb)",
8282
"validate_json_schema(jsonb, jsonb, jsonb)",
83-
"get_chain_running_statuses(bigint)",
84-
"health_check(TEXT)",
8583
"add_task(timetable.command_kind, TEXT, BIGINT, DOUBLE PRECISION)",
8684
"add_job(TEXT, timetable.cron, TEXT, JSONB, timetable.command_kind, TEXT, INTEGER, BOOLEAN, BOOLEAN, BOOLEAN, BOOLEAN)",
8785
"is_cron_in_time(timetable.cron, timestamptz)"}
@@ -121,7 +119,7 @@ func TestInitAndTestConfigDBConnection(t *testing.T) {
121119
})
122120

123121
t.Run("Check Reconnecting Database", func(t *testing.T) {
124-
assert.Equal(t, true, pge.ReconnectAndFixLeftovers(ctx),
122+
assert.Equal(t, true, pge.Reconnect(ctx),
125123
"Should succeed for reconnect")
126124
})
127125
}
@@ -140,10 +138,6 @@ func TestSchedulerFunctions(t *testing.T) {
140138

141139
ctx := context.Background()
142140

143-
t.Run("Check FixSchedulerCrash function", func(t *testing.T) {
144-
assert.NotPanics(t, func() { pge.FixSchedulerCrash(ctx) }, "Fix scheduler crash failed")
145-
})
146-
147141
t.Run("Check DeleteChainConfig funсtion", func(t *testing.T) {
148142
assert.Equal(t, false, pge.DeleteChainConfig(ctx, 0), "Should not delete in clean database")
149143
})
@@ -169,10 +163,10 @@ func TestSchedulerFunctions(t *testing.T) {
169163
})
170164

171165
t.Run("Check InsertChainRunStatus funсtion", func(t *testing.T) {
172-
var id int
173-
assert.NotPanics(t, func() { id = pge.InsertChainRunStatus(ctx, 0) },
166+
var res bool
167+
assert.NotPanics(t, func() { res = pge.InsertChainRunStatus(ctx, 0, 1) },
174168
"Should no error in clean database")
175-
assert.NotZero(t, id, "Run status id should be greater then 0")
169+
assert.True(t, res, "Active chain should be inserted")
176170
})
177171

178172
t.Run("Check ExecuteSQLCommand function", func(t *testing.T) {

internal/pgengine/sql/ddl.sql

+19-14
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,8 @@ VALUES
1717
(1, '00305 Fix timetable.is_cron_in_time'),
1818
(2, '00323 Append timetable.delete_job function'),
1919
(3, '00329 Migration required for some new added functions'),
20-
(4, '00334 Refactor timetable.task as plain schema without tree-like dependencies');
20+
(4, '00334 Refactor timetable.task as plain schema without tree-like dependencies'),
21+
(5, '00381 Rewrite active chain handling');
2122

2223
CREATE DOMAIN timetable.cron AS TEXT CHECK(
2324
substr(VALUE, 1, 6) IN ('@every', '@after') AND (substr(VALUE, 7) :: INTERVAL) IS NOT NULL
@@ -110,7 +111,6 @@ CREATE UNLOGGED TABLE timetable.active_session(
110111
COMMENT ON TABLE timetable.active_session IS
111112
'Stores information about active sessions';
112113

113-
114114
CREATE TYPE timetable.log_type AS ENUM ('DEBUG', 'NOTICE', 'LOG', 'ERROR', 'PANIC', 'USER');
115115

116116
CREATE OR REPLACE FUNCTION timetable.get_client_name(integer) RETURNS TEXT AS
@@ -129,7 +129,9 @@ CREATE TABLE timetable.log
129129
message_data jsonb
130130
);
131131

132-
-- log timetable related action
132+
COMMENT ON TABLE timetable.log IS
133+
'Stores log entries of active sessions';
134+
133135
CREATE TABLE timetable.execution_log (
134136
chain_id BIGINT,
135137
task_id BIGINT,
@@ -143,19 +145,17 @@ CREATE TABLE timetable.execution_log (
143145
client_name TEXT NOT NULL
144146
);
145147

146-
CREATE TYPE timetable.execution_status AS ENUM ('CHAIN_STARTED', 'CHAIN_FAILED', 'CHAIN_DONE', 'TASK_STARTED', 'TASK_DONE', 'DEAD');
147-
148-
CREATE TABLE timetable.run_status (
149-
run_status_id BIGSERIAL PRIMARY KEY,
150-
start_status_id BIGINT REFERENCES timetable.run_status(run_status_id)
151-
ON UPDATE CASCADE ON DELETE CASCADE,
152-
chain_id BIGINT,
153-
task_id BIGINT,
154-
created_at TIMESTAMPTZ DEFAULT clock_timestamp(),
155-
execution_status timetable.execution_status,
156-
client_name TEXT NOT NULL
148+
COMMENT ON TABLE timetable.execution_log IS
149+
'Stores log entries of executed tasks and chains';
150+
151+
CREATE UNLOGGED TABLE timetable.active_chain(
152+
chain_id BIGINT NOT NULL,
153+
client_name TEXT NOT NULL
157154
);
158155

156+
COMMENT ON TABLE timetable.active_chain IS
157+
'Stores information about active chains within session';
158+
159159
CREATE OR REPLACE FUNCTION timetable.try_lock_client_name(worker_pid BIGINT, worker_name TEXT)
160160
RETURNS bool AS
161161
$CODE$
@@ -172,6 +172,11 @@ BEGIN
172172
FROM pg_catalog.pg_stat_activity
173173
WHERE application_name = 'pg_timetable'
174174
);
175+
DELETE
176+
FROM timetable.active_chain
177+
WHERE client_name NOT IN (
178+
SELECT client_name FROM timetable.active_session
179+
);
175180
-- check if there any active sessions with the client name but different client pid
176181
PERFORM 1
177182
FROM timetable.active_session s

internal/pgengine/sql/job_functions.sql

-33
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,3 @@
1-
CREATE OR REPLACE FUNCTION timetable.get_chain_running_statuses(chain_id BIGINT) RETURNS SETOF BIGINT AS $$
2-
SELECT start_status.run_status_id
3-
FROM timetable.run_status start_status
4-
WHERE start_status.execution_status = 'CHAIN_STARTED'
5-
AND start_status.chain_id = $1
6-
AND NOT EXISTS (
7-
SELECT 1
8-
FROM timetable.run_status finish_status
9-
WHERE start_status.run_status_id = finish_status.start_status_id
10-
AND finish_status.execution_status IN ('CHAIN_FAILED', 'CHAIN_DONE', 'DEAD')
11-
)
12-
ORDER BY 1
13-
$$ LANGUAGE SQL STRICT;
14-
15-
COMMENT ON FUNCTION timetable.get_chain_running_statuses(chain_id BIGINT) IS
16-
'Returns a set of active run status IDs for a given chain';
17-
18-
CREATE OR REPLACE FUNCTION timetable.health_check(client_name TEXT) RETURNS void AS $$
19-
INSERT INTO timetable.run_status
20-
(execution_status, start_status_id, client_name)
21-
SELECT
22-
'DEAD', start_status.run_status_id, $1
23-
FROM timetable.run_status start_status
24-
WHERE start_status.execution_status = 'CHAIN_STARTED'
25-
AND start_status.client_name = $1
26-
AND NOT EXISTS (
27-
SELECT 1
28-
FROM timetable.run_status finish_status
29-
WHERE start_status.run_status_id = finish_status.start_status_id
30-
AND finish_status.execution_status IN ('CHAIN_FAILED', 'CHAIN_DONE', 'DEAD')
31-
)
32-
$$ LANGUAGE SQL STRICT;
33-
341
-- add_task() will add a task to the same chain as the task with `parent_id`
352
CREATE OR REPLACE FUNCTION timetable.add_task(
363
IN kind timetable.command_kind,

0 commit comments

Comments
 (0)