Skip to content

Commit

Permalink
doc: documents RecordHeader and Store in segmented_log::store
Browse files Browse the repository at this point in the history
  • Loading branch information
arindas committed May 24, 2024
1 parent 89e863c commit ec7874f
Showing 1 changed file with 38 additions and 3 deletions.
41 changes: 38 additions & 3 deletions src/storage/commit_log/segmented_log/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ use futures_lite::StreamExt;
use std::{error::Error as StdError, hash::Hasher, marker::PhantomData, ops::Deref};

pub mod common {
//! Module providing common entities for all [`Store`](super::Store) implementations.
use std::{
hash::Hasher,
io::{ErrorKind::UnexpectedEof, Read, Write},
io::{self, ErrorKind::UnexpectedEof, Read, Write},
};

use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
Expand All @@ -24,14 +26,18 @@ pub mod common {
/// Number of bytes required for storing the record header.
pub const RECORD_HEADER_LENGTH: usize = 16;

/// Header containing the checksum and length of the bytes contained within a Record.
///
/// Used for maintaining data integrity of all persisted data.
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
pub struct RecordHeader {
pub checksum: u64,
pub length: u64,
}

impl RecordHeader {
pub fn read<R: Read>(source: &mut R) -> std::io::Result<RecordHeader> {
/// Reads a [`RecordHeader`] header instance from the given [`Read`] impl.
pub fn read<R: Read>(source: &mut R) -> io::Result<Self> {

Check warning on line 40 in src/storage/commit_log/segmented_log/store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/commit_log/segmented_log/store.rs#L40

Added line #L40 was not covered by tests
let checksum = source.read_u64::<LittleEndian>()?;
let length = source.read_u64::<LittleEndian>()?;

Expand All @@ -42,13 +48,16 @@ pub mod common {
}
}

pub fn write<W: Write>(&self, dest: &mut W) -> std::io::Result<()> {
/// Writes this [`RecordHeader`] instance to the given [`Write`] impl.
pub fn write<W: Write>(&self, dest: &mut W) -> io::Result<()> {

Check warning on line 52 in src/storage/commit_log/segmented_log/store.rs

View check run for this annotation

Codecov / codecov/patch

src/storage/commit_log/segmented_log/store.rs#L52

Added line #L52 was not covered by tests
dest.write_u64::<LittleEndian>(self.checksum)?;
dest.write_u64::<LittleEndian>(self.length)?;

Ok(())
}

/// Computes and returns the [`RecordHeader`] for a record containing the
/// given `record_bytes`.
pub fn compute<H>(record_bytes: &[u8]) -> Self
where
H: Hasher + Default,
Expand All @@ -65,6 +74,17 @@ pub mod common {
}
}

/// Unit of persistence within a [`Segment`](super::segment::Segment).
///
/// <p align="center">
/// <img src="https://raw.githubusercontent.com/arindas/laminarmq/assets/assets/diagrams/laminarmq-indexed-segmented-log-segment.drawio.png" alt="segmented_log_segment" />
/// </p>
/// <p align="center">
/// <b>Fig:</b> <code>Segment</code> diagram showing <code>Store</code>, persisting
/// record bytes at positions mapped out by the <code>Index</code> records.
/// </p>
///
/// A [`Store`] contains a backing [`Storage`] impl instance to persist record bytes.
pub struct Store<S, H> {
storage: S,

Expand All @@ -78,6 +98,7 @@ impl<S: Default, H> Default for Store<S, H> {
}

impl<S, H> Store<S, H> {
/// Creates a new [`Store`] instance from the given backing [`Storage`] instance.
pub fn new(storage: S) -> Self {
Self {
storage,
Expand All @@ -88,9 +109,17 @@ impl<S, H> Store<S, H> {

#[derive(Debug)]
pub enum StoreError<SE> {
/// Used to denote errors from the backing [`Storage`] implementation.
StorageError(SE),

/// Used when the type used for representing sizes is incompatible with [`u64`].
IncompatibleSizeType,

/// Used in the case of a data integrity error when the computed [`RecordHeader`]
/// doesn't match the designated [`RecordHeader`] for a given record.
RecordHeaderMismatch,

/// Used when reading from an empty [`Store`].
ReadOnEmptyStore,
}

Expand Down Expand Up @@ -126,6 +155,8 @@ where
S: Storage,
H: Hasher + Default,
{
/// Reads record bytes for a record persisted at the given `position` with the designated
/// [`RecordHeader`].
pub async fn read(
&self,
position: &S::Position,
Expand All @@ -151,6 +182,10 @@ where
Ok(record_bytes)
}

/// Appends the bytes for a new record at the end of this store.
///
/// Returns the computed [`RecordHeader`] for the provided record bytes along with the
/// position where the record was written.
pub async fn append<XBuf, X, XE>(
&mut self,
stream: X,
Expand Down

0 comments on commit ec7874f

Please sign in to comment.