refactor(cached_object_store): continue separating the files inside a module (#346)

This PR continues the work of separating the files in
cached_object_store after #295. All changes are code movements with no
logic modifications.

- moved LocalCacheStorage into `storage.rs`, and renamed
`fs_cache_storage.rs` to `storage_fs.rs`
- moved CachedObjectStore into `cached_object_store/object_store.rs`
- exposed the structs & traits in `mod.rs`
flaneur2020 authored Nov 27, 2024
1 parent 0d9e437 commit b71af39
Showing 6 changed files with 131 additions and 123 deletions.
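
For orientation, this is the resulting layout of the module (derived from the file list below):

src/cached_object_store/
├── mod.rs          // declares the submodules and re-exports the public surface
├── object_store.rs // CachedObjectStore, the caching ObjectStore wrapper
├── storage.rs      // LocalCacheStorage / LocalCacheEntry traits, LocalCacheHead, PartID
└── storage_fs.rs   // FsCacheStorage, the filesystem implementation (renamed from fs_cache_storage.rs)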
8 changes: 8 additions & 0 deletions src/cached_object_store/mod.rs
@@ -0,0 +1,8 @@
mod object_store;
mod storage;
mod storage_fs;

pub(crate) use object_store::CachedObjectStore;
#[allow(unused_imports)]
pub use storage::{LocalCacheEntry, LocalCacheHead, LocalCacheStorage, PartID};
pub use storage_fs::FsCacheStorage;
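
With these re-exports in place, call sites import from the module root instead of reaching into submodules. A minimal before/after, taken from the `db.rs` hunk further down:

// before this commit
use crate::cached_object_store::fs_cache_storage::FsCacheStorage;

// after this commit
use crate::cached_object_store::FsCacheStorage;
use crate::cached_object_store::CachedObjectStore; // still pub(crate), path unchanged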
124 changes: 7 additions & 117 deletions src/cached_object_store.rs → src/cached_object_store/object_store.rs
@@ -1,17 +1,14 @@
use std::{collections::HashMap, fmt::Display, ops::Range, sync::Arc};

use bytes::{Bytes, BytesMut};
use futures::{future::BoxFuture, stream, stream::BoxStream, StreamExt};
use object_store::{path::Path, GetOptions, GetResult, ObjectMeta, ObjectStore};
use object_store::{Attribute, Attributes, GetRange, GetResultPayload, PutResult};
use object_store::{Attributes, GetRange, GetResultPayload, PutResult};
use object_store::{ListResult, MultipartUpload, PutMultipartOpts, PutOptions, PutPayload};
use serde::{Deserialize, Serialize};
use std::{ops::Range, sync::Arc};

use crate::cached_object_store::storage::{LocalCacheStorage, PartID};
use crate::error::SlateDBError;
use crate::metrics::DbStats;

pub(crate) mod fs_cache_storage;

#[derive(Debug, Clone)]
pub(crate) struct CachedObjectStore {
object_store: Arc<dyn ObjectStore>,
@@ -423,73 +420,6 @@ impl ObjectStore for CachedObjectStore
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
struct LocalCacheHead {
pub location: String,
pub last_modified: String,
pub size: usize,
pub e_tag: Option<String>,
pub version: Option<String>,
pub attributes: HashMap<String, String>,
}

impl LocalCacheHead {
pub fn meta(&self) -> ObjectMeta {
ObjectMeta {
location: self.location.clone().into(),
last_modified: self.last_modified.parse().unwrap_or_default(),
size: self.size,
e_tag: self.e_tag.clone(),
version: self.version.clone(),
}
}

pub fn attributes(&self) -> Attributes {
let mut attrs = Attributes::new();
for (key, value) in self.attributes.iter() {
let key = match key.as_str() {
"Cache-Control" => Attribute::CacheControl,
"Content-Disposition" => Attribute::ContentDisposition,
"Content-Encoding" => Attribute::ContentEncoding,
"Content-Language" => Attribute::ContentLanguage,
"Content-Type" => Attribute::ContentType,
_ => Attribute::Metadata(key.to_string().into()),
};
let value = value.to_string().into();
attrs.insert(key, value);
}
attrs
}
}

impl From<(&ObjectMeta, &Attributes)> for LocalCacheHead {
fn from((meta, attrs): (&ObjectMeta, &Attributes)) -> Self {
let mut attrs_map = HashMap::new();
for (key, value) in attrs.iter() {
let key = match key {
Attribute::CacheControl => "Cache-Control",
Attribute::ContentDisposition => "Content-Disposition",
Attribute::ContentEncoding => "Content-Encoding",
Attribute::ContentLanguage => "Content-Language",
Attribute::ContentType => "Content-Type",
Attribute::Metadata(key) => key,
_ => continue,
};
attrs_map.insert(key.to_string(), value.to_string());
}
LocalCacheHead {
location: meta.location.to_string(),
last_modified: meta.last_modified.to_rfc3339(),
size: meta.size,
e_tag: meta.e_tag.clone(),
version: meta.version.clone(),
attributes: attrs_map,
}
}
}

// It seems that object_store does not expose this error type, so we duplicate it here.
// TODO: raise a PR to expose this error type in object_store.
#[derive(Debug, thiserror::Error)]
pub(crate) enum InvalidGetRange {
#[error("Range start too large, requested: {requested}, length: {length}")]
@@ -499,58 +429,18 @@ pub(crate) enum InvalidGetRange {
Inconsistent { start: usize, end: usize },
}

#[async_trait::async_trait]
pub trait LocalCacheStorage: Send + Sync + std::fmt::Debug + Display + 'static {
fn entry(
&self,
location: &object_store::path::Path,
part_size: usize,
) -> Box<dyn LocalCacheEntry>;

async fn start_evictor(&self);
}

#[async_trait::async_trait]
pub trait LocalCacheEntry: Send + Sync + std::fmt::Debug + 'static {
async fn save_part(&self, part_number: PartID, buf: Bytes) -> object_store::Result<()>;

async fn read_part(
&self,
part_number: PartID,
range_in_part: Range<usize>,
) -> object_store::Result<Option<Bytes>>;

/// Might be useful for rewriting GET requests in the prefetch phase. The cached files are
/// expected to be in the same folder, so reading them should be fast, without expensive
/// globbing.
#[cfg(test)]
async fn cached_parts(&self) -> object_store::Result<Vec<PartID>>;

async fn save_head(&self, meta: (&ObjectMeta, &Attributes)) -> object_store::Result<()>;

async fn read_head(&self) -> object_store::Result<Option<(ObjectMeta, Attributes)>>;
}

pub(crate) type PartID = usize;

#[cfg(test)]
mod tests {
use std::sync::Arc;

use bytes::Bytes;
use object_store::{path::Path, GetOptions, GetRange, ObjectStore, PutPayload};
use rand::{thread_rng, Rng};
use rand::Rng;

use super::CachedObjectStore;
use crate::cached_object_store::fs_cache_storage::FsCacheStorage;
use crate::cached_object_store::{fs_cache_storage::FsCacheEntry, PartID};
use crate::cached_object_store::storage_fs::FsCacheStorage;
use crate::cached_object_store::{storage::PartID, storage_fs::FsCacheEntry};
use crate::metrics::DbStats;

pub(crate) fn gen_rand_bytes(n: usize) -> Bytes {
let mut rng = thread_rng();
let random_bytes: Vec<u8> = (0..n).map(|_| rng.gen()).collect();
Bytes::from(random_bytes)
}
use crate::test_utils::gen_rand_bytes;

fn new_test_cache_folder() -> std::path::PathBuf {
let mut rng = rand::thread_rng();
100 changes: 100 additions & 0 deletions src/cached_object_store/storage.rs
@@ -0,0 +1,100 @@
use async_trait::async_trait;
use bytes::Bytes;
use object_store::{path::Path, Attribute, Attributes, ObjectMeta};
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, fmt::Display, ops::Range};

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalCacheHead {
pub location: String,
pub last_modified: String,
pub size: usize,
pub e_tag: Option<String>,
pub version: Option<String>,
pub attributes: HashMap<String, String>,
}

impl LocalCacheHead {
pub fn meta(&self) -> ObjectMeta {
ObjectMeta {
location: self.location.clone().into(),
last_modified: self.last_modified.parse().unwrap_or_default(),
size: self.size,
e_tag: self.e_tag.clone(),
version: self.version.clone(),
}
}

pub fn attributes(&self) -> Attributes {
let mut attrs = Attributes::new();
for (key, value) in self.attributes.iter() {
let key = match key.as_str() {
"Cache-Control" => Attribute::CacheControl,
"Content-Disposition" => Attribute::ContentDisposition,
"Content-Encoding" => Attribute::ContentEncoding,
"Content-Language" => Attribute::ContentLanguage,
"Content-Type" => Attribute::ContentType,
_ => Attribute::Metadata(key.to_string().into()),
};
let value = value.to_string().into();
attrs.insert(key, value);
}
attrs
}
}

impl From<(&ObjectMeta, &Attributes)> for LocalCacheHead {
fn from((meta, attrs): (&ObjectMeta, &Attributes)) -> Self {
let mut attrs_map = HashMap::new();
for (key, value) in attrs.iter() {
let key = match key {
Attribute::CacheControl => "Cache-Control",
Attribute::ContentDisposition => "Content-Disposition",
Attribute::ContentEncoding => "Content-Encoding",
Attribute::ContentLanguage => "Content-Language",
Attribute::ContentType => "Content-Type",
Attribute::Metadata(key) => key,
_ => continue,
};
attrs_map.insert(key.to_string(), value.to_string());
}
LocalCacheHead {
location: meta.location.to_string(),
last_modified: meta.last_modified.to_rfc3339(),
size: meta.size,
e_tag: meta.e_tag.clone(),
version: meta.version.clone(),
attributes: attrs_map,
}
}
}

#[async_trait]
pub trait LocalCacheStorage: Send + Sync + std::fmt::Debug + Display + 'static {
fn entry(&self, location: &Path, part_size: usize) -> Box<dyn LocalCacheEntry>;

async fn start_evictor(&self);
}

#[async_trait]
pub trait LocalCacheEntry: Send + Sync + std::fmt::Debug + 'static {
async fn save_part(&self, part_number: PartID, buf: Bytes) -> object_store::Result<()>;

async fn read_part(
&self,
part_number: PartID,
range_in_part: Range<usize>,
) -> object_store::Result<Option<Bytes>>;

/// Might be useful for rewriting GET requests in the prefetch phase. The cached files are
/// expected to be in the same folder, so reading them should be fast, without expensive
/// globbing.
#[cfg(test)]
async fn cached_parts(&self) -> object_store::Result<Vec<PartID>>;

async fn save_head(&self, meta: (&ObjectMeta, &Attributes)) -> object_store::Result<()>;

async fn read_head(&self) -> object_store::Result<Option<(ObjectMeta, Attributes)>>;
}

pub type PartID = usize;
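
As a sketch of how the pieces in this file fit together: `LocalCacheHead` mirrors `ObjectMeta` plus `Attributes` in a serde-friendly form, and `meta()` / `attributes()` reconstruct the originals. The snippet below round-trips a head through JSON; `serde_json`, the sample values, and the function itself are assumptions for illustration — the actual on-disk encoding of head files lives in `storage_fs.rs`.

use object_store::{path::Path, Attribute, Attributes, ObjectMeta};

// Hypothetical round-trip of ObjectMeta + Attributes through LocalCacheHead.
fn head_roundtrip_sketch() {
    let meta = ObjectMeta {
        location: Path::from("a/b/c.sst"), // illustrative path
        last_modified: chrono::Utc::now(),
        size: 4096,
        e_tag: Some("etag-123".to_string()),
        version: None,
    };
    let mut attrs = Attributes::new();
    attrs.insert(Attribute::ContentType, "application/octet-stream".into());

    // (&ObjectMeta, &Attributes) -> LocalCacheHead via the From impl above.
    let head = LocalCacheHead::from((&meta, &attrs));
    let encoded = serde_json::to_string(&head).unwrap(); // serde_json is an assumption
    let decoded: LocalCacheHead = serde_json::from_str(&encoded).unwrap();

    // meta() and attributes() recover the original values.
    assert_eq!(decoded.meta().size, meta.size);
    assert_eq!(decoded.attributes(), attrs);
}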
10 changes: 6 additions & 4 deletions src/cached_object_store/fs_cache_storage.rs → src/cached_object_store/storage_fs.rs
@@ -20,11 +20,11 @@ use tokio::{
use tracing::{debug, warn};
use walkdir::WalkDir;

use crate::cached_object_store::{LocalCacheEntry, LocalCacheHead, LocalCacheStorage};
use crate::cached_object_store::storage::{LocalCacheEntry, LocalCacheHead, LocalCacheStorage};
use crate::metrics::DbStats;

#[derive(Debug)]
pub(crate) struct FsCacheStorage {
pub struct FsCacheStorage {
root_folder: std::path::PathBuf,
evictor: Option<Arc<FsCacheEvictor>>,
}
@@ -207,7 +207,9 @@ impl LocalCacheEntry for FsCacheEntry {
}

#[cfg(test)]
async fn cached_parts(&self) -> object_store::Result<Vec<crate::cached_object_store::PartID>> {
async fn cached_parts(
&self,
) -> object_store::Result<Vec<crate::cached_object_store::storage::PartID>> {
let head_path = Self::make_head_path(self.root_folder.clone(), &self.location);
let directory_path = match head_path.parent() {
Some(directory_path) => directory_path,
@@ -644,7 +646,7 @@ fn wrap_io_err(err: impl std::error::Error + Send + Sync + 'static) -> object_st
#[cfg(test)]
mod tests {
use super::*;
use crate::cached_object_store::tests::gen_rand_bytes;
use crate::test_utils::gen_rand_bytes;
use filetime::FileTime;
use std::{io::Write, sync::atomic::Ordering, time::SystemTime};

4 changes: 2 additions & 2 deletions src/db.rs
@@ -32,8 +32,8 @@ use tokio::sync::mpsc::UnboundedSender;

use crate::batch::WriteBatch;
use crate::batch_write::{WriteBatchMsg, WriteBatchRequest};
use crate::cached_object_store::fs_cache_storage::FsCacheStorage;
use crate::cached_object_store::CachedObjectStore;
use crate::cached_object_store::FsCacheStorage;
use crate::compactor::Compactor;
use crate::config::ReadLevel::Uncommitted;
use crate::config::{
@@ -1128,7 +1128,7 @@ mod tests {
use tracing::info;

use super::*;
use crate::cached_object_store::fs_cache_storage::FsCacheStorage;
use crate::cached_object_store::FsCacheStorage;
use crate::config::{
CompactorOptions, ObjectStoreCacheOptions, SizeTieredCompactionSchedulerOptions,
DEFAULT_PUT_OPTIONS,
8 changes: 8 additions & 0 deletions src/test_utils.rs
@@ -2,6 +2,7 @@ use crate::config::Clock;
use crate::iter::KeyValueIterator;
use crate::types::{KeyValue, RowAttributes, ValueDeletable};
use bytes::{BufMut, Bytes, BytesMut};
use rand::Rng;
use std::sync::atomic::{AtomicI64, Ordering};

// this complains because we include these in the bencher feature but they are only
@@ -140,6 +141,13 @@ impl OrderedBytesGenerator {
}
}

#[allow(dead_code)]
pub(crate) fn gen_rand_bytes(n: usize) -> Bytes {
let mut rng = rand::thread_rng();
let random_bytes: Vec<u8> = (0..n).map(|_| rng.gen()).collect();
Bytes::from(random_bytes)
}
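
Both cache test modules now pull this shared helper instead of keeping a local copy, e.g. (the size here is arbitrary):

use crate::test_utils::gen_rand_bytes;

// inside a cache test:
let payload = gen_rand_bytes(1024 * 3 + 32); // arbitrary payload size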

// it seems that insta still does not allow to customize the snapshot path in insta.yaml,
// we can remove this macro once insta supports it.
#[cfg(test)]
