diff --git a/crates/core/src/bucket_priority.rs b/crates/core/src/bucket_priority.rs
new file mode 100644
index 0000000..454f1fe
--- /dev/null
+++ b/crates/core/src/bucket_priority.rs
@@ -0,0 +1,89 @@
+use serde::{de::Visitor, Deserialize};
+use sqlite_nostd::ResultCode;
+
+use crate::error::SQLiteError;
+
+#[repr(transparent)]
+#[derive(Clone, Copy, PartialEq, Eq)]
+pub struct BucketPriority {
+    pub number: i32,
+}
+
+impl BucketPriority {
+    pub fn may_publish_with_outstanding_uploads(self) -> bool {
+        self == BucketPriority::HIGHEST
+    }
+
+    pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };
+
+    /// A low priority used to represent fully-completed sync operations across all priorities.
+    pub const SENTINEL: BucketPriority = BucketPriority { number: i32::MAX };
+}
+
+impl TryFrom<i32> for BucketPriority {
+    type Error = SQLiteError;
+
+    fn try_from(value: i32) -> Result<Self, Self::Error> {
+        if value < BucketPriority::HIGHEST.number || value == Self::SENTINEL.number {
+            return Err(SQLiteError(
+                ResultCode::MISUSE,
+                Some("Invalid bucket priority".into()),
+            ));
+        }
+
+        return Ok(BucketPriority { number: value });
+    }
+}
+
+impl Into<i32> for BucketPriority {
+    fn into(self) -> i32 {
+        self.number
+    }
+}
+
+impl PartialOrd for BucketPriority {
+    fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> {
+        Some(self.number.partial_cmp(&other.number)?.reverse())
+    }
+}
+
+impl<'de> Deserialize<'de> for BucketPriority {
+    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
+    where
+        D: serde::Deserializer<'de>,
+    {
+        struct PriorityVisitor;
+        impl<'de> Visitor<'de> for PriorityVisitor {
+            type Value = BucketPriority;
+
+            fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
+                formatter.write_str("a priority as an integer between 0 and 3 (inclusive)")
+            }
+
+            fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                BucketPriority::try_from(v).map_err(|e| E::custom(e.1.unwrap_or_default()))
+            }
+
+            fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
+                Self::visit_i32(self, i)
+            }
+
+            fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
+            where
+                E: serde::de::Error,
+            {
+                let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
+                Self::visit_i32(self, i)
+            }
+        }
+
+        deserializer.deserialize_i32(PriorityVisitor)
+    }
+}
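Aside (not part of the diff): a minimal test-style sketch of the semantics encoded above, assuming it sits next to the new module inside this crate. The reversed comparison makes numerically smaller priority codes compare as greater, i.e. more important, and TryFrom rejects both negative codes and the i32::MAX sentinel.

    use crate::bucket_priority::BucketPriority;

    #[test]
    fn bucket_priority_semantics() {
        let high = BucketPriority::try_from(0).unwrap();
        let low = BucketPriority::try_from(3).unwrap();

        // Priority 0 beats priority 3: the ordering is reversed on purpose.
        assert!(high > low);

        // Only priority 0 may be published while uploads are still pending.
        assert!(high.may_publish_with_outstanding_uploads());
        assert!(!low.may_publish_with_outstanding_uploads());

        // Negative codes and the i32::MAX sentinel are rejected.
        assert!(BucketPriority::try_from(-1).is_err());
        assert!(BucketPriority::try_from(i32::MAX).is_err());
    }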
diff --git a/crates/core/src/kv.rs b/crates/core/src/kv.rs
index 70b5bd8..c5b7bbc 100644
--- a/crates/core/src/kv.rs
+++ b/crates/core/src/kv.rs
@@ -9,6 +9,7 @@ use sqlite::ResultCode;
 use sqlite_nostd as sqlite;
 use sqlite_nostd::{Connection, Context};
 
+use crate::bucket_priority::BucketPriority;
 use crate::create_sqlite_optional_text_fn;
 use crate::create_sqlite_text_fn;
 use crate::error::SQLiteError;
@@ -46,13 +47,14 @@ fn powersync_last_synced_at_impl(
     let db = ctx.db_handle();
 
     // language=SQLite
-    let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?;
+    let statement = db.prepare_v2("select last_synced_at from ps_sync_state where priority = ?")?;
+    statement.bind_int(1, BucketPriority::SENTINEL.into())?;
 
     if statement.step()? == ResultCode::ROW {
         let client_id = statement.column_text(0)?;
-        return Ok(Some(client_id.to_string()));
+        Ok(Some(client_id.to_string()))
     } else {
-        return Ok(None);
+        Ok(None)
     }
 }
diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs
index 8c75b9e..972f21d 100644
--- a/crates/core/src/lib.rs
+++ b/crates/core/src/lib.rs
@@ -12,6 +12,7 @@ use core::ffi::{c_char, c_int};
 use sqlite::ResultCode;
 use sqlite_nostd as sqlite;
 
+mod bucket_priority;
 mod checkpoint;
 mod crud_vtab;
 mod diff;
diff --git a/crates/core/src/migrations.rs b/crates/core/src/migrations.rs
index bd999cd..0bfbb03 100644
--- a/crates/core/src/migrations.rs
+++ b/crates/core/src/migrations.rs
@@ -8,6 +8,7 @@ use sqlite::ResultCode;
 use sqlite_nostd as sqlite;
 use sqlite_nostd::{Connection, Context};
 
+use crate::bucket_priority::BucketPriority;
 use crate::error::{PSResult, SQLiteError};
 use crate::fix035::apply_v035_fix;
 
@@ -310,5 +311,27 @@ json_array(
         .into_db_result(local_db)?;
     }
 
+    if current_version < 7 && target_version >= 7 {
+        const SENTINEL_PRIORITY: i32 = BucketPriority::SENTINEL.number;
+        let stmt = format!("\
+CREATE TABLE ps_sync_state (
+    priority INTEGER NOT NULL,
+    last_synced_at TEXT NOT NULL
+) STRICT;
+INSERT OR IGNORE INTO ps_sync_state (priority, last_synced_at)
+    SELECT {}, value from ps_kv where key = 'last_synced_at';
+
+INSERT INTO ps_migration(id, down_migrations)
+VALUES(7,
+json_array(
+json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = {}'),
+json_object('sql', 'DROP TABLE ps_sync_state'),
+json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7')
+));
+", SENTINEL_PRIORITY, SENTINEL_PRIORITY);
+
+        local_db.exec_safe(&stmt).into_db_result(local_db)?;
+    }
+
     Ok(())
 }
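Aside (not part of the diff): once the migration above has run, every completed sync (partial or full) leaves a row in ps_sync_state. A hedged sketch of reading it back with the same sqlite_nostd-style calls used throughout this crate; the helper name and the Vec return type are illustrative only, and the usual alloc imports are assumed.

    fn last_synced_at_by_priority(
        db: *mut sqlite::sqlite3,
    ) -> Result<Vec<(i32, String)>, SQLiteError> {
        let mut timestamps = Vec::new();
        // language=SQLite
        let statement =
            db.prepare_v2("SELECT priority, last_synced_at FROM ps_sync_state ORDER BY priority")?;
        while statement.step()? == ResultCode::ROW {
            // BucketPriority::SENTINEL (i32::MAX) marks a complete sync;
            // smaller numbers are partial, higher-priority completions.
            timestamps.push((
                statement.column_int(0)?,
                String::from(statement.column_text(1)?),
            ));
        }
        Ok(timestamps)
    }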
diff --git a/crates/core/src/operations.rs b/crates/core/src/operations.rs
index beb7b8b..2de3afe 100644
--- a/crates/core/src/operations.rs
+++ b/crates/core/src/operations.rs
@@ -18,7 +18,7 @@ SELECT
 json_extract(e.value, '$.has_more') as has_more,
 json_extract(e.value, '$.after') as after,
 json_extract(e.value, '$.next_after') as next_after
-FROM json_each(json_extract(?, '$.buckets')) e",
+FROM json_each(json_extract(?1, '$.buckets')) e",
     )?;
     statement.bind_text(1, data, sqlite::Destructor::STATIC)?;
diff --git a/crates/core/src/operations_vtab.rs b/crates/core/src/operations_vtab.rs
index fbafaae..4ac441b 100644
--- a/crates/core/src/operations_vtab.rs
+++ b/crates/core/src/operations_vtab.rs
@@ -76,16 +76,15 @@ extern "C" fn update(
     } else if rowid.value_type() == sqlite::ColumnType::Null {
         // INSERT
         let op = args[2].text();
-        let data = args[3].text();
 
         let tab = unsafe { &mut *vtab.cast::<VirtualTable>() };
         let db = tab.db;
 
         if op == "save" {
-            let result = insert_operation(db, data);
+            let result = insert_operation(db, args[3].text());
             vtab_result(vtab, result)
         } else if op == "sync_local" {
-            let result = sync_local(db, data);
+            let result = sync_local(db, &args[3]);
             if let Ok(result_row) = result {
                 unsafe {
                     *p_row_id = result_row;
                 }
             }
             vtab_result(vtab, result)
         } else if op == "clear_remove_ops" {
-            let result = clear_remove_ops(db, data);
+            let result = clear_remove_ops(db, args[3].text());
             vtab_result(vtab, result)
         } else if op == "delete_pending_buckets" {
-            let result = delete_pending_buckets(db, data);
+            let result = delete_pending_buckets(db, args[3].text());
             vtab_result(vtab, result)
         } else if op == "delete_bucket" {
-            let result = delete_bucket(db, data);
+            let result = delete_bucket(db, args[3].text());
             vtab_result(vtab, result)
         } else {
             ResultCode::MISUSE as c_int
diff --git a/crates/core/src/sync_local.rs b/crates/core/src/sync_local.rs
index 0e6602e..89646f4 100644
--- a/crates/core/src/sync_local.rs
+++ b/crates/core/src/sync_local.rs
@@ -1,64 +1,193 @@
 use alloc::collections::BTreeSet;
 use alloc::format;
 use alloc::string::String;
+use alloc::vec::Vec;
+use serde::Deserialize;
 
+use crate::bucket_priority::BucketPriority;
 use crate::error::{PSResult, SQLiteError};
-use sqlite_nostd as sqlite;
+use sqlite_nostd::{self as sqlite, Destructor, ManagedStmt, Value};
 use sqlite_nostd::{ColumnType, Connection, ResultCode};
 
 use crate::ext::SafeManagedStmt;
 use crate::util::{internal_table_name, quote_internal_name};
 
-pub fn can_update_local(db: *mut sqlite::sqlite3) -> Result<bool, SQLiteError> {
-    // language=SQLite
-    let statement = db.prepare_v2(
-        "\
-SELECT group_concat(name)
-FROM ps_buckets
-WHERE target_op > last_op",
-    )?;
-
-    if statement.step()? != ResultCode::ROW {
-        return Err(SQLiteError::from(ResultCode::ABORT));
-    }
+pub fn sync_local<V: Value>(db: *mut sqlite::sqlite3, data: &V) -> Result<i64, SQLiteError> {
+    let mut operation = SyncOperation::new(db, data)?;
+    operation.apply()
+}
 
-    if statement.column_type(0)? == ColumnType::Text {
-        return Ok(false);
-    }
+struct PartialSyncOperation<'a> {
+    /// The lowest priority part of the partial sync operation.
+    priority: BucketPriority,
+    /// The JSON-encoded arguments passed by the client SDK. This includes the priority and a list
+    /// of bucket names in that (and higher) priorities.
+    args: &'a str,
+}
 
-    // This is specifically relevant for when data is added to crud before another batch is completed.
+struct SyncOperation<'a> {
+    db: *mut sqlite::sqlite3,
+    data_tables: BTreeSet<String>,
+    partial: Option<PartialSyncOperation<'a>>,
+}
+
+impl<'a> SyncOperation<'a> {
+    fn new<V: Value>(db: *mut sqlite::sqlite3, data: &'a V) -> Result<Self, SQLiteError> {
+        return Ok(Self {
+            db: db,
+            data_tables: BTreeSet::new(),
+            partial: match data.value_type() {
+                ColumnType::Text => {
+                    let text = data.text();
+                    if text.len() > 0 {
+                        #[derive(Deserialize)]
+                        struct PartialSyncLocalArguments {
+                            #[serde(rename = "buckets")]
+                            _buckets: Vec<String>,
+                            priority: BucketPriority,
+                        }
 
-    // language=SQLite
-    let statement = db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
-    if statement.step()? != ResultCode::DONE {
-        return Ok(false);
+                        let args: PartialSyncLocalArguments = serde_json::from_str(text)?;
+                        Some(PartialSyncOperation {
+                            priority: args.priority,
+                            args: text,
+                        })
+                    } else {
+                        None
+                    }
+                }
+                _ => None,
+            },
+        });
     }
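Aside (not part of the diff): the JSON accepted by SyncOperation::new for a partial sync, shown as a hedged round-trip sketch. The mirror struct is local to this example; in the diff the equivalent struct is private to new(), and passing NULL (or an empty string) instead selects a complete sync.

    #[derive(serde::Deserialize)]
    struct ExampleArgs {
        buckets: Vec<String>,
        priority: BucketPriority,
    }

    fn parse_example() -> Result<(), serde_json::Error> {
        let args: ExampleArgs =
            serde_json::from_str(r#"{"priority": 1, "buckets": ["prio0", "prio1"]}"#)?;
        assert_eq!(args.priority.number, 1);
        assert_eq!(args.buckets, ["prio0", "prio1"]);
        Ok(())
    }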
+    fn can_apply_sync_changes(&self) -> Result<bool, SQLiteError> {
+        // Don't publish downloaded data until the upload queue is empty (except for downloaded data
+        // in priority 0, which is published earlier).
+
+        let needs_check = match &self.partial {
+            Some(p) => !p.priority.may_publish_with_outstanding_uploads(),
+            None => true,
+        };
 
-pub fn sync_local(db: *mut sqlite::sqlite3, _data: &str) -> Result<i64, SQLiteError> {
-    if !can_update_local(db)? {
-        return Ok(0);
+        if needs_check {
+            // language=SQLite
+            let statement = self.db.prepare_v2(
+                "\
+            SELECT group_concat(name)
+            FROM ps_buckets
+            WHERE target_op > last_op AND name = '$local'",
+            )?;
+
+            if statement.step()? != ResultCode::ROW {
+                return Err(SQLiteError::from(ResultCode::ABORT));
+            }
+
+            if statement.column_type(0)? == ColumnType::Text {
+                return Ok(false);
+            }
+
+            let statement = self.db.prepare_v2("SELECT 1 FROM ps_crud LIMIT 1")?;
+            if statement.step()? != ResultCode::DONE {
+                return Ok(false);
+            }
+        }
+
+        Ok(true)
     }
 
-    // language=SQLite
-    let statement = db
-        .prepare_v2("SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'")
-        .into_db_result(db)?;
-    let mut tables: BTreeSet<String> = BTreeSet::new();
+    fn apply(&mut self) -> Result<i64, SQLiteError> {
+        if !self.can_apply_sync_changes()? {
+            return Ok(0);
+        }
+
+        self.collect_tables()?;
+        let statement = self.collect_full_operations()?;
+        // TODO: cache statements
+        while statement.step().into_db_result(self.db)? == ResultCode::ROW {
+            let type_name = statement.column_text(0)?;
+            let id = statement.column_text(1)?;
+            let buckets = statement.column_int(3)?;
+            let data = statement.column_text(2);
+
+            let table_name = internal_table_name(type_name);
+
+            if self.data_tables.contains(&table_name) {
+                let quoted = quote_internal_name(type_name, false);
+
+                if buckets == 0 {
+                    // DELETE
+                    let delete_statement = self
+                        .db
+                        .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
+                        .into_db_result(self.db)?;
+                    delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
+                    delete_statement.exec()?;
+                } else {
+                    // INSERT/UPDATE
+                    let insert_statement = self
+                        .db
+                        .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted))
+                        .into_db_result(self.db)?;
+                    insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
+                    insert_statement.exec()?;
+                }
+            } else {
+                if buckets == 0 {
+                    // DELETE
+                    // language=SQLite
+                    let delete_statement = self
+                        .db
+                        .prepare_v2("DELETE FROM ps_untyped WHERE type = ? AND id = ?")
+                        .into_db_result(self.db)?;
+                    delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
+                    delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
+                    delete_statement.exec()?;
+                } else {
+                    // INSERT/UPDATE
+                    // language=SQLite
+                    let insert_statement = self
+                        .db
+                        .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)")
+                        .into_db_result(self.db)?;
+                    insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?;
+                    insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?;
+                    insert_statement.exec()?;
+                }
+            }
+        }
+
+        self.set_last_applied_op()?;
+        self.mark_completed()?;
 
-    while statement.step()? == ResultCode::ROW {
-        let name = statement.column_text(0)?;
-        tables.insert(String::from(name));
+        Ok(1)
     }
 
-    // Query for updated objects
+    fn collect_tables(&mut self) -> Result<(), SQLiteError> {
+        // language=SQLite
+        let statement = self
+            .db
+            .prepare_v2(
+                "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'",
+            )
+            .into_db_result(self.db)?;
 
-    // language=SQLite
-    let statement = db
-        .prepare_v2(
-            "\
+        while statement.step()? == ResultCode::ROW {
+            let name = statement.column_text(0)?;
+            self.data_tables.insert(String::from(name));
+        }
+        Ok(())
+    }
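Aside (not part of the diff): a compact, hypothetical restatement of the rule the apply loop above follows for each grouped row. Here buckets is count(r.bucket) from the query defined just below; zero means no bucket still contains an op for the row, so it is deleted, otherwise the data from the max(op_id) row wins and is upserted.

    enum RowAction<'a> {
        Delete,
        Upsert { data: &'a str },
    }

    fn action_for_row(buckets: i32, data: Option<&str>) -> RowAction<'_> {
        if buckets == 0 {
            // No remaining ops in any bucket: the row is gone server-side.
            RowAction::Delete
        } else {
            // The row still exists somewhere; REPLACE INTO with the winning
            // payload (this sketch substitutes an empty object when absent).
            RowAction::Upsert {
                data: data.unwrap_or("{}"),
            }
        }
    }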
+
+    fn collect_full_operations(&self) -> Result<ManagedStmt, SQLiteError> {
+        Ok(match &self.partial {
+            None => {
+                // Complete sync
+                self.db
+                    .prepare_v2(
+                        "\
 -- 1. Filter oplog by the ops added but not applied yet (oplog b).
 -- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
 WITH updated_rows AS (
   SELECT b.row_type, b.row_id FROM ps_buckets AS buckets
     CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
   UNION SELECT row_type, row_id FROM ps_updated_rows
 )
@@ -68,6 +197,41 @@
+-- 3. Group the objects from different buckets together into a single one (ops).
+SELECT b.row_type as type,
+       b.row_id as id,
+       r.data as data,
+       count(r.bucket) as buckets,
+       /* max() affects which row is used for 'data' */
+       max(r.op_id) as op_id
+-- 2. Find *all* current ops over different buckets for those objects (oplog r).
+FROM updated_rows b
+    LEFT OUTER JOIN ps_oplog AS r
+        ON r.row_type = b.row_type
+        AND r.row_id = b.row_id
+-- Group for (3)
+GROUP BY b.row_type, b.row_id",
+                    )
+                    .into_db_result(self.db)?
+            }
+            Some(partial) => {
+                let stmt = self
+                    .db
+                    .prepare_v2(
+                        "\
+-- 1. Filter oplog by the ops added but not applied yet (oplog b).
+-- SELECT DISTINCT / UNION is important for cases with many duplicate ids.
+WITH
+  involved_buckets (id) AS MATERIALIZED (
+    SELECT id FROM ps_buckets WHERE ?1 IS NULL
+        OR name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))
+  ),
+  updated_rows AS (
+    SELECT DISTINCT FALSE as local, b.row_type, b.row_id FROM ps_buckets AS buckets
+        CROSS JOIN ps_oplog AS b ON b.bucket = buckets.id AND (b.op_id > buckets.last_applied_op)
+        WHERE buckets.id IN (SELECT id FROM involved_buckets)
+  )
+
 -- 3. Group the objects from different buckets together into a single one (ops).
 SELECT b.row_type as type,
        b.row_id as id,
        r.data as data,
        count(r.bucket) as buckets,
        /* max() affects which row is used for 'data' */
        max(r.op_id) as op_id
@@ -80,79 +244,81 @@
 FROM updated_rows b
     LEFT OUTER JOIN ps_oplog AS r
         ON r.row_type = b.row_type
         AND r.row_id = b.row_id
+        AND r.bucket IN (SELECT id FROM involved_buckets)
 -- Group for (3)
 GROUP BY b.row_type, b.row_id",
-        )
-        .into_db_result(db)?;
+                    )
+                    .into_db_result(self.db)?;
+                stmt.bind_text(1, partial.args, Destructor::STATIC)?;
 
-    // TODO: cache statements
-
-    while statement.step().into_db_result(db)? == ResultCode::ROW {
-        let type_name = statement.column_text(0)?;
-        let id = statement.column_text(1)?;
-        let buckets = statement.column_int(3)?;
-        let data = statement.column_text(2);
-
-        let table_name = internal_table_name(type_name);
-
-        if tables.contains(&table_name) {
-            let quoted = quote_internal_name(type_name, false);
-
-            if buckets == 0 {
-                // DELETE
-                let delete_statement = db
-                    .prepare_v2(&format!("DELETE FROM {} WHERE id = ?", quoted))
-                    .into_db_result(db)?;
-                delete_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
-                delete_statement.exec()?;
-            } else {
-                // INSERT/UPDATE
-                let insert_statement = db
-                    .prepare_v2(&format!("REPLACE INTO {}(id, data) VALUES(?, ?)", quoted))
-                    .into_db_result(db)?;
-                insert_statement.bind_text(1, id, sqlite::Destructor::STATIC)?;
-                insert_statement.bind_text(2, data?, sqlite::Destructor::STATIC)?;
-                insert_statement.exec()?;
+                stmt
             }
-        } else {
-            if buckets == 0 {
-                // DELETE
+        })
+    }
AND id = ?") - .into_db_result(db)?; - delete_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; - delete_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; - delete_statement.exec()?; - } else { - // INSERT/UPDATE + let updated = self + .db + .prepare_v2( "\ + UPDATE ps_buckets + SET last_applied_op = last_op + WHERE last_applied_op != last_op AND + name IN (SELECT value FROM json_each(json_extract(?1, '$.buckets')))", + ) + .into_db_result(self.db)?; + updated.bind_text(1, partial.args, Destructor::STATIC)?; + updated.exec()?; + } + None => { // language=SQLite - let insert_statement = db - .prepare_v2("REPLACE INTO ps_untyped(type, id, data) VALUES(?, ?, ?)") - .into_db_result(db)?; - insert_statement.bind_text(1, type_name, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(2, id, sqlite::Destructor::STATIC)?; - insert_statement.bind_text(3, data?, sqlite::Destructor::STATIC)?; - insert_statement.exec()?; + self.db + .exec_safe( + "UPDATE ps_buckets + SET last_applied_op = last_op + WHERE last_applied_op != last_op", + ) + .into_db_result(self.db)?; } } + + Ok(()) } - // language=SQLite - db.exec_safe( - "UPDATE ps_buckets - SET last_applied_op = last_op - WHERE last_applied_op != last_op", - ) - .into_db_result(db)?; + fn mark_completed(&self) -> Result<(), SQLiteError> { + let priority_code: i32 = match &self.partial { + None => { + // language=SQLite + self.db + .exec_safe("DELETE FROM ps_updated_rows") + .into_db_result(self.db)?; + BucketPriority::SENTINEL + } + Some(partial) => partial.priority, + } + .into(); - // language=SQLite - db.exec_safe("DELETE FROM ps_updated_rows") - .into_db_result(db)?; + // Higher-priority buckets are always part of lower-priority sync operations too, so we can + // delete information about higher-priority syncs (represented as lower priority numbers). + // A complete sync is represented by a number higher than the lowest priority we allow. 
+        // language=SQLite
+        let stmt = self
+            .db
+            .prepare_v2("DELETE FROM ps_sync_state WHERE priority < ?1;")
+            .into_db_result(self.db)?;
+        stmt.bind_int(1, priority_code)?;
+        stmt.exec()?;
 
-    // language=SQLite
-    db.exec_safe("insert or replace into ps_kv(key, value) values('last_synced_at', datetime())")
-        .into_db_result(db)?;
+        // language=SQLite
+        let stmt = self
+            .db
+            .prepare_v2("INSERT OR REPLACE INTO ps_sync_state (priority, last_synced_at) VALUES (?, datetime());")
+            .into_db_result(self.db)?;
+        stmt.bind_int(1, priority_code)?;
+        stmt.exec()?;
 
-    Ok(1)
+        Ok(())
+    }
 }
diff --git a/crates/core/src/sync_types.rs b/crates/core/src/sync_types.rs
index 429980d..060dd25 100644
--- a/crates/core/src/sync_types.rs
+++ b/crates/core/src/sync_types.rs
@@ -18,4 +18,5 @@ pub struct Checkpoint {
 pub struct BucketChecksum {
     pub bucket: String,
     pub checksum: i32,
+    pub priority: Option<BucketPriority>,
 }
diff --git a/crates/core/src/view_admin.rs b/crates/core/src/view_admin.rs
index b58ebaf..16f7a7f 100644
--- a/crates/core/src/view_admin.rs
+++ b/crates/core/src/view_admin.rs
@@ -120,7 +120,7 @@ fn powersync_init_impl(
 
     setup_internal_views(local_db)?;
 
-    powersync_migrate(ctx, 6)?;
+    powersync_migrate(ctx, 7)?;
 
     Ok(String::from(""))
 }
diff --git a/crates/sqlite/build.rs b/crates/sqlite/build.rs
index 01382c9..20b2953 100644
--- a/crates/sqlite/build.rs
+++ b/crates/sqlite/build.rs
@@ -1,4 +1,3 @@
-
 fn main() {
     let mut cfg = cc::Build::new();
diff --git a/dart/pubspec.lock b/dart/pubspec.lock
index 53c0592..bdc6d4a 100644
--- a/dart/pubspec.lock
+++ b/dart/pubspec.lock
@@ -5,74 +5,79 @@ packages:
   _fe_analyzer_shared:
     dependency: transitive
     description:
       name: _fe_analyzer_shared
-      sha256: "0b2f2bd91ba804e53a61d757b986f89f1f9eaed5b11e4b2f5a2468d86d6c9fc7"
+      sha256: "03f6da266a27a4538a69295ec142cb5717d7d4e5727b84658b63e1e1509bac9c"
       url: "https://pub.dev"
     source: hosted
-    version: "67.0.0"
+    version: "79.0.0"
+  _macros:
+    dependency: transitive
+    description: dart
+    source: sdk
+    version: "0.3.3"
   analyzer:
     dependency: transitive
     description:
       name: analyzer
-      sha256: "37577842a27e4338429a1cbc32679d508836510b056f1eedf0c8d20e39c1383d"
+      sha256: c9040fc56483c22a5e04a9f6a251313118b1a3c42423770623128fa484115643
       url: "https://pub.dev"
     source: hosted
-    version: "6.4.1"
+    version: "7.2.0"
   args:
     dependency: transitive
    description:
       name: args
-      sha256: "7cf60b9f0cc88203c5a190b4cd62a99feea42759a7fa695010eb5de1c0b2252a"
+      sha256: bf9f5caeea8d8fe6721a9c358dd8a5c1947b27f1cfaa18b39c301273594919e6
       url: "https://pub.dev"
     source: hosted
-    version: "2.5.0"
+    version: "2.6.0"
   async:
     dependency: transitive
     description:
       name: async
-      sha256: "947bfcf187f74dbc5e146c9eb9c0f10c9f8b30743e341481c1e2ed3ecc18c20c"
+      sha256: d2872f9c19731c2e5f10444b14686eb7cc85c76274bd6c16e1816bff9a3bab63
       url: "https://pub.dev"
     source: hosted
-    version: "2.11.0"
+    version: "2.12.0"
   boolean_selector:
     dependency: transitive
     description:
       name: boolean_selector
-      sha256: "6cfb5af12253eaf2b368f07bacc5a80d1301a071c73360d746b7f2e32d762c66"
+      sha256: "8aab1771e1243a5063b8b0ff68042d67334e3feab9e95b9490f9a6ebf73b42ea"
       url: "https://pub.dev"
     source: hosted
-    version: "2.1.1"
+    version: "2.1.2"
  collection:
     dependency: transitive
     description:
       name: collection
-      sha256: ee67cb0715911d28db6bf4af1026078bd6f0128b07a5f66fb2ed94ec6783c09a
+      sha256: "2f5709ae4d3d59dd8f7cd309b4e023046b57d8a6c82130785d2b0e5868084e76"
       url: "https://pub.dev"
     source: hosted
-    version: "1.18.0"
+    version: "1.19.1"
   convert:
     dependency: transitive
     description:
       name: convert
-      sha256:
"0f08b14755d163f6e2134cb58222dd25ea2a2ee8a195e53983d57c075324d592" + sha256: b30acd5944035672bc15c6b7a8b47d773e41e2f17de064350988c5d02adb1c68 url: "https://pub.dev" source: hosted - version: "3.1.1" + version: "3.1.2" coverage: dependency: transitive description: name: coverage - sha256: c1fb2dce3c0085f39dc72668e85f8e0210ec7de05345821ff58530567df345a5 + sha256: e3493833ea012784c740e341952298f1cc77f1f01b1bbc3eb4eecf6984fb7f43 url: "https://pub.dev" source: hosted - version: "1.9.2" + version: "1.11.1" crypto: dependency: transitive description: name: crypto - sha256: ec30d999af904f33454ba22ed9a86162b35e52b44ac4807d1d93c288041d7d27 + sha256: "1e445881f28f22d6140f181e07737b22f1e099a5e1ff94b0af2f9e4a463f4855" url: "https://pub.dev" source: hosted - version: "3.0.5" + version: "3.0.6" ffi: dependency: transitive description: @@ -85,10 +90,10 @@ packages: dependency: transitive description: name: file - sha256: "5fc22d7c25582e38ad9a8515372cd9a93834027aacf1801cf01164dac0ffa08c" + sha256: a3b4f84adafef897088c160faf7dfffb7696046cb13ae90b508c2cbc95d3b8d4 url: "https://pub.dev" source: hosted - version: "7.0.0" + version: "7.0.1" frontend_server_client: dependency: transitive description: @@ -101,34 +106,34 @@ packages: dependency: transitive description: name: glob - sha256: "0e7014b3b7d4dac1ca4d6114f82bf1782ee86745b9b42a92c9289c23d8a0ab63" + sha256: c3f1ee72c96f8f78935e18aa8cecced9ab132419e8625dc187e1c2408efc20de url: "https://pub.dev" source: hosted - version: "2.1.2" + version: "2.1.3" http_multi_server: dependency: transitive description: name: http_multi_server - sha256: "97486f20f9c2f7be8f514851703d0119c3596d14ea63227af6f7a481ef2b2f8b" + sha256: aa6199f908078bb1c5efb8d8638d4ae191aac11b311132c3ef48ce352fb52ef8 url: "https://pub.dev" source: hosted - version: "3.2.1" + version: "3.2.2" http_parser: dependency: transitive description: name: http_parser - sha256: "2aa08ce0341cc9b354a498388e30986515406668dbcc4f7c950c3e715496693b" + sha256: "178d74305e7866013777bab2c3d8726205dc5a4dd935297175b19a23a2e66571" url: "https://pub.dev" source: hosted - version: "4.0.2" + version: "4.1.2" io: dependency: transitive description: name: io - sha256: "2ec25704aba361659e10e3e5f5d672068d332fc8ac516421d483a11e5cbd061e" + sha256: dfd5a80599cf0165756e3181807ed3e77daf6dd4137caaad72d0b7931597650b url: "https://pub.dev" source: hosted - version: "1.0.4" + version: "1.0.5" js: dependency: transitive description: @@ -141,34 +146,42 @@ packages: dependency: transitive description: name: logging - sha256: "623a88c9594aa774443aa3eb2d41807a48486b5613e67599fb4c41c0ad47c340" + sha256: c8245ada5f1717ed44271ed1c26b8ce85ca3228fd2ffdb75468ab01979309d61 url: "https://pub.dev" source: hosted - version: "1.2.0" + version: "1.3.0" + macros: + dependency: transitive + description: + name: macros + sha256: "1d9e801cd66f7ea3663c45fc708450db1fa57f988142c64289142c9b7ee80656" + url: "https://pub.dev" + source: hosted + version: "0.1.3-main.0" matcher: dependency: transitive description: name: matcher - sha256: d2323aa2060500f906aa31a895b4030b6da3ebdcc5619d14ce1aada65cd161cb + sha256: dc58c723c3c24bf8d3e2d3ad3f2f9d7bd9cf43ec6feaa64181775e60190153f2 url: "https://pub.dev" source: hosted - version: "0.12.16+1" + version: "0.12.17" meta: dependency: transitive description: name: meta - sha256: "7687075e408b093f36e6bbf6c91878cc0d4cd10f409506f7bc996f68220b9136" + sha256: e3641ec5d63ebf0d9b41bd43201a66e3fc79a65db5f61fc181f04cd27aab950c url: "https://pub.dev" source: hosted - version: "1.12.0" + version: "1.16.0" mime: dependency: transitive 
description: name: mime - sha256: "801fd0b26f14a4a58ccb09d5892c3fbdeff209594300a542492cf13fba9d247a" + sha256: "41a20518f0cb1256669420fdba0cd90d21561e560ac240f26ef8322e45bb7ed6" url: "https://pub.dev" source: hosted - version: "1.0.6" + version: "2.0.0" node_preamble: dependency: transitive description: @@ -181,18 +194,18 @@ packages: dependency: transitive description: name: package_config - sha256: "1c5b77ccc91e4823a5af61ee74e6b972db1ef98c2ff5a18d3161c982a55448bd" + sha256: "92d4488434b520a62570293fbd33bb556c7d49230791c1b4bbd973baf6d2dc67" url: "https://pub.dev" source: hosted - version: "2.1.0" + version: "2.1.1" path: dependency: transitive description: name: path - sha256: "087ce49c3f0dc39180befefc60fdb4acd8f8620e5682fe2476afd0b3688bb4af" + sha256: "75cca69d1490965be98c73ceaea117e8a04dd21217b37b292c9ddbec0d955bc5" url: "https://pub.dev" source: hosted - version: "1.9.0" + version: "1.9.1" pool: dependency: transitive description: @@ -205,18 +218,18 @@ packages: dependency: transitive description: name: pub_semver - sha256: "40d3ab1bbd474c4c2328c91e3a7df8c6dd629b79ece4c4bd04bee496a224fb0c" + sha256: "7b3cfbf654f3edd0c6298ecd5be782ce997ddf0e00531b9464b55245185bbbbd" url: "https://pub.dev" source: hosted - version: "2.1.4" + version: "2.1.5" shelf: dependency: transitive description: name: shelf - sha256: ad29c505aee705f41a4d8963641f91ac4cee3c8fad5947e033390a7bd8180fa4 + sha256: e7dd780a7ffb623c57850b33f43309312fc863fb6aa3d276a754bb299839ef12 url: "https://pub.dev" source: hosted - version: "1.4.1" + version: "1.4.2" shelf_packages_handler: dependency: transitive description: @@ -237,10 +250,10 @@ packages: dependency: transitive description: name: shelf_web_socket - sha256: "9ca081be41c60190ebcb4766b2486a7d50261db7bd0f5d9615f2d653637a84c1" + sha256: cc36c297b52866d203dbf9332263c94becc2fe0ceaa9681d07b6ef9807023b67 url: "https://pub.dev" source: hosted - version: "1.0.4" + version: "2.0.1" source_map_stack_trace: dependency: transitive description: @@ -253,122 +266,130 @@ packages: dependency: transitive description: name: source_maps - sha256: "708b3f6b97248e5781f493b765c3337db11c5d2c81c3094f10904bfa8004c703" + sha256: "190222579a448b03896e0ca6eca5998fa810fda630c1d65e2f78b3f638f54812" url: "https://pub.dev" source: hosted - version: "0.10.12" + version: "0.10.13" source_span: dependency: transitive description: name: source_span - sha256: "53e943d4206a5e30df338fd4c6e7a077e02254531b138a15aec3bd143c1a8b3c" + sha256: "254ee5351d6cb365c859e20ee823c3bb479bf4a293c22d17a9f1bf144ce86f7c" url: "https://pub.dev" source: hosted - version: "1.10.0" + version: "1.10.1" sqlite3: dependency: "direct main" description: name: sqlite3 - sha256: "45f168ae2213201b54e09429ed0c593dc2c88c924a1488d6f9c523a255d567cb" + sha256: "35d3726fe18ab1463403a5cc8d97dbc81f2a0b08082e8173851363fcc97b6627" url: "https://pub.dev" source: hosted - version: "2.4.6" + version: "2.7.2" stack_trace: dependency: transitive description: name: stack_trace - sha256: "73713990125a6d93122541237550ee3352a2d84baad52d375a4cad2eb9b7ce0b" + sha256: "8b27215b45d22309b5cddda1aa2b19bdfec9df0e765f2de506401c071d38d1b1" url: "https://pub.dev" source: hosted - version: "1.11.1" + version: "1.12.1" stream_channel: dependency: transitive description: name: stream_channel - sha256: ba2aa5d8cc609d96bbb2899c28934f9e1af5cddbd60a827822ea467161eb54e7 + sha256: "969e04c80b8bcdf826f8f16579c7b14d780458bd97f56d107d3950fdbeef059d" url: "https://pub.dev" source: hosted - version: "2.1.2" + version: "2.1.4" string_scanner: dependency: transitive description: name: 
string_scanner - sha256: "688af5ed3402a4bde5b3a6c15fd768dbf2621a614950b17f04626c431ab3c4c3" + sha256: "921cd31725b72fe181906c6a94d987c78e3b98c2e205b397ea399d4054872b43" url: "https://pub.dev" source: hosted - version: "1.3.0" + version: "1.4.1" term_glyph: dependency: transitive description: name: term_glyph - sha256: a29248a84fbb7c79282b40b8c72a1209db169a2e0542bce341da992fe1bc7e84 + sha256: "7f554798625ea768a7518313e58f83891c7f5024f88e46e7182a4558850a4b8e" url: "https://pub.dev" source: hosted - version: "1.2.1" + version: "1.2.2" test: dependency: "direct dev" description: name: test - sha256: "7ee446762c2c50b3bd4ea96fe13ffac69919352bd3b4b17bac3f3465edc58073" + sha256: "8391fbe68d520daf2314121764d38e37f934c02fd7301ad18307bd93bd6b725d" url: "https://pub.dev" source: hosted - version: "1.25.2" + version: "1.25.14" test_api: dependency: transitive description: name: test_api - sha256: "9955ae474176f7ac8ee4e989dadfb411a58c30415bcfb648fa04b2b8a03afa7f" + sha256: fb31f383e2ee25fbbfe06b40fe21e1e458d14080e3c67e7ba0acfde4df4e0bbd url: "https://pub.dev" source: hosted - version: "0.7.0" + version: "0.7.4" test_core: dependency: transitive description: name: test_core - sha256: "2bc4b4ecddd75309300d8096f781c0e3280ca1ef85beda558d33fcbedc2eead4" + sha256: "84d17c3486c8dfdbe5e12a50c8ae176d15e2a771b96909a9442b40173649ccaa" url: "https://pub.dev" source: hosted - version: "0.6.0" + version: "0.6.8" typed_data: dependency: transitive description: name: typed_data - sha256: facc8d6582f16042dd49f2463ff1bd6e2c9ef9f3d5da3d9b087e244a7b564b3c + sha256: f9049c039ebfeb4cf7a7104a675823cd72dba8297f264b6637062516699fa006 url: "https://pub.dev" source: hosted - version: "1.3.2" + version: "1.4.0" vm_service: dependency: transitive description: name: vm_service - sha256: "5c5f338a667b4c644744b661f309fb8080bb94b18a7e91ef1dbd343bed00ed6d" + sha256: ddfa8d30d89985b96407efce8acbdd124701f96741f2d981ca860662f1c0dc02 url: "https://pub.dev" source: hosted - version: "14.2.5" + version: "15.0.0" watcher: dependency: transitive description: name: watcher - sha256: "3d2ad6751b3c16cf07c7fca317a1413b3f26530319181b37e3b9039b84fc01d8" + sha256: "69da27e49efa56a15f8afe8f4438c4ec02eff0a117df1b22ea4aad194fe1c104" url: "https://pub.dev" source: hosted - version: "1.1.0" + version: "1.1.1" web: dependency: transitive description: name: web - sha256: d43c1d6b787bf0afad444700ae7f4db8827f701bc61c255ac8d328c6f4d52062 + sha256: cd3543bd5798f6ad290ea73d210f423502e71900302dde696f8bff84bf89a1cb url: "https://pub.dev" source: hosted - version: "1.0.0" + version: "1.1.0" + web_socket: + dependency: transitive + description: + name: web_socket + sha256: "3c12d96c0c9a4eec095246debcea7b86c0324f22df69893d538fcc6f1b8cce83" + url: "https://pub.dev" + source: hosted + version: "0.1.6" web_socket_channel: dependency: transitive description: name: web_socket_channel - sha256: d88238e5eac9a42bb43ca4e721edba3c08c6354d4a53063afaa568516217621b + sha256: "0b8e2457400d8a859b7b2030786835a28a8e80836ef64402abef392ff4f1d0e5" url: "https://pub.dev" source: hosted - version: "2.4.0" + version: "3.0.2" webkit_inspection_protocol: dependency: transitive description: @@ -381,9 +402,9 @@ packages: dependency: transitive description: name: yaml - sha256: "75769501ea3489fca56601ff33454fe45507ea3bfb014161abc3b43ae25989d5" + sha256: b9da305ac7c39faa3f030eccd175340f968459dae4af175130b3fc47e40d76ce url: "https://pub.dev" source: hosted - version: "3.1.2" + version: "3.1.3" sdks: - dart: ">=3.4.0 <4.0.0" + dart: ">=3.5.0 <4.0.0" diff --git a/dart/test/sync_test.dart 
b/dart/test/sync_test.dart
new file mode 100644
index 0000000..f4f367c
--- /dev/null
+++ b/dart/test/sync_test.dart
@@ -0,0 +1,208 @@
+import 'dart:convert';
+
+import 'package:sqlite3/common.dart';
+import 'package:test/test.dart';
+
+import 'utils/native_test_utils.dart';
+
+void main() {
+  group('sync tests', () {
+    late CommonDatabase db;
+
+    setUp(() async {
+      db = openTestDatabase()
+        ..select('select powersync_init();')
+        ..select('select powersync_replace_schema(?)', [json.encode(_schema)]);
+    });
+
+    tearDown(() {
+      db.dispose();
+    });
+
+    void pushSyncData(
+      String bucket,
+      String opId,
+      String rowId,
+      Object op,
+      Object? data, {
+      Object? descriptions = _bucketDescriptions,
+    }) {
+      final encoded = json.encode({
+        'buckets': [
+          {
+            'bucket': bucket,
+            'data': [
+              {
+                'op_id': opId,
+                'op': op,
+                'object_type': 'items',
+                'object_id': rowId,
+                'checksum': 0,
+                'data': data,
+              }
+            ],
+          }
+        ],
+        if (descriptions != null) 'descriptions': descriptions,
+      });
+
+      db.execute('insert into powersync_operations (op, data) VALUES (?, ?);',
+          ['save', encoded]);
+    }
+    bool pushCheckpointComplete(
+        String lastOpId, String? writeCheckpoint, List<Object?> checksums,
+        {int? priority}) {
+      final [row] = db.select('select powersync_validate_checkpoint(?) as r;', [
+        json.encode({
+          'last_op_id': lastOpId,
+          'write_checkpoint': writeCheckpoint,
+          'buckets': [
+            for (final cs in checksums.cast<Map<String, dynamic>>())
+              if (priority == null || cs['priority'] <= priority) cs
+          ],
+          'priority': priority,
+        })
+      ]);
+
+      final decoded = json.decode(row['r']);
+      if (decoded['valid'] != true) {
+        fail(row['r']);
+      }
+
+      db.execute(
+        'UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))',
+        [
+          lastOpId,
+          json.encode(checksums.map((e) => (e as Map)['bucket']).toList())
+        ],
+      );
+
+      db.execute('INSERT INTO powersync_operations(op, data) VALUES (?, ?)', [
+        'sync_local',
+        priority != null
+            ? jsonEncode({
+                'priority': priority,
+                'buckets': [
+                  for (final cs in checksums.cast<Map<String, dynamic>>())
+                    if (cs['priority'] <= priority) cs['bucket']
+                ],
+              })
+            : null,
+      ]);
+      return db.lastInsertRowId == 1;
+    }
+
+    ResultSet fetchRows() {
+      return db.select('select * from items');
+    }
+
+    test('does not publish until reaching checkpoint', () {
+      expect(fetchRows(), isEmpty);
+      pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
+      expect(fetchRows(), isEmpty);
+
+      expect(
+          pushCheckpointComplete(
+              '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]),
+          isTrue);
+      expect(fetchRows(), [
+        {'id': 'row-0', 'col': 'hi'}
+      ]);
+    });
+
+    test('does not publish with pending local data', () {
+      expect(fetchRows(), isEmpty);
+      db.execute("insert into items (id, col) values ('local', 'data');");
+      expect(fetchRows(), isNotEmpty);
+
+      pushSyncData('prio1', '1', 'row-0', 'PUT', {'col': 'hi'});
+      expect(
+          pushCheckpointComplete(
+              '1', null, [_bucketChecksum('prio1', 1, checksum: 0)]),
+          isFalse);
+      expect(fetchRows(), [
+        {'id': 'local', 'col': 'data'}
+      ]);
+    });
+
+    test('publishes with local data for prio=0 buckets', () {
+      expect(fetchRows(), isEmpty);
+      db.execute("insert into items (id, col) values ('local', 'data');");
+      expect(fetchRows(), isNotEmpty);
+
+      pushSyncData('prio0', '1', 'row-0', 'PUT', {'col': 'hi'});
+      expect(
+        pushCheckpointComplete(
+          '1',
+          null,
+          [_bucketChecksum('prio0', 0, checksum: 0)],
+          priority: 0,
+        ),
+        isTrue,
+      );
+      expect(fetchRows(), [
+        {'id': 'local', 'col': 'data'},
+        {'id': 'row-0', 'col': 'hi'},
+      ]);
+    });
+
+    test('can publish partial checkpoints under different priorities', () {
+      for (var i = 0; i < 4; i++) {
+        pushSyncData('prio$i', '1', 'row-$i', 'PUT', {'col': '$i'});
+      }
+      expect(fetchRows(), isEmpty);
+
+      // Simulate a partial checkpoint complete for each of the buckets.
+      for (var i = 0; i < 4; i++) {
+        expect(
+          pushCheckpointComplete(
+            '1',
+            null,
+            [
+              for (var j = 0; j <= 4; j++)
+                _bucketChecksum(
+                  'prio$j',
+                  j,
+                  // Give buckets outside of the current priority a wrong
+                  // checksum. They should not be validated yet.
+                  checksum: j <= i ? 0 : 1234,
+                ),
+            ],
+            priority: i,
+          ),
+          isTrue,
+        );
+
+        expect(fetchRows(), [
+          for (var j = 0; j <= i; j++) {'id': 'row-$j', 'col': '$j'},
+        ]);
+
+        expect(db.select('select 1 from ps_sync_state where priority = ?', [i]),
+            isNotEmpty);
+      }
+    });
+  });
+}
+
+Object? _bucketChecksum(String bucket, int prio, {int checksum = 0}) {
+  return {'bucket': bucket, 'priority': prio, 'checksum': checksum};
+}
+
+const _schema = {
+  'tables': [
+    {
+      'name': 'items',
+      'columns': [
+        {'name': 'col', 'type': 'text'}
+      ],
+    }
+  ]
+};
+
+const _bucketDescriptions = {
+  'prio0': {'priority': 0},
+  'prio1': {'priority': 1},
+  'prio2': {'priority': 2},
+  'prio3': {'priority': 3},
+};
diff --git a/dart/test/utils/migration_fixtures.dart b/dart/test/utils/migration_fixtures.dart
index 3797163..3cc0e2a 100644
--- a/dart/test/utils/migration_fixtures.dart
+++ b/dart/test/utils/migration_fixtures.dart
@@ -1,5 +1,5 @@
 /// The current database version
-const databaseVersion = 6;
+const databaseVersion = 7;
 
 /// This is the base database state that we expect at various schema versions.
 /// Generated by loading the specific library version, and exporting the schema.
@@ -172,7 +172,51 @@ const expectedState = { ;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') ;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') -''' +''', + 7: r''' +;CREATE TABLE ps_buckets( + id INTEGER PRIMARY KEY, + name TEXT NOT NULL, + last_applied_op INTEGER NOT NULL DEFAULT 0, + last_op INTEGER NOT NULL DEFAULT 0, + target_op INTEGER NOT NULL DEFAULT 0, + add_checksum INTEGER NOT NULL DEFAULT 0, + op_checksum INTEGER NOT NULL DEFAULT 0, + pending_delete INTEGER NOT NULL DEFAULT 0 +) STRICT +;CREATE TABLE ps_crud (id INTEGER PRIMARY KEY AUTOINCREMENT, data TEXT, tx_id INTEGER) +;CREATE TABLE ps_kv(key TEXT PRIMARY KEY NOT NULL, value BLOB) +;CREATE TABLE ps_migration(id INTEGER PRIMARY KEY, down_migrations TEXT) +;CREATE TABLE ps_oplog( + bucket INTEGER NOT NULL, + op_id INTEGER NOT NULL, + row_type TEXT, + row_id TEXT, + key TEXT, + data TEXT, + hash INTEGER NOT NULL) STRICT +;CREATE TABLE ps_sync_state ( + priority INTEGER NOT NULL, + last_synced_at TEXT NOT NULL +) STRICT +;CREATE TABLE ps_tx(id INTEGER PRIMARY KEY NOT NULL, current_tx INTEGER, next_tx INTEGER) +;CREATE TABLE ps_untyped(type TEXT NOT NULL, id TEXT NOT NULL, data TEXT, PRIMARY KEY (type, id)) 
+;CREATE TABLE ps_updated_rows( + row_type TEXT, + row_id TEXT, + PRIMARY KEY(row_type, row_id)) STRICT, WITHOUT ROWID +;CREATE UNIQUE INDEX ps_buckets_name ON ps_buckets (name) +;CREATE INDEX ps_oplog_key ON ps_oplog (bucket, key) +;CREATE INDEX ps_oplog_opid ON ps_oplog (bucket, op_id) +;CREATE INDEX ps_oplog_row ON ps_oplog (row_type, row_id) +;INSERT INTO ps_migration(id, down_migrations) VALUES(1, null) +;INSERT INTO ps_migration(id, down_migrations) VALUES(2, '[{"sql":"DELETE FROM ps_migration WHERE id >= 2","params":[]},{"sql":"DROP TABLE ps_tx","params":[]},{"sql":"ALTER TABLE ps_crud DROP COLUMN tx_id","params":[]}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(3, '[{"sql":"DELETE FROM ps_migration WHERE id >= 3"},{"sql":"DROP TABLE ps_kv"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(4, '[{"sql":"DELETE FROM ps_migration WHERE id >= 4"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN op_checksum"},{"sql":"ALTER TABLE ps_buckets DROP COLUMN remove_operations"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(5, '[{"sql":"SELECT powersync_drop_view(view.name)\n FROM sqlite_master view\n WHERE view.type = ''view''\n AND view.sql GLOB ''*-- powersync-auto-generated''"},{"sql":"ALTER TABLE ps_buckets RENAME TO ps_buckets_5"},{"sql":"ALTER TABLE ps_oplog RENAME TO ps_oplog_5"},{"sql":"CREATE TABLE ps_buckets(\n name TEXT PRIMARY KEY,\n last_applied_op INTEGER NOT NULL DEFAULT 0,\n last_op INTEGER NOT NULL DEFAULT 0,\n target_op INTEGER NOT NULL DEFAULT 0,\n add_checksum INTEGER NOT NULL DEFAULT 0,\n pending_delete INTEGER NOT NULL DEFAULT 0\n, op_checksum INTEGER NOT NULL DEFAULT 0, remove_operations INTEGER NOT NULL DEFAULT 0)"},{"sql":"INSERT INTO ps_buckets(name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete)\n SELECT name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete FROM ps_buckets_5"},{"sql":"CREATE TABLE ps_oplog(\n bucket TEXT NOT NULL,\n op_id INTEGER NOT NULL,\n op INTEGER NOT NULL,\n row_type TEXT,\n row_id TEXT,\n key TEXT,\n data TEXT,\n hash INTEGER NOT NULL,\n superseded INTEGER NOT NULL)"},{"sql":"CREATE INDEX ps_oplog_by_row ON ps_oplog (row_type, row_id) WHERE superseded = 0"},{"sql":"CREATE INDEX ps_oplog_by_opid ON ps_oplog (bucket, op_id)"},{"sql":"CREATE INDEX ps_oplog_by_key ON ps_oplog (bucket, key) WHERE superseded = 0"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, key, data, hash, superseded)\n SELECT ps_buckets_5.name, oplog.op_id, 3, oplog.row_type, oplog.row_id, oplog.key, oplog.data, oplog.hash, 0\n FROM ps_oplog_5 oplog\n JOIN ps_buckets_5\n ON ps_buckets_5.id = oplog.bucket"},{"sql":"DROP TABLE ps_oplog_5"},{"sql":"DROP TABLE ps_buckets_5"},{"sql":"INSERT INTO ps_oplog(bucket, op_id, op, row_type, row_id, hash, superseded)\n SELECT ''$local'', 1, 4, r.row_type, r.row_id, 0, 0\n FROM ps_updated_rows r"},{"sql":"INSERT OR REPLACE INTO ps_buckets(name, pending_delete, last_op, target_op) VALUES(''$local'', 1, 0, 9223372036854775807)"},{"sql":"DROP TABLE ps_updated_rows"},{"sql":"DELETE FROM ps_migration WHERE id >= 5"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(6, '[{"sql":"DELETE FROM ps_migration WHERE id >= 6"}]') +;INSERT INTO ps_migration(id, down_migrations) VALUES(7, '[{"sql":"INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = 2147483647"},{"sql":"DROP TABLE ps_sync_state"},{"sql":"DELETE FROM ps_migration WHERE id >= 7"}]') +''', }; final finalState = 
expectedState[databaseVersion]!; @@ -230,6 +274,17 @@ const data1 = { (2, 3, 'lists', 'l1', '', '{}', 3) ;INSERT INTO ps_updated_rows(row_type, row_id) VALUES ('lists', 'l2') +''', + 7: r''' +;INSERT INTO ps_buckets(id, name, last_applied_op, last_op, target_op, add_checksum, op_checksum, pending_delete) VALUES + (1, 'b1', 0, 0, 0, 0, 120, 0), + (2, 'b2', 0, 0, 0, 1005, 3, 0) +;INSERT INTO ps_oplog(bucket, op_id, row_type, row_id, key, data, hash) VALUES + (1, 1, 'todos', 't1', '', '{}', 100), + (1, 2, 'todos', 't2', '', '{}', 20), + (2, 3, 'lists', 'l1', '', '{}', 3) +;INSERT INTO ps_updated_rows(row_type, row_id) VALUES + ('lists', 'l2') ''' }; @@ -270,7 +325,8 @@ final dataDown1 = { ('b1', 2, 3, 'todos', 't2', '', '{}', 20, 0), ('b2', 3, 3, 'lists', 'l1', '', '{}', 3, 0) ''', - 5: data1[5]! + 5: data1[5]!, + 6: data1[5]! }; final finalData1 = data1[databaseVersion]!; diff --git a/dart/test/utils/native_test_utils.dart b/dart/test/utils/native_test_utils.dart index 284142e..bc8b637 100644 --- a/dart/test/utils/native_test_utils.dart +++ b/dart/test/utils/native_test_utils.dart @@ -3,6 +3,7 @@ import 'dart:ffi'; import 'package:sqlite3/common.dart'; import 'package:sqlite3/open.dart' as sqlite_open; import 'package:sqlite3/sqlite3.dart'; +import 'package:path/path.dart' as p; const defaultSqlitePath = 'libsqlite3.so.0'; @@ -22,29 +23,24 @@ CommonDatabase openTestDatabase() { } String getLibraryForPlatform({String? path = "."}) { - switch (Abi.current()) { - case Abi.androidArm: - case Abi.androidArm64: - case Abi.androidX64: - return '$path/libpowersync.so'; - case Abi.macosArm64: - case Abi.macosX64: - return '$path/libpowersync.dylib'; - case Abi.linuxX64: - case Abi.linuxArm64: - return '$path/libpowersync.so'; - case Abi.windowsX64: - return '$path/powersync.dll'; - case Abi.androidIA32: - throw ArgumentError( + // Using an absolute path is required for macOS, where Dart can't dlopen + // relative paths due to being a "hardened program". + return p.normalize(p.absolute(switch (Abi.current()) { + Abi.androidArm || + Abi.androidArm64 || + Abi.androidX64 => + '$path/libpowersync.so', + Abi.macosArm64 || Abi.macosX64 => '$path/libpowersync.dylib', + Abi.linuxX64 || Abi.linuxArm64 => '$path/libpowersync.so', + Abi.windowsX64 => '$path/powersync.dll', + Abi.androidIA32 => throw ArgumentError( 'Unsupported processor architecture. X86 Android emulators are not ' 'supported. Please use an x86_64 emulator instead. All physical ' 'Android devices are supported including 32bit ARM.', - ); - default: - throw ArgumentError( + ), + _ => throw ArgumentError( 'Unsupported processor architecture "${Abi.current()}". ' 'Please open an issue on GitHub to request it.', - ); - } + ) + })); }
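Aside (not part of the diff): the down_migrations registered for version 7 above can be replayed with the crate's existing helpers. A hedged sketch, assuming the same exec_safe helper and serde_json error conversion used elsewhere in this diff; the function name is illustrative only, and the usual alloc imports are assumed.

    use serde::Deserialize;

    #[derive(Deserialize)]
    struct DownMigration {
        sql: String,
    }

    fn replay_down_migrations(
        db: *mut sqlite::sqlite3,
        down_migrations_json: &str,
    ) -> Result<(), SQLiteError> {
        // Older entries also carry a "params" key; serde ignores unknown fields.
        let steps: Vec<DownMigration> = serde_json::from_str(down_migrations_json)?;
        for step in steps {
            // For version 7 this copies last_synced_at back into ps_kv, drops
            // ps_sync_state, and deletes the migration row itself.
            db.exec_safe(&step.sql)?;
        }
        Ok(())
    }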