Skip to content

Support bucket with different priorities #55

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Feb 12, 2025
89 changes: 89 additions & 0 deletions crates/core/src/bucket_priority.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
use serde::{de::Visitor, Deserialize};
use sqlite_nostd::ResultCode;

use crate::error::SQLiteError;

#[repr(transparent)]
#[derive(Clone, Copy, PartialEq, Eq)]
pub struct BucketPriority {
pub number: i32,
}

impl BucketPriority {
pub fn may_publish_with_outstanding_uploads(self) -> bool {
self == BucketPriority::HIGHEST
}

pub const HIGHEST: BucketPriority = BucketPriority { number: 0 };

/// A low priority used to represent fully-completed sync operations across all priorities.
pub const SENTINEL: BucketPriority = BucketPriority { number: i32::MAX };
}

impl TryFrom<i32> for BucketPriority {
type Error = SQLiteError;

fn try_from(value: i32) -> Result<Self, Self::Error> {
if value < BucketPriority::HIGHEST.number || value == Self::SENTINEL.number {
return Err(SQLiteError(
ResultCode::MISUSE,
Some("Invalid bucket priority".into()),
));
}

return Ok(BucketPriority { number: value });
}
}

impl Into<i32> for BucketPriority {
fn into(self) -> i32 {
self.number
}
}

impl PartialOrd<BucketPriority> for BucketPriority {
fn partial_cmp(&self, other: &BucketPriority) -> Option<core::cmp::Ordering> {
Some(self.number.partial_cmp(&other.number)?.reverse())
}
}

impl<'de> Deserialize<'de> for BucketPriority {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
struct PriorityVisitor;
impl<'de> Visitor<'de> for PriorityVisitor {
type Value = BucketPriority;

fn expecting(&self, formatter: &mut core::fmt::Formatter) -> core::fmt::Result {
formatter.write_str("a priority as an integer between 0 and 3 (inclusive)")
}

fn visit_i32<E>(self, v: i32) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
BucketPriority::try_from(v).map_err(|e| E::custom(e.1.unwrap_or_default()))
}

fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
Self::visit_i32(self, i)
}

fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: serde::de::Error,
{
let i: i32 = v.try_into().map_err(|_| E::custom("int too large"))?;
Self::visit_i32(self, i)
}
}

deserializer.deserialize_i32(PriorityVisitor)
}
}
8 changes: 5 additions & 3 deletions crates/core/src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::create_sqlite_optional_text_fn;
use crate::create_sqlite_text_fn;
use crate::error::SQLiteError;
Expand Down Expand Up @@ -46,13 +47,14 @@ fn powersync_last_synced_at_impl(
let db = ctx.db_handle();

// language=SQLite
let statement = db.prepare_v2("select value from ps_kv where key = 'last_synced_at'")?;
let statement = db.prepare_v2("select last_synced_at from ps_sync_state where priority = ?")?;
statement.bind_int(1, BucketPriority::SENTINEL.into())?;

if statement.step()? == ResultCode::ROW {
let client_id = statement.column_text(0)?;
return Ok(Some(client_id.to_string()));
Ok(Some(client_id.to_string()))
} else {
return Ok(None);
Ok(None)
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use core::ffi::{c_char, c_int};
use sqlite::ResultCode;
use sqlite_nostd as sqlite;

mod bucket_priority;
mod checkpoint;
mod crud_vtab;
mod diff;
Expand Down
23 changes: 23 additions & 0 deletions crates/core/src/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use sqlite::ResultCode;
use sqlite_nostd as sqlite;
use sqlite_nostd::{Connection, Context};

use crate::bucket_priority::BucketPriority;
use crate::error::{PSResult, SQLiteError};
use crate::fix035::apply_v035_fix;

Expand Down Expand Up @@ -310,5 +311,27 @@ json_array(
.into_db_result(local_db)?;
}

if current_version < 7 && target_version >= 7 {
const SENTINEL_PRIORITY: i32 = BucketPriority::SENTINEL.number;
let stmt = format!("\
CREATE TABLE ps_sync_state (
priority INTEGER NOT NULL,
last_synced_at TEXT NOT NULL
) STRICT;
INSERT OR IGNORE INTO ps_sync_state (priority, last_synced_at)
SELECT {}, value from ps_kv where key = 'last_synced_at';

INSERT INTO ps_migration(id, down_migrations)
VALUES(7,
json_array(
json_object('sql', 'INSERT OR REPLACE INTO ps_kv(key, value) SELECT ''last_synced_at'', last_synced_at FROM ps_sync_state WHERE priority = {}'),
json_object('sql', 'DROP TABLE ps_sync_state'),
json_object('sql', 'DELETE FROM ps_migration WHERE id >= 7')
));
", SENTINEL_PRIORITY, SENTINEL_PRIORITY);

local_db.exec_safe(&stmt).into_db_result(local_db)?;
}

Ok(())
}
2 changes: 1 addition & 1 deletion crates/core/src/operations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ SELECT
json_extract(e.value, '$.has_more') as has_more,
json_extract(e.value, '$.after') as after,
json_extract(e.value, '$.next_after') as next_after
FROM json_each(json_extract(?, '$.buckets')) e",
FROM json_each(json_extract(?1, '$.buckets')) e",
)?;
statement.bind_text(1, data, sqlite::Destructor::STATIC)?;

Expand Down
11 changes: 5 additions & 6 deletions crates/core/src/operations_vtab.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,30 +76,29 @@ extern "C" fn update(
} else if rowid.value_type() == sqlite::ColumnType::Null {
// INSERT
let op = args[2].text();
let data = args[3].text();

let tab = unsafe { &mut *vtab.cast::<VirtualTable>() };
let db = tab.db;

if op == "save" {
let result = insert_operation(db, data);
let result = insert_operation(db, args[3].text());
vtab_result(vtab, result)
} else if op == "sync_local" {
let result = sync_local(db, data);
let result = sync_local(db, &args[3]);
if let Ok(result_row) = result {
unsafe {
*p_row_id = result_row;
}
}
vtab_result(vtab, result)
} else if op == "clear_remove_ops" {
let result = clear_remove_ops(db, data);
let result = clear_remove_ops(db, args[3].text());
vtab_result(vtab, result)
} else if op == "delete_pending_buckets" {
let result = delete_pending_buckets(db, data);
let result = delete_pending_buckets(db, args[3].text());
vtab_result(vtab, result)
} else if op == "delete_bucket" {
let result = delete_bucket(db, data);
let result = delete_bucket(db, args[3].text());
vtab_result(vtab, result)
} else {
ResultCode::MISUSE as c_int
Expand Down
Loading