Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: partial enable s3 and tmp file system #429

Merged
merged 26 commits into from
Nov 5, 2024
Merged
Changes from 1 commit
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
cfff2fd
chore: add dependencies
nyannyacha Jun 25, 2024
7f283db
chore: add `config.toml`
nyannyacha Oct 28, 2024
10bd89c
fix: align default resource limits as docs described
nyannyacha Oct 28, 2024
6f9425a
feat: support s3 file system
nyannyacha Oct 28, 2024
2bdb8f4
fix(sb_core): modify file api blocklist
nyannyacha Oct 28, 2024
fa323df
stamp: enable sync file api on testing
nyannyacha Oct 28, 2024
be0adcf
chore: update `.blocklisted`
nyannyacha Oct 28, 2024
7c7f421
stamp: polishing
nyannyacha Oct 29, 2024
662b02d
feat: support tmp file system
nyannyacha Oct 29, 2024
24af055
chore: add an integration test for tmp file system
nyannyacha Oct 29, 2024
0a92589
chore: update `Cargo.lock`
nyannyacha Oct 17, 2024
32ed0a1
stamp: make clippy happy
nyannyacha Oct 29, 2024
8d07aac
stamp: apply `cargo fmt`
nyannyacha Oct 29, 2024
a2d51be
stamp: don't use default member implementation
nyannyacha Oct 29, 2024
dcf13f2
stamp: adjust root path of module loader
nyannyacha Oct 29, 2024
28de6a0
stamp: polishing
nyannyacha Oct 29, 2024
c346e2c
chore: add another integration test for tmp file system
nyannyacha Oct 29, 2024
ca63eeb
chore: update `.blocklisted`
nyannyacha Oct 30, 2024
08d1019
chore: update integration tests for tmp file system
nyannyacha Oct 30, 2024
f83ef4f
chore(sb_core): update `bootstrap.js`
nyannyacha Oct 30, 2024
39ffd59
feat(sb_fs): add disk usage limit to tmp file system
nyannyacha Oct 30, 2024
87f0538
chore(sb_fs): add a dev dependency
nyannyacha Oct 31, 2024
7f08cdb
chore: update `Cargo.lock`
nyannyacha Oct 31, 2024
17f8c81
stamp: polishing tmp fs and adding more unit tests
nyannyacha Oct 31, 2024
5f85f9b
stamp: make clippy happy
nyannyacha Oct 31, 2024
52af82d
stamp: oops
nyannyacha Oct 31, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
stamp: polishing
  • Loading branch information
nyannyacha committed Nov 1, 2024
commit 28de6a0531ecb7cc955959b17b87ad49843b1857
179 changes: 107 additions & 72 deletions crates/sb_fs/fs/s3_fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -346,17 +346,7 @@ impl deno_fs::FileSystem for S3Fs {
}
}

if !errors.is_empty() {
let combined_message = errors
.into_iter()
.map(|it| it.to_string())
.collect::<Vec<_>>()
.join("\n");

return Err(io::Error::other(combined_message).into());
}

Ok(())
to_combined_message(errors)
}

fn chmod_sync(&self, _path: &Path, _mode: u32) -> FsResult<()> {
Expand Down Expand Up @@ -455,24 +445,14 @@ impl deno_fs::FileSystem for S3Fs {
}
}

if !errors.is_empty() {
let combined_message = errors
.into_iter()
.map(|it| {
format!(
"{}({}): {}",
it.key().unwrap_or("null"),
it.code().unwrap_or("unknown"),
it.message().unwrap_or("unknown error message")
)
})
.collect::<Vec<_>>()
.join("\n");

return Err(io::Error::other(combined_message).into());
}

return Ok(());
return to_combined_message(errors.into_iter().map(|it| {
format!(
"{}({}): {}",
it.key().unwrap_or("null"),
it.code().unwrap_or("unknown"),
it.message().unwrap_or("unknown error message")
)
}));
}

let _resp = self
Expand Down Expand Up @@ -807,58 +787,56 @@ struct S3MultiPartUploadMethod {
tasks: FuturesUnordered<BoxedUploadPartTask>,
}

#[derive(Debug, EnumAsInner)]
enum S3WriteUploadMethod {
PutObject,
MultiPartUpload(S3MultiPartUploadMethod),
}

impl S3WriteUploadMethod {
impl S3MultiPartUploadMethod {
async fn sync(&mut self) -> FsResult<()> {
let mut errors = vec![];

if let Self::MultiPartUpload(multi_part) = self {
while let Some(result) = multi_part.tasks.next().await {
match result {
Err(err) => errors.push(S3WriteErrorSubject::Join(err)),
Ok((part_idx, resp)) => match resp {
Ok(output) => {
let Some(e_tag) = output.e_tag else {
errors.push(S3WriteErrorSubject::MultiPartUploadTask((
part_idx,
io::Error::other(
"no e-tag field was found in upload part response",
),
)));

continue;
};
while let Some(result) = self.tasks.next().await {
match result {
Err(err) => errors.push(S3WriteErrorSubject::Join(err)),
Ok((part_idx, resp)) => match resp {
Ok(output) => {
let Some(e_tag) = output.e_tag else {
errors.push(S3WriteErrorSubject::MultiPartUploadTask((
part_idx,
io::Error::other(
"no e-tag field was found in upload part response",
),
)));

multi_part.parts.push(
CompletedPart::builder()
.e_tag(e_tag)
.part_number(part_idx)
.build(),
);
}
continue;
};

Err(err) => errors.push(S3WriteErrorSubject::MultiPartUploadTask((
part_idx,
io::Error::other(err),
))),
},
}
self.parts.push(
CompletedPart::builder()
.e_tag(e_tag)
.part_number(part_idx)
.build(),
);
}

Err(err) => errors.push(S3WriteErrorSubject::MultiPartUploadTask((
part_idx,
io::Error::other(err),
))),
},
}
}

if !errors.is_empty() {
let combined_message = errors
.into_iter()
.map(|it| it.to_string())
.collect::<Vec<_>>()
.join("\n");
to_combined_message(errors)
}
}

/// Strategy used to push buffered write data to S3.
///
/// NOTE(review): chosen per open file handle — presumably small writes go out
/// as a single `PutObject` while larger/streamed writes use the multipart
/// API; confirm the selection logic at the call site.
#[derive(Debug, EnumAsInner)]
enum S3WriteUploadMethod {
    /// The whole object is uploaded in one `PutObject` request.
    PutObject,
    /// The object is uploaded in parts via the S3 multipart-upload API;
    /// carries the in-flight part tasks and completed-part bookkeeping.
    MultiPartUpload(S3MultiPartUploadMethod),
}

return Err(io::Error::other(combined_message).into());
impl S3WriteUploadMethod {
async fn sync(&mut self) -> FsResult<()> {
if let Self::MultiPartUpload(multi_part) = self {
multi_part.sync().await?
}

Ok(())
Expand All @@ -873,6 +851,48 @@ impl S3WriteUploadMethod {
) -> FsResult<()> {
match self {
Self::MultiPartUpload(multi_part) => {
if state.buf.cursor.get_ref().position() > 0 {
state.buf.raw.flush_async()?;
multi_part.tasks.push(
tokio::task::spawn({
let upload_id = multi_part.upload_id.clone();
let client = fs.client.clone();
let bucket_name = bucket_name.clone();
let key = key.clone();
let part_idx = multi_part.recent_part_idx;
let data = unsafe {
slice::from_raw_parts(
state.buf.raw.as_ptr(),
state.buf.cursor.get_ref().position() as usize,
)
};

multi_part.recent_part_idx += 1;

async move {
client
.upload_part()
.bucket(bucket_name)
.key(key)
.upload_id(upload_id)
.part_number(part_idx)
.body(ByteStream::new(data.into()))
.send()
.map(|it| (part_idx, it))
.await
}
.instrument(debug_span!(
"upload part",
last = true,
part = part_idx
))
})
.boxed(),
);

multi_part.sync().await?;
}

if multi_part.parts.is_empty() {
return Ok(());
}
Expand Down Expand Up @@ -1399,3 +1419,18 @@ fn to_msec(maybe_time: DateTime) -> Option<u64> {
Err(_) => None,
}
}

/// Folds a collection of error-like values into a single `FsResult`.
///
/// Yields `Ok(())` when `errors` is empty; otherwise stringifies each item,
/// joins them with newlines, and returns the combined text wrapped in an
/// `io::Error` (converted into the crate's `FsError`).
fn to_combined_message<I, E>(errors: I) -> FsResult<()>
where
    I: IntoIterator<Item = E>,
    E: ToString,
{
    // Peek first so the empty case allocates nothing.
    let mut messages = errors.into_iter().map(|err| err.to_string()).peekable();

    if messages.peek().is_none() {
        return Ok(());
    }

    let combined = messages.collect::<Vec<_>>().join("\n");
    Err(io::Error::other(combined).into())
}