Skip to content

Commit 6a610d5

Browse files
committed
Add audit_stream() for db audit provider.
1 parent 54edf5b commit 6a610d5

File tree

6 files changed

+47
-5
lines changed

6 files changed

+47
-5
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+4-1
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,13 @@ tracing = "0.1"
109109
tracing-subscriber = { version = "0.3.16", features = ["env-filter", "json"] }
110110
secrecy = { version = "0.10", features = ["serde"] }
111111
serde = { version = "1", features = ["derive"] }
112+
112113
tokio = { version = "1", features = ["rt", "macros", "time", "sync"]}
114+
tokio-util = { version = "0.7", default-features = false, features = ["io", "compat"] }
115+
tokio-stream = "0.1"
116+
113117
serde_json = "1"
114118
serde_with = { version = "3", features = ["base64"] }
115-
tokio-util = { version = "0.7", default-features = false, features = ["io", "compat"] }
116119
async-trait = "0.1"
117120
async-recursion = "1"
118121
typeshare = "1"

Makefile.toml

+3-1
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,9 @@ args = [
8585
"--workspace",
8686
"--all-features",
8787
"--open",
88-
"--no-deps"
88+
"--no-deps",
89+
"--exclude",
90+
"sos-vfs",
8991
]
9092
dependencies = ["clean-doc"]
9193

crates/database/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ sha2.workspace = true
4848
serde.workspace = true
4949
tokio.workspace = true
5050
url.workspace = true
51+
tokio-stream.workspace = true
5152

5253
# sqlite
5354
async-sqlite = { workspace = true, optional = true }

crates/database/src/audit_provider.rs

+36-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
//! Database audit log provider.
2-
use crate::{db::AuditEntity, Error};
2+
use crate::{
3+
db::{AuditEntity, AuditRecord, AuditRow},
4+
Error,
5+
};
36
use async_sqlite::Client;
47
use async_trait::async_trait;
58
use futures::stream::BoxStream;
69
use sos_audit::{AuditEvent, AuditStreamSink};
10+
use tokio_stream::wrappers::ReceiverStream;
711

812
/// Audit provider that appends to a database table.
913
pub struct AuditDatabaseProvider<E>
@@ -76,6 +80,36 @@ where
7680
BoxStream<'static, std::result::Result<AuditEvent, Self::Error>>,
7781
Self::Error,
7882
> {
79-
todo!();
83+
let (tx, rx) = tokio::sync::mpsc::channel::<
84+
std::result::Result<AuditEvent, Self::Error>,
85+
>(16);
86+
87+
self.client
88+
.conn_and_then(move |conn| {
89+
let mut stmt = if reverse {
90+
conn.prepare("SELECT * FROM audit_logs ORDER DESC")?
91+
} else {
92+
conn.prepare("SELECT * FROM audit_logs ORDER ASC")?
93+
};
94+
let mut rows = stmt.query([])?;
95+
96+
while let Some(row) = rows.next()? {
97+
let row: AuditRow = row.try_into()?;
98+
let record: AuditRecord = row.try_into()?;
99+
let inner_tx = tx.clone();
100+
futures::executor::block_on(async move {
101+
if let Err(e) = inner_tx.send(Ok(record.event)).await
102+
{
103+
tracing::error!(error = %e);
104+
}
105+
});
106+
}
107+
108+
Ok::<_, Error>(())
109+
})
110+
.await
111+
.map_err(Error::from)?;
112+
113+
Ok(Box::pin(ReceiverStream::new(rx)))
80114
}
81115
}

crates/database/src/db/mod.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,8 @@ mod server;
1313
mod system_message;
1414

1515
pub use account::{AccountEntity, AccountRecord};
16-
pub use audit::AuditEntity;
16+
pub(crate) use audit::AuditRow;
17+
pub use audit::{AuditEntity, AuditRecord};
1718
pub use event::{CommitRecord, EventEntity, EventTable};
1819
#[cfg(feature = "files")]
1920
pub use file::FileEntity;

0 commit comments

Comments
 (0)