[WIP] Rewrite the "sync_local" query #56

Draft · wants to merge 2 commits into base: main
98 changes: 74 additions & 24 deletions crates/core/src/sync_local.rs
@@ -55,50 +55,99 @@ pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteEr

// Query for updated objects

// Be careful with modifying this query - it is critical for performance. When modifying, make sure to check
// performance of the query with a large number of rows, and also with a large number of duplicate rows (same row_id).
//
// This form uses a subquery with max(r.op_id) instead of a JOIN to get the latest oplog entry for each updated row.
//
// The query roughly does the following:
// 1. Filter oplog by the ops added but not applied yet (oplog b). These are not unique.
// 2. Use GROUP BY to get unique rows. This adds some overhead because of the TEMP B-TREE, but is necessary
//    to cover cases of duplicate rows. DISTINCT would do the same in theory, but is slower than GROUP BY in practice.
// 3. For each op, find the latest version of the data. This is done using a subquery, with `max(r.op_id)` to
//    select the latest version.
//
// The subquery instead of a JOIN is because:
// 1. We need the GROUP BY to execute _before_ looking up the latest op_id for each row, otherwise
//    we get terrible performance if there are lots of duplicate ids (O(N^2) performance).
// 2. We want to avoid using a second GROUP BY, which would use a second TEMP B-TREE.
//
// It does not appear to be feasible to avoid the single TEMP B-TREE here.
//
// The `ifnull(data, max(op_id))` clause is a hack to pick the row with the largest op_id, but only select the data.
//
// QUERY PLAN
// |--CO-ROUTINE updated_rows
// | `--COMPOUND QUERY
// | |--LEFT-MOST SUBQUERY
// | | |--SCAN buckets USING COVERING INDEX ps_buckets_name
// | | `--SEARCH b USING INDEX ps_oplog_opid (bucket=? AND op_id>?)
// | `--UNION ALL
// | `--SCAN ps_updated_rows
// |--SCAN b
// |--USE TEMP B-TREE FOR GROUP BY
// `--CORRELATED SCALAR SUBQUERY 3
// `--SEARCH r USING INDEX ps_oplog_row (row_type=? AND row_id=?)
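
A plan like the one above can be reproduced in the sqlite3 shell with EXPLAIN QUERY PLAN. The sketch below assumes a simplified schema (the real tables are created by the core extension and carry more columns), so the exact plan output can differ by schema and SQLite version:

-- Simplified schema, an assumption for illustration only.
CREATE TABLE ps_buckets (id INTEGER PRIMARY KEY, name TEXT, last_applied_op INTEGER);
CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name);
CREATE TABLE ps_oplog (bucket INTEGER, op_id INTEGER, row_type TEXT, row_id TEXT, data TEXT);
CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id);
CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id);
CREATE TABLE ps_updated_rows (row_type TEXT, row_id TEXT);

EXPLAIN QUERY PLAN
WITH updated_rows AS (
  SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
  CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
  AND (b.op_id > buckets.last_applied_op)
  UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)
SELECT b.row_type, b.row_id,
  (SELECT ifnull(r.data, max(r.op_id)) FROM ps_oplog r
    WHERE r.row_type = b.row_type AND r.row_id = b.row_id) as data
FROM updated_rows b
GROUP BY b.row_type, b.row_id;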

// language=SQLite
let statement = db
.prepare_v2(
"\
-- 1. Filter oplog by the ops added but not applied yet (oplog b).
-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
WITH updated_rows AS (
-  SELECT DISTINCT b.row_type, b.row_id FROM ps_buckets AS buckets
-  CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
-  AND (b.op_id > buckets.last_applied_op)
-  UNION SELECT row_type, row_id FROM ps_updated_rows
+  SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
+  CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id
+  AND (b.op_id > buckets.last_applied_op)
+  UNION ALL SELECT row_type, row_id FROM ps_updated_rows
)

--- 3. Group the objects from different buckets together into a single one (ops).
-SELECT b.row_type as type,
-       b.row_id as id,
-       r.data as data,
-       count(r.bucket) as buckets,
-       /* max() affects which row is used for 'data' */
-       max(r.op_id) as op_id
--- 2. Find *all* current ops over different buckets for those objects (oplog r).
-FROM updated_rows b
-  LEFT OUTER JOIN ps_oplog AS r
-    ON r.row_type = b.row_type
-    AND r.row_id = b.row_id
--- Group for (3)
-GROUP BY b.row_type, b.row_id",
+SELECT
+  b.row_type,
+  b.row_id,
+  (
+    SELECT ifnull(r.data, max(r.op_id))
+    FROM ps_oplog r
+    WHERE r.row_type = b.row_type
+      AND r.row_id = b.row_id
+  ) as data
+FROM updated_rows b
+GROUP BY b.row_type, b.row_id;
+",
)
.into_db_result(db)?;
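
The `ifnull(r.data, max(r.op_id))` trick relies on documented SQLite behaviour: in an aggregate query using max(), bare columns take their values from the row that produced the maximum. A toy demonstration, with assumed data:

CREATE TABLE oplog (op_id INTEGER, data TEXT);
INSERT INTO oplog VALUES (1, 'old'), (2, 'new');
-- data comes from the row with the maximum op_id, so this returns 'new':
SELECT ifnull(data, max(op_id)) FROM oplog;
-- Over zero rows, max(op_id) is NULL and data is NULL, so the result is NULL.
-- This is the case the is_err() checks below treat as a DELETE:
DELETE FROM oplog;
SELECT ifnull(data, max(op_id)) FROM oplog;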

// An alternative form of the query is this:
//
// SELECT r.row_type as type,
// r.row_id as id,
// r.data as data,
// max(r.op_id) as op_id
// FROM ps_oplog r
// GROUP BY r.row_type, r.row_id;
//
// This form is simple and fast, but does not filter only on updated rows. It also ignores ps_updated_rows.
// We could later add heuristics to use this form on initial sync, or when a large number of rows have been re-synced.
//
// QUERY PLAN
// `--SCAN r USING INDEX ps_oplog_row
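
For comparison, the alternative form can be inspected the same way. Under the simplified schema assumed in the earlier sketch, SQLite can satisfy the GROUP BY by scanning the ps_oplog_row index in order, so no TEMP B-TREE is needed:

EXPLAIN QUERY PLAN
SELECT r.row_type as type,
       r.row_id as id,
       r.data as data,
       max(r.op_id) as op_id
FROM ps_oplog r
GROUP BY r.row_type, r.row_id;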

// TODO: cache individual statements

while statement.step().into_db_result(db)? == ResultCode::ROW {
let type_name = statement.column_text(0)?;
let id = statement.column_text(1)?;
-let buckets = statement.column_int(3)?;
let data = statement.column_text(2);

let table_name = internal_table_name(type_name);

if tables.contains(&table_name) {
let quoted = quote_internal_name(type_name, false);

-if buckets == 0 {
+// is_err() is essentially a NULL check here
+if data.is_err() {
// DELETE
let delete_statement = db
.prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
@@ -115,7 +164,8 @@ GROUP BY b.row_type, b.row_id",
insert_statement.exec()?;
}
} else {
-if buckets == 0 {
+// is_err() is essentially a NULL check here
+if data.is_err() {
// DELETE
// language=SQLite
let delete_statement = db