Skip to content

Commit 9a9e2c0

Browse files
authored
Merge pull request zhaofengli#98 from Mic92/sharding
implement sharding
2 parents bdafd64 + 484e380 commit 9a9e2c0

File tree

2 files changed

+103
-8
lines changed

2 files changed

+103
-8
lines changed

integration-tests/basic/default.nix

+2-2
Original file line numberDiff line numberDiff line change
@@ -234,9 +234,9 @@ in {
234234
235235
${lib.optionalString (config.storage == "local") ''
236236
with subtest("Check that all chunks are actually deleted after GC"):
237-
files = server.succeed("find /var/lib/atticd/storage -type f")
237+
files = server.succeed("find /var/lib/atticd/storage -type f ! -name 'VERSION'")
238238
print(f"Remaining files: {files}")
239-
assert files.strip() == ""
239+
assert files.strip() == "", "Some files remain after GC: " + files
240240
''}
241241
242242
with subtest("Check that we can include the upload info in the payload"):

server/src/storage/local.rs

+101-6
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
//! Local file storage.
22
3+
use std::ffi::OsStr;
4+
use std::os::unix::ffi::OsStrExt;
5+
use std::path::Path;
36
use std::path::PathBuf;
47

58
use async_trait::async_trait;
@@ -30,17 +33,95 @@ pub struct LocalRemoteFile {
3033
pub name: String,
3134
}
3235

36+
async fn read_version(storage_path: &Path) -> ServerResult<u32> {
37+
let version_path = storage_path.join("VERSION");
38+
let v = match fs::read_to_string(&version_path).await {
39+
Ok(version) => version
40+
.trim()
41+
.parse()
42+
.map_err(|_| ErrorKind::StorageError(anyhow::anyhow!("Invalid version file")))?,
43+
Err(e) if e.kind() == io::ErrorKind::NotFound => 0,
44+
Err(e) => {
45+
return Err(ErrorKind::StorageError(anyhow::anyhow!(
46+
"Failed to read version file: {}",
47+
e
48+
))
49+
.into());
50+
}
51+
};
52+
Ok(v)
53+
}
54+
55+
async fn write_version(storage_path: &Path, version: u32) -> ServerResult<()> {
56+
let version_path = storage_path.join("VERSION");
57+
fs::write(&version_path, format!("{}", version))
58+
.await
59+
.map_err(ServerError::storage_error)?;
60+
Ok(())
61+
}
62+
63+
async fn upgrade_0_to_1(storage_path: &Path) -> ServerResult<()> {
64+
let mut files = fs::read_dir(storage_path)
65+
.await
66+
.map_err(ServerError::storage_error)?;
67+
// move all files to subdirectory using the first two characters of the filename
68+
while let Some(file) = files
69+
.next_entry()
70+
.await
71+
.map_err(ServerError::storage_error)?
72+
{
73+
if file
74+
.file_type()
75+
.await
76+
.map_err(ServerError::storage_error)?
77+
.is_file()
78+
{
79+
let name = file.file_name();
80+
let name_bytes = name.as_os_str().as_bytes();
81+
let parents = storage_path
82+
.join(OsStr::from_bytes(&name_bytes[0..1]))
83+
.join(OsStr::from_bytes(&name_bytes[0..2]));
84+
let new_path = parents.join(name);
85+
fs::create_dir_all(&parents).await.map_err(|e| {
86+
ErrorKind::StorageError(anyhow::anyhow!("Failed to create directory {}", e))
87+
})?;
88+
fs::rename(&file.path(), &new_path).await.map_err(|e| {
89+
ErrorKind::StorageError(anyhow::anyhow!(
90+
"Failed to move file {} to {}: {}",
91+
file.path().display(),
92+
new_path.display(),
93+
e
94+
))
95+
})?;
96+
}
97+
}
98+
99+
Ok(())
100+
}
101+
33102
impl LocalBackend {
34103
pub async fn new(config: LocalStorageConfig) -> ServerResult<Self> {
35-
fs::create_dir_all(&config.path)
36-
.await
37-
.map_err(ServerError::storage_error)?;
104+
fs::create_dir_all(&config.path).await.map_err(|e| {
105+
ErrorKind::StorageError(anyhow::anyhow!(
106+
"Failed to create storage directory {}: {}",
107+
config.path.display(),
108+
e
109+
))
110+
})?;
111+
112+
let version = read_version(&config.path).await?;
113+
if version == 0 {
114+
upgrade_0_to_1(&config.path).await?;
115+
}
116+
write_version(&config.path, 1).await?;
38117

39118
Ok(Self { config })
40119
}
41120

42121
fn get_path(&self, p: &str) -> PathBuf {
43-
self.config.path.join(p)
122+
let level1 = &p[0..1];
123+
let level2 = &p[0..2];
124+
self.config.path.join(level1).join(level2).join(p)
44125
}
45126
}
46127

@@ -51,9 +132,23 @@ impl StorageBackend for LocalBackend {
51132
name: String,
52133
mut stream: &mut (dyn AsyncRead + Unpin + Send),
53134
) -> ServerResult<RemoteFile> {
54-
let mut file = File::create(self.get_path(&name))
135+
let path = self.get_path(&name);
136+
fs::create_dir_all(path.parent().unwrap())
55137
.await
56-
.map_err(ServerError::storage_error)?;
138+
.map_err(|e| {
139+
ErrorKind::StorageError(anyhow::anyhow!(
140+
"Failed to create directory {}: {}",
141+
path.parent().unwrap().display(),
142+
e
143+
))
144+
})?;
145+
let mut file = File::create(self.get_path(&name)).await.map_err(|e| {
146+
ErrorKind::StorageError(anyhow::anyhow!(
147+
"Failed to create file {}: {}",
148+
self.get_path(&name).display(),
149+
e
150+
))
151+
})?;
57152

58153
io::copy(&mut stream, &mut file)
59154
.await

0 commit comments

Comments
 (0)