Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kv: iterators #125

Draft
wants to merge 6 commits into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
335 changes: 328 additions & 7 deletions src/kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,41 +13,143 @@ pub struct KvRequest {
pub action: KvAction,
}

/// IPC Action format, representing operations that can be performed on the key-value runtime module.
/// These actions are included in a KvRequest sent to the kv:distro:sys runtime module.
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum KvAction {
/// Opens an existing key-value database or creates a new one if it doesn't exist.
Open,
/// Permanently deletes the entire key-value database.
RemoveDb,
/// Sets a value for the specified key in the database.
///
/// # Parameters
/// * `key` - The key as a byte vector
/// * `tx_id` - Optional transaction ID if this operation is part of a transaction
Set { key: Vec<u8>, tx_id: Option<u64> },
/// Deletes a key-value pair from the database.
///
/// # Parameters
/// * `key` - The key to delete as a byte vector
/// * `tx_id` - Optional transaction ID if this operation is part of a transaction
Delete { key: Vec<u8>, tx_id: Option<u64> },
/// Retrieves the value associated with the specified key.
///
/// # Parameters
/// * `key` - The key to look up as a byte vector
Get { key: Vec<u8> },
/// Begins a new transaction for atomic operations.
BeginTx,
/// Commits all operations in the specified transaction.
///
/// # Parameters
/// * `tx_id` - The ID of the transaction to commit
Commit { tx_id: u64 },
/// Creates a backup of the database.
Backup,
/// Starts an iterator over the database contents.
///
/// # Parameters
/// * `prefix` - Optional byte vector to filter keys by prefix
IterStart { prefix: Option<Vec<u8>> },
/// Advances the iterator and returns the next batch of items.
///
/// # Parameters
/// * `iterator_id` - The ID of the iterator to advance
/// * `count` - Maximum number of items to return
IterNext { iterator_id: u64, count: u64 },
/// Closes an active iterator.
///
/// # Parameters
/// * `iterator_id` - The ID of the iterator to close
IterClose { iterator_id: u64 },
}

/// Response types for key-value store operations.
/// These responses are returned after processing a KvAction request.
#[derive(Debug, Serialize, Deserialize)]
pub enum KvResponse {
/// Indicates successful completion of an operation.
Ok,
/// Returns the transaction ID for a newly created transaction.
///
/// # Fields
/// * `tx_id` - The ID of the newly created transaction
BeginTx { tx_id: u64 },
/// Returns the key that was retrieved from the database.
///
/// # Fields
/// * `key` - The retrieved key as a byte vector
Get { key: Vec<u8> },
/// Indicates an error occurred during the operation.
///
/// # Fields
/// * `error` - The specific error that occurred
Err { error: KvError },
/// Returns the ID of a newly created iterator.
///
/// # Fields
/// * `iterator_id` - The ID of the created iterator
IterStart { iterator_id: u64 },
/// Indicates whether the iterator has more items.
///
/// # Fields
/// * `done` - True if there are no more items to iterate over
IterNext { done: bool },
/// Confirms the closure of an iterator.
///
/// # Fields
/// * `iterator_id` - The ID of the closed iterator
IterClose { iterator_id: u64 },
}

/// Errors that can occur during key-value store operations.
/// These errors are returned as part of `KvResponse::Err` when an operation fails.
#[derive(Debug, Serialize, Deserialize, Error)]
pub enum KvError {
#[error("kv: DbDoesNotExist")]
/// The requested database does not exist.
#[error("Database does not exist")]
NoDb,
#[error("kv: KeyNotFound")]

/// The requested key was not found in the database.
#[error("Key not found in database")]
KeyNotFound,
#[error("kv: no Tx found")]

/// No active transaction found for the given transaction ID.
#[error("Transaction not found")]
NoTx,
#[error("kv: No capability: {error}")]

/// The specified iterator was not found.
#[error("Iterator not found")]
NoIterator,

/// The operation requires capabilities that the caller doesn't have.
///
/// # Fields
/// * `error` - Description of the missing capability or permission
#[error("Missing required capability: {error}")]
NoCap { error: String },
#[error("kv: rocksdb internal error: {error}")]

/// An internal RocksDB error occurred during the operation.
///
/// # Fields
/// * `action` - The operation that was being performed
/// * `error` - The specific error message from RocksDB
#[error("RocksDB error during {action}: {error}")]
RocksDBError { action: String, error: String },
#[error("kv: input bytes/json/key error: {error}")]

/// Error parsing or processing input data.
///
/// # Fields
/// * `error` - Description of what was invalid about the input
#[error("Invalid input: {error}")]
InputError { error: String },
#[error("kv: IO error: {error}")]

/// An I/O error occurred during the operation.
///
/// # Fields
/// * `error` - Description of the I/O error
#[error("I/O error: {error}")]
IOError { error: String },
}

Expand Down Expand Up @@ -280,6 +382,126 @@ where
}
}

/// Get all key-value pairs with an optional prefix
///
/// # Example
/// ```
/// let entries = kv.iter_all(Some(&"user_"), 100)?;
/// for (key, value) in entries {
/// println!("key: {}, value: {:?}", key, value);
/// }
/// ```
pub fn iter_all(&self, prefix: Option<&K>, batch_size: u64) -> anyhow::Result<Vec<(K, V)>> {
// Start the iterator
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::IterStart {
prefix: prefix.map(|p| serde_json::to_vec(p)).transpose()?,
},
})?)
.send_and_await_response(self.timeout)?;

let iterator_id = match res {
Ok(Message::Response { body, .. }) => {
match serde_json::from_slice::<KvResponse>(&body)? {
KvResponse::IterStart { iterator_id } => iterator_id,
KvResponse::Err { error } => return Err(error.into()),
_ => return Err(anyhow::anyhow!("kv: unexpected response")),
}
}
_ => return Err(anyhow::anyhow!("kv: unexpected message")),
};

let mut all_entries = Vec::new();

// Collect all entries
loop {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::IterNext {
iterator_id,
count: batch_size,
},
})?)
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
match serde_json::from_slice::<KvResponse>(&body)? {
KvResponse::IterNext { done } => {
let blob = get_blob().ok_or_else(|| anyhow::anyhow!("No blob data"))?;
let entries: Vec<(Vec<u8>, Vec<u8>)> =
serde_json::from_slice(&blob.bytes)?;
for (key_bytes, value_bytes) in entries {
let key = serde_json::from_slice(&key_bytes)?;
let value = serde_json::from_slice(&value_bytes)?;
all_entries.push((key, value));
}
if done {
break;
}
}
KvResponse::Err { error } => return Err(error.into()),
_ => return Err(anyhow::anyhow!("kv: unexpected response")),
}
}
_ => return Err(anyhow::anyhow!("kv: unexpected message")),
}
}

// Clean up
let _ = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::IterClose { iterator_id },
})?)
.send_and_await_response(self.timeout)?;

Ok(all_entries)
}

/// Get all keys with an optional prefix
///
/// # Example
/// ```
/// let keys = kv.collect_keys(Some(&"user_"))?;
/// for key in keys {
/// println!("key: {}", key);
/// }
/// ```
pub fn collect_keys(&self, prefix: Option<&K>) -> anyhow::Result<Vec<K>> {
Ok(self
.iter_all(prefix, 100)?
.into_iter()
.map(|(k, _)| k)
.collect())
}

/// Get all values with an optional key prefix
///
/// # Example
/// ```
/// let values = kv.collect_values(Some(&"user_"))?;
/// for value in values {
/// println!("value: {:?}", value);
/// }
/// ```
pub fn collect_values(&self, prefix: Option<&K>) -> anyhow::Result<Vec<V>> {
Ok(self
.iter_all(prefix, 100)?
.into_iter()
.map(|(_, v)| v)
.collect())
}

/// Commit a transaction.
pub fn commit_tx(&self, tx_id: u64) -> anyhow::Result<()> {
let res = Request::new()
Expand All @@ -306,6 +528,105 @@ where
}
}

impl Kv<Vec<u8>, Vec<u8>> {
/// Get raw bytes directly
pub fn get_raw(&self, key: &[u8]) -> anyhow::Result<Vec<u8>> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Get { key: key.to_vec() },
})?)
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;

match response {
KvResponse::Get { .. } => {
let bytes = match get_blob() {
Some(bytes) => bytes.bytes,
None => return Err(anyhow::anyhow!("kv: no blob")),
};
Ok(bytes)
}
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}

/// Set raw bytes directly
pub fn set_raw(&self, key: &[u8], value: &[u8], tx_id: Option<u64>) -> anyhow::Result<()> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Set {
key: key.to_vec(),
tx_id,
},
})?)
.blob_bytes(value.to_vec())
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;

match response {
KvResponse::Ok => Ok(()),
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}

/// Delete raw bytes directly
pub fn delete_raw(&self, key: &[u8], tx_id: Option<u64>) -> anyhow::Result<()> {
let res = Request::new()
.target(("our", "kv", "distro", "sys"))
.body(serde_json::to_vec(&KvRequest {
package_id: self.package_id.clone(),
db: self.db.clone(),
action: KvAction::Delete {
key: key.to_vec(),
tx_id,
},
})?)
.send_and_await_response(self.timeout)?;

match res {
Ok(Message::Response { body, .. }) => {
let response = serde_json::from_slice::<KvResponse>(&body)?;

match response {
KvResponse::Ok => Ok(()),
KvResponse::Err { error } => Err(error.into()),
_ => Err(anyhow::anyhow!("kv: unexpected response {:?}", response)),
}
}
_ => Err(anyhow::anyhow!("kv: unexpected message: {:?}", res)),
}
}
}

/// Helper function to open a raw bytes key-value store
pub fn open_raw(
package_id: PackageId,
db: &str,
timeout: Option<u64>,
) -> anyhow::Result<Kv<Vec<u8>, Vec<u8>>> {
open(package_id, db, timeout)
}

/// Opens or creates a kv db.
pub fn open<K, V>(package_id: PackageId, db: &str, timeout: Option<u64>) -> anyhow::Result<Kv<K, V>>
where
Expand Down
Loading