From 12488555bf394024bd8319affc15faae4d17b321 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Wed, 18 Dec 2024 00:13:11 +0200 Subject: [PATCH 1/5] kv: add iterator helpers --- src/kv.rs | 123 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) diff --git a/src/kv.rs b/src/kv.rs index 516880c..097cb60 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -23,6 +23,9 @@ pub enum KvAction { BeginTx, Commit { tx_id: u64 }, Backup, + IterStart { prefix: Option> }, + IterNext { iterator_id: u64, count: u64 }, + IterClose { iterator_id: u64 }, } #[derive(Debug, Serialize, Deserialize)] @@ -31,6 +34,15 @@ pub enum KvResponse { BeginTx { tx_id: u64 }, Get { key: Vec }, Err { error: KvError }, + IterStart { + iterator_id: u64, + }, + IterNext { + done: bool, + }, + IterClose { + iterator_id: u64, + }, } #[derive(Debug, Serialize, Deserialize, Error)] @@ -181,6 +193,117 @@ where } } + /// Get all key-value pairs with an optional prefix + /// + /// # Example + /// ``` + /// let entries = kv.iter_all(Some(&"user_"), 100)?; + /// for (key, value) in entries { + /// println!("key: {}, value: {:?}", key, value); + /// } + /// ``` + pub fn iter_all(&self, prefix: Option<&K>, batch_size: u64) -> anyhow::Result> { + // Start the iterator + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::IterStart { + prefix: prefix.map(|p| serde_json::to_vec(p)).transpose()? + }, + })?) + .send_and_await_response(self.timeout)?; + + let iterator_id = match res { + Ok(Message::Response { body, .. }) => { + match serde_json::from_slice::(&body)? { + KvResponse::IterStart { iterator_id } => iterator_id, + KvResponse::Err { error } => return Err(error.into()), + _ => return Err(anyhow::anyhow!("kv: unexpected response")), + } + } + _ => return Err(anyhow::anyhow!("kv: unexpected message")), + }; + + let mut all_entries = Vec::new(); + + // Collect all entries + loop { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::IterNext { + iterator_id, + count: batch_size, + }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + match serde_json::from_slice::(&body)? { + KvResponse::IterNext { done } => { + let entries_bytes = get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?; + let entries: Vec<(Vec, Vec)> = serde_json::from_slice(&entries_bytes)?; + for (key_bytes, value_bytes) in entries { + let key = serde_json::from_slice(&key_bytes)?; + let value = serde_json::from_slice(&value_bytes)?; + all_entries.push((key, value)); + } + if done { + break; + } + } + KvResponse::Err { error } => return Err(error.into()), + _ => return Err(anyhow::anyhow!("kv: unexpected response")), + } + } + _ => return Err(anyhow::anyhow!("kv: unexpected message")), + } + } + + // Clean up + let _ = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::IterClose { iterator_id }, + })?) + .send_and_await_response(self.timeout)?; + + Ok(all_entries) + } + + /// Get all keys with an optional prefix + /// + /// # Example + /// ``` + /// let keys = kv.collect_keys(Some(&"user_"))?; + /// for key in keys { + /// println!("key: {}", key); + /// } + /// ``` + pub fn collect_keys(&self, prefix: Option<&K>) -> anyhow::Result> { + Ok(self.iter_all(prefix, 100)?.into_iter().map(|(k, _)| k).collect()) + } + + /// Get all values with an optional key prefix + /// + /// # Example + /// ``` + /// let values = kv.collect_values(Some(&"user_"))?; + /// for value in values { + /// println!("value: {:?}", value); + /// } + /// ``` + pub fn collect_values(&self, prefix: Option<&K>) -> anyhow::Result> { + Ok(self.iter_all(prefix, 100)?.into_iter().map(|(_, v)| v).collect()) + } + /// Commit a transaction. pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> { let res = Request::new() From ee43e9da6d3accc71340abc378ba184830a6826a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 17 Dec 2024 22:13:37 +0000 Subject: [PATCH 2/5] Format Rust code using rustfmt --- src/kv.rs | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/src/kv.rs b/src/kv.rs index 097cb60..fd99970 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -34,15 +34,9 @@ pub enum KvResponse { BeginTx { tx_id: u64 }, Get { key: Vec }, Err { error: KvError }, - IterStart { - iterator_id: u64, - }, - IterNext { - done: bool, - }, - IterClose { - iterator_id: u64, - }, + IterStart { iterator_id: u64 }, + IterNext { done: bool }, + IterClose { iterator_id: u64 }, } #[derive(Debug, Serialize, Deserialize, Error)] @@ -194,7 +188,7 @@ where } /// Get all key-value pairs with an optional prefix - /// + /// /// # Example /// ``` /// let entries = kv.iter_all(Some(&"user_"), 100)?; @@ -209,8 +203,8 @@ where .body(serde_json::to_vec(&KvRequest { package_id: self.package_id.clone(), db: self.db.clone(), - action: KvAction::IterStart { - prefix: prefix.map(|p| serde_json::to_vec(p)).transpose()? + action: KvAction::IterStart { + prefix: prefix.map(|p| serde_json::to_vec(p)).transpose()?, }, })?) .send_and_await_response(self.timeout)?; @@ -227,7 +221,7 @@ where }; let mut all_entries = Vec::new(); - + // Collect all entries loop { let res = Request::new() @@ -246,8 +240,10 @@ where Ok(Message::Response { body, .. }) => { match serde_json::from_slice::(&body)? { KvResponse::IterNext { done } => { - let entries_bytes = get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?; - let entries: Vec<(Vec, Vec)> = serde_json::from_slice(&entries_bytes)?; + let entries_bytes = + get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?; + let entries: Vec<(Vec, Vec)> = + serde_json::from_slice(&entries_bytes)?; for (key_bytes, value_bytes) in entries { let key = serde_json::from_slice(&key_bytes)?; let value = serde_json::from_slice(&value_bytes)?; @@ -279,7 +275,7 @@ where } /// Get all keys with an optional prefix - /// + /// /// # Example /// ``` /// let keys = kv.collect_keys(Some(&"user_"))?; @@ -288,11 +284,15 @@ where /// } /// ``` pub fn collect_keys(&self, prefix: Option<&K>) -> anyhow::Result> { - Ok(self.iter_all(prefix, 100)?.into_iter().map(|(k, _)| k).collect()) + Ok(self + .iter_all(prefix, 100)? + .into_iter() + .map(|(k, _)| k) + .collect()) } /// Get all values with an optional key prefix - /// + /// /// # Example /// ``` /// let values = kv.collect_values(Some(&"user_"))?; @@ -301,7 +301,11 @@ where /// } /// ``` pub fn collect_values(&self, prefix: Option<&K>) -> anyhow::Result> { - Ok(self.iter_all(prefix, 100)?.into_iter().map(|(_, v)| v).collect()) + Ok(self + .iter_all(prefix, 100)? + .into_iter() + .map(|(_, v)| v) + .collect()) } /// Commit a transaction. From 997485139b75cda84df30017f49b9dba0b469647 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Fri, 20 Dec 2024 00:07:16 +0200 Subject: [PATCH 3/5] kv: comments and tweaks --- src/kv.rs | 114 +++++++++++++++++++++++++++++++++++++++++++++++++----- 1 file changed, 105 insertions(+), 9 deletions(-) diff --git a/src/kv.rs b/src/kv.rs index fd99970..773fdee 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -13,47 +13,143 @@ pub struct KvRequest { pub action: KvAction, } +/// IPC Action format, representing operations that can be performed on the key-value runtime module. +/// These actions are included in a KvRequest sent to the kv:distro:sys runtime module. #[derive(Debug, Serialize, Deserialize, Clone)] pub enum KvAction { + /// Opens an existing key-value database or creates a new one if it doesn't exist. Open, + /// Permanently deletes the entire key-value database. RemoveDb, + /// Sets a value for the specified key in the database. + /// + /// # Parameters + /// * `key` - The key as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Set { key: Vec, tx_id: Option }, + /// Deletes a key-value pair from the database. + /// + /// # Parameters + /// * `key` - The key to delete as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Delete { key: Vec, tx_id: Option }, + /// Retrieves the value associated with the specified key. + /// + /// # Parameters + /// * `key` - The key to look up as a byte vector Get { key: Vec }, + /// Begins a new transaction for atomic operations. BeginTx, + /// Commits all operations in the specified transaction. + /// + /// # Parameters + /// * `tx_id` - The ID of the transaction to commit Commit { tx_id: u64 }, + /// Creates a backup of the database. Backup, + /// Starts an iterator over the database contents. + /// + /// # Parameters + /// * `prefix` - Optional byte vector to filter keys by prefix IterStart { prefix: Option> }, + /// Advances the iterator and returns the next batch of items. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to advance + /// * `count` - Maximum number of items to return IterNext { iterator_id: u64, count: u64 }, + /// Closes an active iterator. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to close IterClose { iterator_id: u64 }, } +/// Response types for key-value store operations. +/// These responses are returned after processing a KvAction request. #[derive(Debug, Serialize, Deserialize)] pub enum KvResponse { + /// Indicates successful completion of an operation. Ok, + /// Returns the transaction ID for a newly created transaction. + /// + /// # Fields + /// * `tx_id` - The ID of the newly created transaction BeginTx { tx_id: u64 }, + /// Returns the key that was retrieved from the database. + /// + /// # Fields + /// * `key` - The retrieved key as a byte vector Get { key: Vec }, + /// Indicates an error occurred during the operation. + /// + /// # Fields + /// * `error` - The specific error that occurred Err { error: KvError }, + /// Returns the ID of a newly created iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the created iterator IterStart { iterator_id: u64 }, + /// Indicates whether the iterator has more items. + /// + /// # Fields + /// * `done` - True if there are no more items to iterate over IterNext { done: bool }, + /// Confirms the closure of an iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the closed iterator IterClose { iterator_id: u64 }, } +/// Errors that can occur during key-value store operations. +/// These errors are returned as part of `KvResponse::Err` when an operation fails. #[derive(Debug, Serialize, Deserialize, Error)] pub enum KvError { - #[error("kv: DbDoesNotExist")] + /// The requested database does not exist. + #[error("Database does not exist")] NoDb, - #[error("kv: KeyNotFound")] + + /// The requested key was not found in the database. + #[error("Key not found in database")] KeyNotFound, - #[error("kv: no Tx found")] + + /// No active transaction found for the given transaction ID. + #[error("Transaction not found")] NoTx, - #[error("kv: No capability: {error}")] + + /// The specified iterator was not found. + #[error("Iterator not found")] + NoIterator, + + /// The operation requires capabilities that the caller doesn't have. + /// + /// # Fields + /// * `error` - Description of the missing capability or permission + #[error("Missing required capability: {error}")] NoCap { error: String }, - #[error("kv: rocksdb internal error: {error}")] + + /// An internal RocksDB error occurred during the operation. + /// + /// # Fields + /// * `action` - The operation that was being performed + /// * `error` - The specific error message from RocksDB + #[error("RocksDB error during {action}: {error}")] RocksDBError { action: String, error: String }, - #[error("kv: input bytes/json/key error: {error}")] + + /// Error parsing or processing input data. + /// + /// # Fields + /// * `error` - Description of what was invalid about the input + #[error("Invalid input: {error}")] InputError { error: String }, - #[error("kv: IO error: {error}")] + + /// An I/O error occurred during the operation. + /// + /// # Fields + /// * `error` - Description of the I/O error + #[error("I/O error: {error}")] IOError { error: String }, } @@ -240,10 +336,10 @@ where Ok(Message::Response { body, .. }) => { match serde_json::from_slice::(&body)? { KvResponse::IterNext { done } => { - let entries_bytes = + let blob = get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?; let entries: Vec<(Vec, Vec)> = - serde_json::from_slice(&entries_bytes)?; + serde_json::from_slice(&blob.bytes)?; for (key_bytes, value_bytes) in entries { let key = serde_json::from_slice(&key_bytes)?; let value = serde_json::from_slice(&value_bytes)?; From c5cdb1a1e9a6d6572e8401a04d4598c5f47d809d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 19 Dec 2024 22:07:42 +0000 Subject: [PATCH 4/5] Format Rust code using rustfmt --- src/kv.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/kv.rs b/src/kv.rs index 773fdee..29a79a6 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -336,8 +336,7 @@ where Ok(Message::Response { body, .. }) => { match serde_json::from_slice::(&body)? { KvResponse::IterNext { done } => { - let blob = - get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?; + let blob = get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?; let entries: Vec<(Vec, Vec)> = serde_json::from_slice(&blob.bytes)?; for (key_bytes, value_bytes) in entries { From d1a13e92cfd1fa2747e5e195c6dd45685dfeb8b8 Mon Sep 17 00:00:00 2001 From: bitful-pannul Date: Fri, 20 Dec 2024 15:02:59 +0200 Subject: [PATCH 5/5] kv: add raw bytes helpers --- src/kv.rs | 99 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 99 insertions(+) diff --git a/src/kv.rs b/src/kv.rs index 765f462..0d383ce 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -528,6 +528,105 @@ where } } +impl Kv, Vec> { + /// Get raw bytes directly + pub fn get_raw(&self, key: &[u8]) -> anyhow::Result> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Get { key: key.to_vec() }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Get { .. } => { + let bytes = match get_blob() { + Some(bytes) => bytes.bytes, + None => return Err(anyhow::anyhow!("kv: no blob")), + }; + Ok(bytes) + } + KvResponse::Err { error } => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Set raw bytes directly + pub fn set_raw(&self, key: &[u8], value: &[u8], tx_id: Option) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Set { + key: key.to_vec(), + tx_id, + }, + })?) + .blob_bytes(value.to_vec()) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err { error } => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Delete raw bytes directly + pub fn delete_raw(&self, key: &[u8], tx_id: Option) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Delete { + key: key.to_vec(), + tx_id, + }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err { error } => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } +} + +/// Helper function to open a raw bytes key-value store +pub fn open_raw( + package_id: PackageId, + db: &str, + timeout: Option, +) -> anyhow::Result, Vec>> { + open(package_id, db, timeout) +} + /// Opens or creates a kv db. pub fn open(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result> where