Skip to content

Commit d65ba4b

Browse files
craig[bot]msbutler
craig[bot]
andcommitted
140952: Revert "jobs: parse internal jobs queries at init time" r=yuzefovich a=msbutler This reverts commit 28315da. Epic: None Co-authored-by: Michael Butler <[email protected]>
2 parents ecfb74b + 8e5590c commit d65ba4b

File tree

3 files changed

+90
-156
lines changed

3 files changed

+90
-156
lines changed

pkg/jobs/BUILD.bazel

-2
Original file line numberDiff line numberDiff line change
@@ -52,8 +52,6 @@ go_library(
5252
"//pkg/sql/catalog/descpb",
5353
"//pkg/sql/catalog/descs",
5454
"//pkg/sql/isql",
55-
"//pkg/sql/parser",
56-
"//pkg/sql/parser/statements",
5755
"//pkg/sql/pgwire/pgerror",
5856
"//pkg/sql/protoreflect",
5957
"//pkg/sql/sem/catconstants",

pkg/jobs/job_info_storage.go

+39-121
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@ import (
1515
"github.com/cockroachdb/cockroach/pkg/roachpb"
1616
"github.com/cockroachdb/cockroach/pkg/settings"
1717
"github.com/cockroachdb/cockroach/pkg/sql/isql"
18-
"github.com/cockroachdb/cockroach/pkg/sql/parser"
19-
"github.com/cockroachdb/cockroach/pkg/sql/parser/statements"
2018
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
2119
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
2220
"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
@@ -35,16 +33,6 @@ var (
3533
)
3634
)
3735

38-
// Parse all job storage SQL statements at init time to avoid parsing them on
39-
// every execution.
40-
func mustParseOne(s string) statements.Statement[tree.Statement] {
41-
stmt, err := parser.ParseOne(s)
42-
if err != nil {
43-
panic(err)
44-
}
45-
return stmt
46-
}
47-
4836
// ProgressStorage reads and writes progress rows.
4937
type ProgressStorage jobspb.JobID
5038

@@ -53,10 +41,6 @@ func (j *Job) ProgressStorage() ProgressStorage {
5341
return ProgressStorage(j.id)
5442
}
5543

56-
var getJobProgressQuery = mustParseOne(
57-
"SELECT written, fraction, resolved FROM system.job_progress WHERE job_id = $1",
58-
)
59-
6044
// Get returns the latest progress report for the job along with when it was
6145
// written. If the fraction is null it is returned as NaN, and if the resolved
6246
// ts is null, it is empty.
@@ -66,10 +50,10 @@ func (i ProgressStorage) Get(
6650
ctx, sp := tracing.ChildSpan(ctx, "get-job-progress")
6751
defer sp.Finish()
6852

69-
row, err := txn.QueryRowExParsed(
53+
row, err := txn.QueryRowEx(
7054
ctx, "job-progress-get", txn.KV(),
7155
sessiondata.NodeUserSessionDataOverride,
72-
getJobProgressQuery, i,
56+
"SELECT written, fraction, resolved FROM system.job_progress WHERE job_id = $1", i,
7357
)
7458

7559
if err != nil || row == nil {
@@ -107,22 +91,6 @@ func (i ProgressStorage) Get(
10791
return fraction, ts, written.Time, nil
10892
}
10993

110-
var (
111-
deleteJobProgressStmt = mustParseOne(
112-
`DELETE FROM system.job_progress WHERE job_id = $1`,
113-
)
114-
insertJobProgressStmt = mustParseOne(
115-
`INSERT INTO system.job_progress (job_id, written, fraction, resolved) VALUES ($1, now(), $2, $3)`,
116-
)
117-
insertJobProgressHistoryStmt = mustParseOne(
118-
`UPSERT INTO system.job_progress_history (job_id, written, fraction, resolved) VALUES ($1, now(), $2, $3)`,
119-
)
120-
pruneJobProgressHistoryStmt = mustParseOne(
121-
`DELETE FROM system.job_progress_history WHERE job_id = $1 AND written IN
122-
(SELECT written FROM system.job_progress_history WHERE job_id = $1 ORDER BY written DESC OFFSET $2)`,
123-
)
124-
)
125-
12694
// Set records a progress update. If fraction is NaN or resolved is empty, that
12795
// field is left null. The time at which the progress was reported is recorded.
12896
func (i ProgressStorage) Set(
@@ -131,9 +99,9 @@ func (i ProgressStorage) Set(
13199
ctx, sp := tracing.ChildSpan(ctx, "write-job-progress")
132100
defer sp.Finish()
133101

134-
if _, err := txn.ExecParsed(
102+
if _, err := txn.ExecEx(
135103
ctx, "write-job-progress-delete", txn.KV(), sessiondata.NodeUserSessionDataOverride,
136-
deleteJobProgressStmt, i,
104+
`DELETE FROM system.job_progress WHERE job_id = $1`, i,
137105
); err != nil {
138106
return err
139107
}
@@ -146,25 +114,26 @@ func (i ProgressStorage) Set(
146114
ts = resolved.AsOfSystemTime()
147115
}
148116

149-
if _, err := txn.ExecParsed(
117+
if _, err := txn.ExecEx(
150118
ctx, "write-job-progress-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
151-
insertJobProgressStmt,
119+
`INSERT INTO system.job_progress (job_id, written, fraction, resolved) VALUES ($1, now(), $2, $3)`,
152120
i, frac, ts,
153121
); err != nil {
154122
return err
155123
}
156124

157-
if _, err := txn.ExecParsed(
125+
if _, err := txn.ExecEx(
158126
ctx, "write-job-progress-history-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
159-
insertJobProgressHistoryStmt,
127+
`UPSERT INTO system.job_progress_history (job_id, written, fraction, resolved) VALUES ($1, now(), $2, $3)`,
160128
i, frac, ts,
161129
); err != nil {
162130
return err
163131
}
164132

165-
if _, err := txn.ExecParsed(
133+
if _, err := txn.ExecEx(
166134
ctx, "write-job-progress-history-prune", txn.KV(), sessiondata.NodeUserSessionDataOverride,
167-
pruneJobProgressHistoryStmt,
135+
`DELETE FROM system.job_progress_history WHERE job_id = $1 AND written IN
136+
(SELECT written FROM system.job_progress_history WHERE job_id = $1 ORDER BY written DESC OFFSET $2)`,
168137
i, retainedProgressHistory.Get(txn.KV().DB().SettingsValues()),
169138
); err != nil {
170139
return err
@@ -196,23 +165,15 @@ func (j *Job) StatusStorage() StatusStorage {
196165
return StatusStorage(j.id)
197166
}
198167

199-
var deleteJobStatusStmt = mustParseOne(
200-
`DELETE FROM system.job_status WHERE job_id = $1`,
201-
)
202-
203168
// Clear clears the status message row for the job, if it exists.
204169
func (i StatusStorage) Clear(ctx context.Context, txn isql.Txn) error {
205-
_, err := txn.ExecParsed(
170+
_, err := txn.ExecEx(
206171
ctx, "clear-job-status-delete", txn.KV(), sessiondata.NodeUserSessionDataOverride,
207-
deleteJobStatusStmt, i,
172+
`DELETE FROM system.job_status WHERE job_id = $1`, i,
208173
)
209174
return err
210175
}
211176

212-
var insertJobStatusStmt = mustParseOne(
213-
`INSERT INTO system.job_status (job_id, written, status) VALUES ($1, now(), $2)`,
214-
)
215-
216177
// Sets writes the current status, replacing the current one if it exists.
217178
// Setting an empty status is the same as calling Clear().
218179
func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) error {
@@ -225,9 +186,9 @@ func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) err
225186
return err
226187
}
227188

228-
if _, err := txn.ExecParsed(
189+
if _, err := txn.ExecEx(
229190
ctx, "write-job-status-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
230-
insertJobStatusStmt,
191+
`INSERT INTO system.job_status (job_id, written, status) VALUES ($1, now(), $2)`,
231192
i, status,
232193
); err != nil {
233194
return err
@@ -240,18 +201,14 @@ func (i StatusStorage) Set(ctx context.Context, txn isql.Txn, status string) err
240201
return nil
241202
}
242203

243-
var getJobStatusQuery = mustParseOne(
244-
"SELECT written, status FROM system.job_status WHERE job_id = $1",
245-
)
246-
247204
// Get gets the current status mesasge for a job, if any.
248205
func (i StatusStorage) Get(ctx context.Context, txn isql.Txn) (string, time.Time, error) {
249206
ctx, sp := tracing.ChildSpan(ctx, "get-job-status")
250207
defer sp.Finish()
251208

252-
row, err := txn.QueryRowExParsed(
209+
row, err := txn.QueryRowEx(
253210
ctx, "job-status-get", txn.KV(), sessiondata.NodeUserSessionDataOverride,
254-
getJobStatusQuery,
211+
"SELECT written, status FROM system.job_status WHERE job_id = $1",
255212
i,
256213
)
257214

@@ -292,17 +249,6 @@ func (j *Job) Messages() MessageStorage {
292249
return MessageStorage(j.id)
293250
}
294251

295-
var (
296-
upsertJobMessageStmt = mustParseOne(
297-
`UPSERT INTO system.job_message (job_id, written, kind, message) VALUES ($1, now(),$2, $3)`,
298-
)
299-
pruneJobMessageStmt = mustParseOne(
300-
`DELETE FROM system.job_message WHERE job_id = $1 AND kind = $2 AND written IN (
301-
SELECT written FROM system.job_message WHERE job_id = $1 AND kind = $2 ORDER BY written DESC OFFSET $3
302-
)`,
303-
)
304-
)
305-
306252
// Record writes a human readable message of the specified kind to the message
307253
// log for this job, and prunes retained messages of the same kind based on the
308254
// configured limit to keep the total number of retained messages bounded.
@@ -311,18 +257,20 @@ func (i MessageStorage) Record(ctx context.Context, txn isql.Txn, kind, message
311257
defer sp.Finish()
312258

313259
// Insert the new message.
314-
if _, err := txn.ExecParsed(
260+
if _, err := txn.ExecEx(
315261
ctx, "write-job-message-insert", txn.KV(), sessiondata.NodeUserSessionDataOverride,
316-
upsertJobMessageStmt,
262+
`UPSERT INTO system.job_message (job_id, written, kind, message) VALUES ($1, now(),$2, $3)`,
317263
i, kind, message,
318264
); err != nil {
319265
return err
320266
}
321267

322268
// Prune old messages of the same kind to bound historical data.
323-
if _, err := txn.ExecParsed(
269+
if _, err := txn.ExecEx(
324270
ctx, "write-job-message-prune", txn.KV(), sessiondata.NodeUserSessionDataOverride,
325-
pruneJobMessageStmt,
271+
`DELETE FROM system.job_message WHERE job_id = $1 AND kind = $2 AND written IN (
272+
SELECT written FROM system.job_message WHERE job_id = $1 AND kind = $2 ORDER BY written DESC OFFSET $3
273+
)`,
326274
i, kind, retainedMessageHistory.Get(txn.KV().DB().SettingsValues()),
327275
); err != nil {
328276
return err
@@ -404,18 +352,14 @@ func InfoStorageForJob(txn isql.Txn, jobID jobspb.JobID) InfoStorage {
404352
return InfoStorage{j: &Job{id: jobID}, txn: txn}
405353
}
406354

407-
var checkClaimSessionQuery = mustParseOne(
408-
`SELECT claim_session_id FROM system.jobs WHERE id = $1`,
409-
)
410-
411355
func (i *InfoStorage) checkClaimSession(ctx context.Context) error {
412356
if i.claimChecked {
413357
return nil
414358
}
415359

416-
row, err := i.txn.QueryRowExParsed(ctx, "check-claim-session", i.txn.KV(),
360+
row, err := i.txn.QueryRowEx(ctx, "check-claim-session", i.txn.KV(),
417361
sessiondata.NodeUserSessionDataOverride,
418-
checkClaimSessionQuery, i.j.ID())
362+
`SELECT claim_session_id FROM system.jobs WHERE id = $1`, i.j.ID())
419363
if err != nil {
420364
return err
421365
}
@@ -435,10 +379,6 @@ func (i *InfoStorage) checkClaimSession(ctx context.Context) error {
435379
return nil
436380
}
437381

438-
var getJobInfoQuery = mustParseOne(
439-
"SELECT value FROM system.job_info WHERE job_id = $1 AND info_key::string = $2 ORDER BY written DESC LIMIT 1",
440-
)
441-
442382
func (i InfoStorage) get(ctx context.Context, opName, infoKey string) ([]byte, bool, error) {
443383
if i.txn == nil {
444384
return nil, false, errors.New("cannot access the job info table without an associated txn")
@@ -452,10 +392,10 @@ func (i InfoStorage) get(ctx context.Context, opName, infoKey string) ([]byte, b
452392
// We expect there to be only a single row for a given <job_id, info_key>.
453393
// This is because all older revisions are deleted before a new one is
454394
// inserted in `InfoStorage.Write`.
455-
row, err := i.txn.QueryRowExParsed(
395+
row, err := i.txn.QueryRowEx(
456396
ctx, "job-info-get", i.txn.KV(),
457397
sessiondata.NodeUserSessionDataOverride,
458-
getJobInfoQuery,
398+
"SELECT value FROM system.job_info WHERE job_id = $1 AND info_key::string = $2 ORDER BY written DESC LIMIT 1",
459399
j.ID(), infoKey,
460400
)
461401

@@ -475,22 +415,13 @@ func (i InfoStorage) get(ctx context.Context, opName, infoKey string) ([]byte, b
475415
return []byte(*value), true, nil
476416
}
477417

478-
var (
479-
deleteJobInfoStmt = mustParseOne(
480-
`DELETE FROM system.job_info WHERE job_id = $1 AND info_key::string = $2`,
481-
)
482-
insertJobInfoStmt = mustParseOne(
483-
`INSERT INTO system.job_info (job_id, info_key, written, value) VALUES ($1, $2, now(), $3)`,
484-
)
485-
)
486-
487418
func (i InfoStorage) write(ctx context.Context, infoKey string, value []byte) error {
488419
return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error {
489420
// First clear out any older revisions of this info.
490-
_, err := txn.ExecParsed(
421+
_, err := txn.ExecEx(
491422
ctx, "write-job-info-delete", txn.KV(),
492423
sessiondata.NodeUserSessionDataOverride,
493-
deleteJobInfoStmt,
424+
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key::string = $2",
494425
j.ID(), infoKey,
495426
)
496427
if err != nil {
@@ -502,10 +433,10 @@ func (i InfoStorage) write(ctx context.Context, infoKey string, value []byte) er
502433
return nil
503434
}
504435
// Write the new info, using the same transaction.
505-
_, err = txn.ExecParsed(
436+
_, err = txn.ExecEx(
506437
ctx, "write-job-info-insert", txn.KV(),
507438
sessiondata.NodeUserSessionDataOverride,
508-
insertJobInfoStmt,
439+
`INSERT INTO system.job_info (job_id, info_key, written, value) VALUES ($1, $2, now(), $3)`,
509440
j.ID(), infoKey, value,
510441
)
511442
return err
@@ -625,46 +556,33 @@ func (i InfoStorage) Delete(ctx context.Context, infoKey string) error {
625556
return i.write(ctx, infoKey, nil /* value */)
626557
}
627558

628-
var (
629-
deleteJobInfoRangeLimitStmt = mustParseOne(
630-
`DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3
631-
ORDER BY info_key ASC LIMIT $4`,
632-
)
633-
deleteJobInfoRangeStmt = mustParseOne(
634-
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
635-
)
636-
)
637-
638559
// DeleteRange removes the info records between the provided
639560
// start key (inclusive) and end key (exclusive).
640561
func (i InfoStorage) DeleteRange(
641562
ctx context.Context, startInfoKey, endInfoKey string, limit int,
642563
) error {
643564
return i.doWrite(ctx, func(ctx context.Context, j *Job, txn isql.Txn) error {
644565
if limit > 0 {
645-
_, err := txn.ExecParsed(
646-
ctx, "write-job-info-delete-limit", txn.KV(),
566+
_, err := txn.ExecEx(
567+
ctx, "write-job-info-delete", txn.KV(),
647568
sessiondata.NodeUserSessionDataOverride,
648-
deleteJobInfoRangeLimitStmt,
569+
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3 "+
570+
"ORDER BY info_key ASC LIMIT $4",
649571
j.ID(), startInfoKey, endInfoKey, limit,
650572
)
651573
return err
652574
} else {
653-
_, err := txn.ExecParsed(
575+
_, err := txn.ExecEx(
654576
ctx, "write-job-info-delete", txn.KV(),
655577
sessiondata.NodeUserSessionDataOverride,
656-
deleteJobInfoRangeStmt,
578+
"DELETE FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
657579
j.ID(), startInfoKey, endInfoKey,
658580
)
659581
return err
660582
}
661583
})
662584
}
663585

664-
var getJobInfoCountQuery = mustParseOne(
665-
"SELECT count(*) FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
666-
)
667-
668586
// Count counts the info records in the range [start, end).
669587
func (i InfoStorage) Count(ctx context.Context, startInfoKey, endInfoKey string) (int, error) {
670588
if i.txn == nil {
@@ -674,10 +592,10 @@ func (i InfoStorage) Count(ctx context.Context, startInfoKey, endInfoKey string)
674592
ctx, sp := tracing.ChildSpan(ctx, "count-job-info")
675593
defer sp.Finish()
676594

677-
row, err := i.txn.QueryRowExParsed(
595+
row, err := i.txn.QueryRowEx(
678596
ctx, "job-info-count", i.txn.KV(),
679597
sessiondata.NodeUserSessionDataOverride,
680-
getJobInfoCountQuery,
598+
"SELECT count(*) FROM system.job_info WHERE job_id = $1 AND info_key >= $2 AND info_key < $3",
681599
i.j.ID(), startInfoKey, endInfoKey,
682600
)
683601

0 commit comments

Comments
 (0)