Skip to content

Commit

Permalink
Merge pull request #23 from arindas/style/improve-readability
Browse files Browse the repository at this point in the history
style: improve readability
  • Loading branch information
arindas authored Nov 21, 2023
2 parents cdfbb2a + 4e9190f commit 0ea2395
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 56 deletions.
2 changes: 0 additions & 2 deletions src/bin/laminarmq_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ const THREAD_NAME: &str = "laminarmq_server_thread_0";

struct State;

#[cfg(not(tarpaulin_include))]
#[instrument(skip(_shared_state))]
async fn request_handler(
_shared_state: Rc<State>,
Expand All @@ -37,7 +36,6 @@ async fn request_handler(
}
}

#[cfg(not(tarpaulin_include))]
#[cfg(target_os = "linux")]
fn main() {
let fmt_subscriber = FmtSubscriber::builder()
Expand Down
1 change: 0 additions & 1 deletion src/common/split.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ pub trait SplitAt<T>: Deref<Target = [T]> + Sized {
fn split_at(self, at: usize) -> Option<(Self, Self)>;
}

#[cfg(not(tarpaulin_include))]
impl<T> SplitAt<T> for Vec<T> {
fn split_at(mut self, at: usize) -> Option<(Self, Self)> {
if at > self.len() {
Expand Down
45 changes: 20 additions & 25 deletions src/storage/commit_log/segmented_log/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ impl<SR: SizedRecord, const REPR_SIZE: usize> PersistentSizedRecord<SR, REPR_SIZ
where
S: Storage,
{
let index_record_bytes = source
let record_bytes = source
.read(
position,
&<S::Size as FromPrimitive>::from_usize(REPR_SIZE)
Expand All @@ -79,7 +79,7 @@ impl<SR: SizedRecord, const REPR_SIZE: usize> PersistentSizedRecord<SR, REPR_SIZ
.await
.map_err(IndexError::StorageError)?;

let mut cursor = Cursor::new(index_record_bytes.deref());
let mut cursor = Cursor::new(record_bytes.deref());

SR::read(&mut cursor).map(Self).map_err(IndexError::IoError)
}
Expand Down Expand Up @@ -228,13 +228,13 @@ where
pub fn estimated_index_records_len_in_storage(
storage: &S,
) -> Result<usize, IndexError<S::Error>> {
let index_record_storage_size = storage
let index_storage_size = storage
.size()
.to_usize()
.ok_or(IndexError::IncompatibleSizeType)?;
let estimated_index_records_len = index_record_storage_size
.saturating_sub(INDEX_BASE_MARKER_LENGTH)
/ INDEX_RECORD_LENGTH;

let estimated_index_records_len =
index_storage_size.saturating_sub(INDEX_BASE_MARKER_LENGTH) / INDEX_RECORD_LENGTH;

Ok(estimated_index_records_len)
}
Expand All @@ -258,9 +258,9 @@ where
) -> Result<Vec<IndexRecord>, IndexError<S::Error>> {
let mut position = INDEX_BASE_MARKER_LENGTH as u64;

let mut index_records = Vec::<IndexRecord>::with_capacity(
Self::estimated_index_records_len_in_storage(storage)?,
);
let estimated_index_records_len = Self::estimated_index_records_len_in_storage(storage)?;

let mut index_records = Vec::<IndexRecord>::with_capacity(estimated_index_records_len);

while let Ok(index_record) =
PersistentSizedRecord::<IndexRecord, INDEX_RECORD_LENGTH>::read_at(
Expand All @@ -275,8 +275,6 @@ where

index_records.shrink_to_fit();

let estimated_index_records_len = Self::estimated_index_records_len_in_storage(storage)?;

if index_records.len() != estimated_index_records_len {
Err(IndexError::InconsistentIndexSize)
} else {
Expand Down Expand Up @@ -390,12 +388,9 @@ impl<S: Storage, Idx> Sizable for Index<S, Idx> {

impl<S: Storage, Idx> Index<S, Idx> {
#[inline]
fn underlying_storage_position(
normalized_index: usize,
) -> Result<S::Position, IndexError<S::Error>> {
let storage_position =
(INDEX_BASE_MARKER_LENGTH + INDEX_RECORD_LENGTH * normalized_index) as u64;
u64_as_position!(storage_position, S::Position)
fn index_record_position(normalized_index: usize) -> Result<S::Position, IndexError<S::Error>> {
let position = (INDEX_BASE_MARKER_LENGTH + INDEX_RECORD_LENGTH * normalized_index) as u64;
u64_as_position!(position, S::Position)
}
}

Expand Down Expand Up @@ -442,10 +437,9 @@ where
.ok_or(IndexError::IndexGapEncountered)
.map(|&x| x)
} else {
let position = Self::underlying_storage_position(normalized_index)?;
PersistentSizedRecord::<IndexRecord, INDEX_RECORD_LENGTH>::read_at(
&self.storage,
&position,
&Self::index_record_position(normalized_index)?,
)
.await
.map(|x| x.into_inner())
Expand Down Expand Up @@ -496,7 +490,7 @@ where
let normalized_index = self.internal_normalized_index(idx)?;

self.storage
.truncate(&Self::underlying_storage_position(normalized_index)?)
.truncate(&Self::index_record_position(normalized_index)?)
.await
.map_err(IndexError::StorageError)?;

Expand Down Expand Up @@ -539,7 +533,7 @@ pub(crate) mod test {
};
use futures_lite::StreamExt;
use num::{CheckedSub, FromPrimitive, ToPrimitive, Unsigned, Zero};
use std::{future::Future, hash::Hasher, marker::PhantomData, ops::Deref};
use std::{future::Future, hash::Hasher, marker::PhantomData};

fn _test_records_provider<'a, const N: usize>(
record_source: &'a [&'a [u8; N]],
Expand All @@ -556,9 +550,9 @@ pub(crate) mod test {
where
H: Hasher + Default,
{
record_source
.map(|x| RecordHeader::compute::<H>(x.deref()))
.scan((0, 0), |(index, position), record_header| {
record_source.map(|x| RecordHeader::compute::<H>(x)).scan(
(0, 0),
|(index, position), record_header| {
let index_record =
IndexRecord::with_position_and_record_header::<u32>(*position, record_header)
.unwrap();
Expand All @@ -567,7 +561,8 @@ pub(crate) mod test {
*position += record_header.length as u32;

Some(index_record)
})
},
)
}

async fn _test_index_contains_records<S, Idx, I>(
Expand Down
31 changes: 15 additions & 16 deletions src/storage/commit_log/segmented_log/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -163,29 +163,28 @@ where
read_segment_base_indices.len(),
);

let index_cache_read_segments = config.num_index_cached_read_segments.is_none();

for segment_base_index in read_segment_base_indices {
read_segments.push(
Segment::with_segment_storage_provider_config_and_base_index(
Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
&mut segment_storage_provider,
config.segment_config,
segment_base_index,
index_cache_read_segments,
config.num_index_cached_read_segments.is_none(),
)
.await
.map_err(SegmentedLogError::SegmentError)?,
);
}

let write_segment = Segment::with_segment_storage_provider_config_and_base_index(
&mut segment_storage_provider,
config.segment_config,
write_segment_base_index,
true, // write segment is always cached
)
.await
.map_err(SegmentedLogError::SegmentError)?;
let write_segment =
Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
&mut segment_storage_provider,
config.segment_config,
write_segment_base_index,
true, // write segment is always cached
)
.await
.map_err(SegmentedLogError::SegmentError)?;

let cache = match config.num_index_cached_read_segments {
Some(cache_capacity) => {
Expand Down Expand Up @@ -213,7 +212,7 @@ where

macro_rules! new_write_segment {
($segmented_log:ident, $base_index:ident) => {
Segment::with_segment_storage_provider_config_and_base_index(
Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
&mut $segmented_log.segment_storage_provider,
$segmented_log.config.segment_config,
$base_index,
Expand Down Expand Up @@ -1011,7 +1010,7 @@ pub(crate) mod test {
let record_count = segmented_log_stream
.zip(futures_lite::stream::iter(expected_records))
.map(|(record, expected_record_value)| {
assert_eq!(record.value.deref(), expected_record_value.deref());
assert_eq!(record.value.deref(), expected_record_value);
Some(())
})
.count()
Expand All @@ -1030,8 +1029,8 @@ pub(crate) mod test {
.zip(segmented_log_stream_bounded)
.zip(futures_lite::stream::iter(expected_records))
.map(|((record_x, record_y), expected_record_value)| {
assert_eq!(record_x.value.deref(), expected_record_value.deref());
assert_eq!(record_y.value.deref(), expected_record_value.deref());
assert_eq!(record_x.value.deref(), expected_record_value);
assert_eq!(record_y.value.deref(), expected_record_value);
Some(())
})
.count()
Expand Down
39 changes: 27 additions & 12 deletions src/storage/commit_log/segmented_log/segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ pub enum SegmentError<StorageError, SerDeError> {
RecordMetadataNotFound,
InvalidAppendIdx,
InvalidIndexRecordGenerated,
UsizeU32Inconvertible,
SegmentMaxed,
}

Expand Down Expand Up @@ -149,17 +150,21 @@ where
.map_err(SegmentError::StoreError)?;

let metadata_bytes_len_bytes_len =
SERP::serialized_size(&0_usize).map_err(SegmentError::SerializationError)?;
SERP::serialized_size(&0_u32).map_err(SegmentError::SerializationError)?;

let (metadata_bytes_len_bytes, metadata_with_value) = record_content
.split_at(metadata_bytes_len_bytes_len)
.ok_or(SegmentError::RecordMetadataNotFound)?;

let metadata_size = SERP::deserialize(&metadata_bytes_len_bytes)
let metadata_bytes_len: u32 = SERP::deserialize(&metadata_bytes_len_bytes)
.map_err(SegmentError::SerializationError)?;

let metadata_bytes_len: usize = metadata_bytes_len
.try_into()
.map_err(|_| SegmentError::UsizeU32Inconvertible)?;

let (metadata_bytes, value) = metadata_with_value
.split_at(metadata_size)
.split_at(metadata_bytes_len)
.ok_or(SegmentError::RecordMetadataNotFound)?;

let metadata =
Expand Down Expand Up @@ -229,8 +234,13 @@ where
let metadata_bytes =
SERP::serialize(&metadata).map_err(SegmentError::SerializationError)?;

let metadata_bytes_len: u32 = metadata_bytes
.len()
.try_into()
.map_err(|_| SegmentError::UsizeU32Inconvertible)?;

let metadata_bytes_len_bytes =
SERP::serialize(&metadata_bytes.len()).map_err(SegmentError::SerializationError)?;
SERP::serialize(&metadata_bytes_len).map_err(SegmentError::SerializationError)?;

enum SBuf<XBuf, YBuf> {
XBuf(XBuf),
Expand Down Expand Up @@ -295,8 +305,13 @@ where
let metadata_bytes =
SERP::serialize(&metadata).map_err(SegmentError::SerializationError)?;

let metadata_bytes_len: u32 = metadata_bytes
.len()
.try_into()
.map_err(|_| SegmentError::UsizeU32Inconvertible)?;

let metadata_bytes_len_bytes =
SERP::serialize(&metadata_bytes.len()).map_err(SegmentError::SerializationError)?;
SERP::serialize(&metadata_bytes_len).map_err(SegmentError::SerializationError)?;

let stream = futures_lite::stream::iter([
Ok::<&[u8], Infallible>(metadata_bytes_len_bytes.deref()),
Expand Down Expand Up @@ -413,11 +428,11 @@ where
Idx: Unsigned + FromPrimitive + Copy + Eq,
SERP: SerializationProvider,
{
pub async fn with_segment_storage_provider_config_and_base_index<SSP>(
pub async fn with_segment_storage_provider_config_base_index_and_cache_index_records_flag<SSP>(
segment_storage_provider: &mut SSP,
config: Config<S::Size>,
base_index: Idx,
cache_index_records: bool,
cache_index_records_flag: bool,
) -> Result<Self, SegmentError<S::Error, SERP::Error>>
where
SSP: SegmentStorageProvider<S, Idx>,
Expand All @@ -427,7 +442,7 @@ where
.await
.map_err(SegmentError::StorageError)?;

let index = if cache_index_records {
let index = if cache_index_records_flag {
Index::with_storage_and_base_index(segment_storage.index, base_index).await
} else {
Index::with_storage_index_records_option_and_validated_base_index(
Expand Down Expand Up @@ -505,7 +520,7 @@ pub(crate) mod test {
Size: FromPrimitive,
SERP: SerializationProvider,
{
let metadata_len_serialized_size = SERP::serialized_size(&0_usize).ok()?;
let metadata_len_serialized_size = SERP::serialized_size(&0_u32).ok()?;

let metadata_serialized_size = SERP::serialized_size(&MetaWithIdx {
metadata: M::default(),
Expand Down Expand Up @@ -547,7 +562,7 @@ pub(crate) mod test {
let config =
_segment_config::<M, Idx, S::Size, SERP>(_RECORDS[0].len(), _RECORDS.len()).unwrap();

let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_and_base_index(
let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
&mut _segment_storage_provider,
config,
segment_base_index,
Expand Down Expand Up @@ -578,7 +593,7 @@ pub(crate) mod test {

segment.close().await.unwrap();

let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_and_base_index(
let mut segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
&mut _segment_storage_provider,
config,
segment_base_index,
Expand Down Expand Up @@ -654,7 +669,7 @@ pub(crate) mod test {

segment.remove().await.unwrap();

let segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_and_base_index(
let segment = Segment::<S, M, H, Idx, S::Size, SERP>::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
&mut _segment_storage_provider,
config,
segment_base_index,
Expand Down

0 comments on commit 0ea2395

Please sign in to comment.