Skip to content

Commit 4c5754b

Browse files
committed
Update advisory locks in fs event log.
1 parent 6a610d5 commit 4c5754b

File tree

1 file changed

+31
-62
lines changed

1 file changed

+31
-62
lines changed

crates/filesystem/src/event_log.rs

+31-62
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ use crate::{
2020
},
2121
Error, Result,
2222
};
23-
use async_fd_lock::LockWrite;
23+
use async_fd_lock::{LockRead, LockWrite};
2424
use async_stream::try_stream;
2525
use async_trait::async_trait;
2626
use binary_stream::futures::{BinaryReader, Decodable, Encodable};
@@ -40,10 +40,8 @@ use std::result::Result as StdResult;
4040
use std::{
4141
io::SeekFrom,
4242
path::{Path, PathBuf},
43-
sync::Arc,
4443
};
4544
use tokio::io::{AsyncReadExt, AsyncSeekExt, AsyncWriteExt, BufReader};
46-
use tokio::sync::Mutex;
4745

4846
#[cfg(feature = "files")]
4947
use sos_core::events::FileEvent;
@@ -69,20 +67,19 @@ type Iter = Box<dyn FormatStreamIterator<EventLogRecord> + Send + Sync>;
6967
/// Read the bytes for the encoded event
7068
/// inside the log record.
7169
async fn read_event_buffer(
72-
handle: Arc<Mutex<File>>,
70+
file_path: impl AsRef<Path>,
7371
record: &EventLogRecord,
7472
) -> Result<Vec<u8>> {
75-
let mut file = handle.lock().await;
76-
77-
// let _guard = file.lock().await;
73+
let file = File::open(file_path.as_ref()).await?;
74+
let mut guard = file.lock_read().await.map_err(|e| e.error)?;
7875

7976
let offset = record.value();
8077
let row_len = offset.end - offset.start;
8178

82-
file.seek(SeekFrom::Start(offset.start)).await?;
79+
guard.seek(SeekFrom::Start(offset.start)).await?;
8380

8481
let mut buf = vec![0u8; row_len as usize];
85-
file.read_exact(&mut buf).await?;
82+
guard.read_exact(&mut buf).await?;
8683

8784
Ok(buf)
8885
}
@@ -104,7 +101,6 @@ where
104101
+ Sync
105102
+ 'static,
106103
{
107-
file: Arc<Mutex<File>>,
108104
tree: CommitTree,
109105
data: PathBuf,
110106
identity: &'static [u8],
@@ -133,11 +129,11 @@ where
133129
) -> BoxStream<'async_trait, StdResult<EventRecord, Self::Error>> {
134130
let mut it =
135131
self.iter(reverse).await.expect("to initialize iterator");
136-
let handle = self.file();
132+
let file_path = self.data.clone();
137133
Box::pin(try_stream! {
138134
while let Some(record) = it.next().await? {
139135
let event_buffer = read_event_buffer(
140-
handle.clone(), &record).await?;
136+
file_path.clone(), &record).await?;
141137
let event_record = record.into_event_record(event_buffer);
142138
yield event_record;
143139
}
@@ -152,11 +148,11 @@ where
152148
let mut it =
153149
self.iter(reverse).await.expect("to initialize iterator");
154150

155-
let handle = self.file();
151+
let file_path = self.data.clone();
156152
Box::pin(try_stream! {
157153
while let Some(record) = it.next().await? {
158154
let event_buffer = read_event_buffer(
159-
handle.clone(), &record).await?;
155+
file_path.clone(), &record).await?;
160156
let event_record = record.into_event_record(event_buffer);
161157
let event = event_record.decode_event::<T>().await?;
162158
yield (event_record, event);
@@ -208,16 +204,11 @@ where
208204

209205
tracing::trace!(length = %length, "event_log::rewind");
210206

211-
let handle = self.file();
212207
let mut records = Vec::new();
213208

214209
while let Some(record) = it.next().await? {
215210
// Found the target commit
216211
if &record.commit() == commit.as_ref() {
217-
// Acquire file lock as we will truncate
218-
let file = self.file();
219-
let _guard = file.lock().await;
220-
221212
// Rewrite the in-memory tree
222213
let mut leaves = self.tree().leaves().unwrap_or_default();
223214
if leaves.len() > records.len() {
@@ -237,7 +228,6 @@ where
237228
let mut guard =
238229
file.lock_write().await.map_err(|e| e.error)?;
239230
guard.inner_mut().set_len(length).await?;
240-
// file.set_len(length).await?;
241231

242232
return Ok(records);
243233
}
@@ -249,8 +239,7 @@ where
249239
length -= byte_length;
250240
}
251241

252-
let event_buffer =
253-
read_event_buffer(handle.clone(), &record).await?;
242+
let event_buffer = read_event_buffer(&self.data, &record).await?;
254243

255244
let event_record = record.into_event_record(event_buffer);
256245
records.push(event_record);
@@ -310,9 +299,6 @@ where
310299
commits.push(*record.commit());
311300
}
312301

313-
let rw = self.file();
314-
let _lock = rw.lock().await;
315-
316302
#[allow(unused_mut)]
317303
let mut file = OpenOptions::new()
318304
.write(true)
@@ -433,16 +419,15 @@ where
433419
commit: Option<&CommitHash>,
434420
) -> StdResult<Vec<EventRecord>, Self::Error> {
435421
let mut events = Vec::new();
436-
let file = self.file();
422+
// let file = self.file();
437423
let mut it = self.iter(true).await?;
438424
while let Some(record) = it.next().await? {
439425
if let Some(commit) = commit {
440426
if &record.commit() == commit.as_ref() {
441427
return Ok(events);
442428
}
443429
}
444-
let buffer =
445-
read_event_buffer(Arc::clone(&file), &record).await?;
430+
let buffer = read_event_buffer(&self.data, &record).await?;
446431
// Iterating in reverse order as we would typically
447432
// be looking for commits near the end of the event log
448433
// but we want the patch events in the order they were
@@ -488,7 +473,6 @@ where
488473
AsyncSeekExt as TokioAsyncSeekExt,
489474
AsyncWriteExt as TokioAsyncWriteExt,
490475
};
491-
let _ = self.file.lock().await;
492476

493477
// Workaround for set_len(0) failing with "Access Denied" on Windows
494478
// SEE: https://github.com/rust-lang/rust/issues/105437
@@ -518,12 +502,12 @@ where
518502
) -> StdResult<T, E> {
519503
let value = item.value();
520504

521-
let rw = self.file();
522-
let mut file = rw.lock().await;
505+
let file = File::open(&self.data).await?;
506+
let mut guard = file.lock_read().await.map_err(|e| e.error)?;
523507

524-
file.seek(SeekFrom::Start(value.start)).await?;
508+
guard.seek(SeekFrom::Start(value.start)).await?;
525509
let mut buffer = vec![0; (value.end - value.start) as usize];
526-
file.read_exact(buffer.as_mut_slice()).await?;
510+
guard.read_exact(buffer.as_mut_slice()).await?;
527511

528512
let mut stream = BufReader::new(Cursor::new(&mut buffer));
529513
let mut reader = BinaryReader::new(&mut stream, encoding_options());
@@ -549,10 +533,6 @@ where
549533
Ok(it)
550534
}
551535

552-
fn file(&self) -> Arc<Mutex<File>> {
553-
Arc::clone(&self.file)
554-
}
555-
556536
/// Length of the file magic bytes and optional
557537
/// encoding version.
558538
#[doc(hidden)]
@@ -589,26 +569,23 @@ where
589569
}
590570
*/
591571

592-
/// Create the writer for an event log file.
593-
async fn create_writer<P: AsRef<Path>>(
572+
/// Create an event log file if it does not exist.
573+
///
574+
/// Ensure the identity bytes are written when the file
575+
/// length is zero.
576+
async fn initialize_event_log<P: AsRef<Path>>(
594577
path: P,
595578
identity: &'static [u8],
596579
encoding_version: Option<u16>,
597-
) -> StdResult<File, E> {
580+
) -> StdResult<(), E> {
598581
let file = OpenOptions::new()
599582
.create(true)
600-
.read(true)
601-
.append(true)
583+
.write(true)
602584
.open(path.as_ref())
603585
.await?;
604586

605587
let size = vfs::metadata(path.as_ref()).await?.len();
606588
if size == 0 {
607-
let file = OpenOptions::new()
608-
.create(true)
609-
.append(true)
610-
.open(path.as_ref())
611-
.await?;
612589
let mut guard = file.lock_write().await.map_err(|e| e.error)?;
613590
let mut header = identity.to_vec();
614591
if let Some(version) = encoding_version {
@@ -618,14 +595,11 @@ where
618595
guard.flush().await?;
619596
}
620597

621-
Ok(file)
598+
Ok(())
622599
}
623600

624601
#[doc(hidden)]
625602
async fn try_create_snapshot(&self) -> StdResult<Option<PathBuf>, E> {
626-
let file = self.file();
627-
let _guard = file.lock().await;
628-
629603
if let Some(root) = self.tree().root() {
630604
let mut snapshot_path = self.data.clone();
631605
snapshot_path.set_extension(&format!("snapshot-{}", root));
@@ -653,9 +627,6 @@ where
653627
) -> StdResult<(), E> {
654628
let source_path = self.data.clone();
655629

656-
let file = self.file();
657-
let _guard = file.lock().await;
658-
659630
let metadata = vfs::metadata(snapshot_path).await?;
660631
tracing::debug!(
661632
file_size = %metadata.len(),
@@ -689,7 +660,7 @@ where
689660
// Note that for backwards compatibility we don't
690661
// encode a version, later we will need to upgrade
691662
// the encoding to include a version
692-
let writer = Self::create_writer(
663+
Self::initialize_event_log(
693664
path.as_ref(),
694665
&FOLDER_EVENT_LOG_IDENTITY,
695666
None,
@@ -700,7 +671,6 @@ where
700671
.await?;
701672

702673
Ok(Self {
703-
file: Arc::new(Mutex::new(writer)),
704674
data: path.as_ref().to_path_buf(),
705675
tree: Default::default(),
706676
identity: &FOLDER_EVENT_LOG_IDENTITY,
@@ -726,7 +696,7 @@ where
726696
use sos_core::{
727697
constants::ACCOUNT_EVENT_LOG_IDENTITY, encoding::VERSION,
728698
};
729-
let writer = Self::create_writer(
699+
Self::initialize_event_log(
730700
path.as_ref(),
731701
&ACCOUNT_EVENT_LOG_IDENTITY,
732702
Some(VERSION),
@@ -737,7 +707,6 @@ where
737707
.await?;
738708

739709
Ok(Self {
740-
file: Arc::new(Mutex::new(writer)),
741710
data: path.as_ref().to_path_buf(),
742711
tree: Default::default(),
743712
identity: &ACCOUNT_EVENT_LOG_IDENTITY,
@@ -763,7 +732,8 @@ where
763732
use sos_core::{
764733
constants::DEVICE_EVENT_LOG_IDENTITY, encoding::VERSION,
765734
};
766-
let writer = Self::create_writer(
735+
736+
Self::initialize_event_log(
767737
path.as_ref(),
768738
&DEVICE_EVENT_LOG_IDENTITY,
769739
Some(VERSION),
@@ -774,7 +744,6 @@ where
774744
.await?;
775745

776746
Ok(Self {
777-
file: Arc::new(Mutex::new(writer)),
778747
data: path.as_ref().to_path_buf(),
779748
tree: Default::default(),
780749
identity: &DEVICE_EVENT_LOG_IDENTITY,
@@ -801,7 +770,8 @@ where
801770
use sos_core::{
802771
constants::FILE_EVENT_LOG_IDENTITY, encoding::VERSION,
803772
};
804-
let writer = Self::create_writer(
773+
774+
Self::initialize_event_log(
805775
path.as_ref(),
806776
&FILE_EVENT_LOG_IDENTITY,
807777
Some(VERSION),
@@ -812,7 +782,6 @@ where
812782
.await?;
813783

814784
Ok(Self {
815-
file: Arc::new(Mutex::new(writer)),
816785
data: path.as_ref().to_path_buf(),
817786
tree: Default::default(),
818787
identity: &FILE_EVENT_LOG_IDENTITY,

0 commit comments

Comments
 (0)