diff --git a/src/kv.rs b/src/kv.rs index 5922b7f..0d383ce 100644 --- a/src/kv.rs +++ b/src/kv.rs @@ -13,41 +13,143 @@ pub struct KvRequest { pub action: KvAction, } +/// IPC Action format, representing operations that can be performed on the key-value runtime module. +/// These actions are included in a KvRequest sent to the kv:distro:sys runtime module. #[derive(Debug, Serialize, Deserialize, Clone)] pub enum KvAction { + /// Opens an existing key-value database or creates a new one if it doesn't exist. Open, + /// Permanently deletes the entire key-value database. RemoveDb, + /// Sets a value for the specified key in the database. + /// + /// # Parameters + /// * `key` - The key as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Set { key: Vec, tx_id: Option }, + /// Deletes a key-value pair from the database. + /// + /// # Parameters + /// * `key` - The key to delete as a byte vector + /// * `tx_id` - Optional transaction ID if this operation is part of a transaction Delete { key: Vec, tx_id: Option }, + /// Retrieves the value associated with the specified key. + /// + /// # Parameters + /// * `key` - The key to look up as a byte vector Get { key: Vec }, + /// Begins a new transaction for atomic operations. BeginTx, + /// Commits all operations in the specified transaction. + /// + /// # Parameters + /// * `tx_id` - The ID of the transaction to commit Commit { tx_id: u64 }, + /// Creates a backup of the database. Backup, + /// Starts an iterator over the database contents. + /// + /// # Parameters + /// * `prefix` - Optional byte vector to filter keys by prefix + IterStart { prefix: Option> }, + /// Advances the iterator and returns the next batch of items. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to advance + /// * `count` - Maximum number of items to return + IterNext { iterator_id: u64, count: u64 }, + /// Closes an active iterator. + /// + /// # Parameters + /// * `iterator_id` - The ID of the iterator to close + IterClose { iterator_id: u64 }, } +/// Response types for key-value store operations. +/// These responses are returned after processing a KvAction request. #[derive(Debug, Serialize, Deserialize)] pub enum KvResponse { + /// Indicates successful completion of an operation. Ok, + /// Returns the transaction ID for a newly created transaction. + /// + /// # Fields + /// * `tx_id` - The ID of the newly created transaction BeginTx { tx_id: u64 }, + /// Returns the key that was retrieved from the database. + /// + /// # Fields + /// * `key` - The retrieved key as a byte vector Get { key: Vec }, + /// Indicates an error occurred during the operation. + /// + /// # Fields + /// * `error` - The specific error that occurred Err { error: KvError }, + /// Returns the ID of a newly created iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the created iterator + IterStart { iterator_id: u64 }, + /// Indicates whether the iterator has more items. + /// + /// # Fields + /// * `done` - True if there are no more items to iterate over + IterNext { done: bool }, + /// Confirms the closure of an iterator. + /// + /// # Fields + /// * `iterator_id` - The ID of the closed iterator + IterClose { iterator_id: u64 }, } +/// Errors that can occur during key-value store operations. +/// These errors are returned as part of `KvResponse::Err` when an operation fails. #[derive(Debug, Serialize, Deserialize, Error)] pub enum KvError { - #[error("kv: DbDoesNotExist")] + /// The requested database does not exist. + #[error("Database does not exist")] NoDb, - #[error("kv: KeyNotFound")] + + /// The requested key was not found in the database. + #[error("Key not found in database")] KeyNotFound, - #[error("kv: no Tx found")] + + /// No active transaction found for the given transaction ID. + #[error("Transaction not found")] NoTx, - #[error("kv: No capability: {error}")] + + /// The specified iterator was not found. + #[error("Iterator not found")] + NoIterator, + + /// The operation requires capabilities that the caller doesn't have. + /// + /// # Fields + /// * `error` - Description of the missing capability or permission + #[error("Missing required capability: {error}")] NoCap { error: String }, - #[error("kv: rocksdb internal error: {error}")] + + /// An internal RocksDB error occurred during the operation. + /// + /// # Fields + /// * `action` - The operation that was being performed + /// * `error` - The specific error message from RocksDB + #[error("RocksDB error during {action}: {error}")] RocksDBError { action: String, error: String }, - #[error("kv: input bytes/json/key error: {error}")] + + /// Error parsing or processing input data. + /// + /// # Fields + /// * `error` - Description of what was invalid about the input + #[error("Invalid input: {error}")] InputError { error: String }, - #[error("kv: IO error: {error}")] + + /// An I/O error occurred during the operation. + /// + /// # Fields + /// * `error` - Description of the I/O error + #[error("I/O error: {error}")] IOError { error: String }, } @@ -280,6 +382,126 @@ where } } + /// Get all key-value pairs with an optional prefix + /// + /// # Example + /// ``` + /// let entries = kv.iter_all(Some(&"user_"), 100)?; + /// for (key, value) in entries { + /// println!("key: {}, value: {:?}", key, value); + /// } + /// ``` + pub fn iter_all(&self, prefix: Option<&K>, batch_size: u64) -> anyhow::Result> { + // Start the iterator + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::IterStart { + prefix: prefix.map(|p| serde_json::to_vec(p)).transpose()?, + }, + })?) + .send_and_await_response(self.timeout)?; + + let iterator_id = match res { + Ok(Message::Response { body, .. }) => { + match serde_json::from_slice::(&body)? { + KvResponse::IterStart { iterator_id } => iterator_id, + KvResponse::Err { error } => return Err(error.into()), + _ => return Err(anyhow::anyhow!("kv: unexpected response")), + } + } + _ => return Err(anyhow::anyhow!("kv: unexpected message")), + }; + + let mut all_entries = Vec::new(); + + // Collect all entries + loop { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::IterNext { + iterator_id, + count: batch_size, + }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + match serde_json::from_slice::(&body)? { + KvResponse::IterNext { done } => { + let blob = get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?; + let entries: Vec<(Vec, Vec)> = + serde_json::from_slice(&blob.bytes)?; + for (key_bytes, value_bytes) in entries { + let key = serde_json::from_slice(&key_bytes)?; + let value = serde_json::from_slice(&value_bytes)?; + all_entries.push((key, value)); + } + if done { + break; + } + } + KvResponse::Err { error } => return Err(error.into()), + _ => return Err(anyhow::anyhow!("kv: unexpected response")), + } + } + _ => return Err(anyhow::anyhow!("kv: unexpected message")), + } + } + + // Clean up + let _ = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::IterClose { iterator_id }, + })?) + .send_and_await_response(self.timeout)?; + + Ok(all_entries) + } + + /// Get all keys with an optional prefix + /// + /// # Example + /// ``` + /// let keys = kv.collect_keys(Some(&"user_"))?; + /// for key in keys { + /// println!("key: {}", key); + /// } + /// ``` + pub fn collect_keys(&self, prefix: Option<&K>) -> anyhow::Result> { + Ok(self + .iter_all(prefix, 100)? + .into_iter() + .map(|(k, _)| k) + .collect()) + } + + /// Get all values with an optional key prefix + /// + /// # Example + /// ``` + /// let values = kv.collect_values(Some(&"user_"))?; + /// for value in values { + /// println!("value: {:?}", value); + /// } + /// ``` + pub fn collect_values(&self, prefix: Option<&K>) -> anyhow::Result> { + Ok(self + .iter_all(prefix, 100)? + .into_iter() + .map(|(_, v)| v) + .collect()) + } + /// Commit a transaction. pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> { let res = Request::new() @@ -306,6 +528,105 @@ where } } +impl Kv, Vec> { + /// Get raw bytes directly + pub fn get_raw(&self, key: &[u8]) -> anyhow::Result> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Get { key: key.to_vec() }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Get { .. } => { + let bytes = match get_blob() { + Some(bytes) => bytes.bytes, + None => return Err(anyhow::anyhow!("kv: no blob")), + }; + Ok(bytes) + } + KvResponse::Err { error } => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Set raw bytes directly + pub fn set_raw(&self, key: &[u8], value: &[u8], tx_id: Option) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Set { + key: key.to_vec(), + tx_id, + }, + })?) + .blob_bytes(value.to_vec()) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err { error } => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } + + /// Delete raw bytes directly + pub fn delete_raw(&self, key: &[u8], tx_id: Option) -> anyhow::Result<()> { + let res = Request::new() + .target(("our", "kv", "distro", "sys")) + .body(serde_json::to_vec(&KvRequest { + package_id: self.package_id.clone(), + db: self.db.clone(), + action: KvAction::Delete { + key: key.to_vec(), + tx_id, + }, + })?) + .send_and_await_response(self.timeout)?; + + match res { + Ok(Message::Response { body, .. }) => { + let response = serde_json::from_slice::(&body)?; + + match response { + KvResponse::Ok => Ok(()), + KvResponse::Err { error } => Err(error.into()), + _ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)), + } + } + _ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)), + } + } +} + +/// Helper function to open a raw bytes key-value store +pub fn open_raw( + package_id: PackageId, + db: &str, + timeout: Option, +) -> anyhow::Result, Vec>> { + open(package_id, db, timeout) +} + /// Opens or creates a kv db. pub fn open(package_id: PackageId, db: &str, timeout: Option) -> anyhow::Result> where