feat: adds segmented log read stream benchmark
arindas committed Jul 18, 2023
1 parent d3e38a3 commit 1a5296d
Showing 2 changed files with 256 additions and 0 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
@@ -54,3 +54,7 @@ criterion = { version = "0.5", features = ["html_reports", "async_futures", "asy
[[bench]]
name = "commit_log_append"
harness = false

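# Registers the new benchmark target with Cargo; `harness = false` turns off
# the default libtest harness so that Criterion can supply its own `main`.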
[[bench]]
name = "segmented_log_read_stream"
harness = false
252 changes: 252 additions & 0 deletions benches/segmented_log_read_stream.rs
@@ -0,0 +1,252 @@
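//! Benchmarks the throughput of reading back all records from a
//! `SegmentedLog` as an unbounded stream, for both the in-memory and the
//! tokio + on-disk (`StdFileStorage`) storage backends, across record
//! payload sizes ranging from a 12 byte message to a multi-kilobyte blog post.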
use criterion::{
    async_executor::FuturesExecutor, black_box, criterion_group, criterion_main, BenchmarkId,
    Criterion, Throughput,
};
use futures_lite::{stream, StreamExt};
use laminarmq::{
    common::serde_compat::bincode,
    storage::{
        commit_log::{
            segmented_log::{segment::Config as SegmentConfig, Config, MetaWithIdx, SegmentedLog},
            CommitLog, Record,
        },
        impls::{
            common::DiskBackedSegmentStorageProvider,
            in_mem::{segment::InMemSegmentStorageProvider, storage::InMemStorage},
            tokio::storage::{StdFileStorage, StdFileStorageProvider},
        },
    },
};
use std::{convert::Infallible, ops::Deref, path::Path};

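/// Wraps a value in a `Result` whose error type is uninhabited; used to adapt
/// plain byte slices into the `Result` items the record content streams yield.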
fn infallible<T>(t: T) -> Result<T, Infallible> {
    Ok(t)
}

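/// Builds a `Record` around the given content stream, with unit metadata and
/// no explicit index, leaving index assignment to the segmented log on append.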
fn record<X, Idx>(stream: X) -> Record<MetaWithIdx<(), Idx>, X> {
    Record {
        metadata: MetaWithIdx {
            metadata: (),
            index: None,
        },
        value: stream,
    }
}

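/// Four 35 byte lorem-ipsum sentences, 140 bytes in total. Record payloads of
/// different sizes are built below by cycling over these chunks.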
const LOREM_140: [&[u8]; 4] = [
    b"Donec neque velit, pulvinar in sed.",
    b"Pellentesque sodales, felis sit et.",
    b"Sed lobortis magna sem, eu laoreet.",
    b"Praesent quis varius diam. Nunc at.",
];

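/// Runs the read-stream benchmark for a given record payload. For each append
/// count from 1000 to 10000 in steps of 1000, it appends that many copies of
/// `record_content` and then measures only how long it takes to stream every
/// record back, so that setup cost is excluded from the reported throughput.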
fn criterion_benchmark_with_record_content<X, XBuf, XE>(
    c: &mut Criterion,
    record_content: X,
    record_content_size: u64,
    record_size_group_name: &str,
) where
    X: stream::Stream<Item = Result<XBuf, XE>> + Clone + Unpin,
    XBuf: Deref<Target = [u8]>,
{
    let mut group = c.benchmark_group(record_size_group_name);

    for num_appends in (1000..=10000).step_by(1000) {
        group
            .throughput(Throughput::Bytes(record_content_size * num_appends as u64))
            .sample_size(10);

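        // In-memory variant: segments live entirely in `InMemStorage`, driven
        // on the lightweight `FuturesExecutor`. Note that `iter_custom`'s
        // iteration count argument is ignored; every call performs a single
        // append-then-stream pass.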
        group.bench_with_input(
            BenchmarkId::new("in_memory_segmented_log", num_appends),
            &num_appends,
            |b, &num_appends| {
                b.to_async(FuturesExecutor).iter_custom(|_| async {
                    let config = Config {
                        segment_config: SegmentConfig {
                            max_store_size: 1048576,
                            max_store_overflow: 524288,
                            max_index_size: 1048576,
                        },
                        initial_index: 0,
                    };

                    let mut segmented_log = SegmentedLog::<
                        InMemStorage,
                        (),
                        crc32fast::Hasher,
                        u32,
                        usize,
                        bincode::BinCode,
                        _,
                    >::new(
                        config, InMemSegmentStorageProvider::<u32>::default()
                    )
                    .await
                    .unwrap();

                    for _ in 0..num_appends {
                        segmented_log
                            .append(record(record_content.clone()))
                            .await
                            .unwrap();
                    }

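                    // Time only the streaming pass; `black_box` keeps the
                    // consumed record count from being optimized away.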
                    let start = std::time::Instant::now();

                    black_box(segmented_log.stream_unbounded().count().await);

                    let time_taken = start.elapsed();

                    drop(segmented_log);

                    time_taken
                });
            },
        );

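        // Tokio variant: segments are persisted under /tmp through
        // `StdFileStorage`, so any directory left over from a previous
        // iteration is removed before the log is recreated.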
        group.bench_with_input(
            BenchmarkId::new("tokio_segmented_log", num_appends),
            &num_appends,
            |b, &num_appends| {
                b.to_async(tokio::runtime::Runtime::new().unwrap())
                    .iter_custom(|_| async {
                        let config = Config {
                            segment_config: SegmentConfig {
                                max_store_size: 10000000,
                                max_store_overflow: 10000000 / 2,
                                max_index_size: 10000000,
                            },
                            initial_index: 0,
                        };

                        const TEST_DISK_BACKED_STORAGE_PROVIDER_STORAGE_DIRECTORY: &str =
                            "/tmp/laminarmq_bench_tokio_std_file_segmented_log_read_stream";

                        if Path::new(TEST_DISK_BACKED_STORAGE_PROVIDER_STORAGE_DIRECTORY).exists() {
                            let directory_path =
                                TEST_DISK_BACKED_STORAGE_PROVIDER_STORAGE_DIRECTORY;
                            tokio::fs::remove_dir_all(directory_path).await.unwrap();
                        }

                        let disk_backed_storage_provider = DiskBackedSegmentStorageProvider::<
                            _,
                            _,
                            u32,
                        >::with_storage_directory_path_and_provider(
                            TEST_DISK_BACKED_STORAGE_PROVIDER_STORAGE_DIRECTORY,
                            StdFileStorageProvider,
                        )
                        .unwrap();

                        let mut segmented_log =
                            SegmentedLog::<
                                StdFileStorage,
                                (),
                                crc32fast::Hasher,
                                u32,
                                u64,
                                bincode::BinCode,
                                _,
                            >::new(config, disk_backed_storage_provider)
                            .await
                            .unwrap();

                        for _ in 0..num_appends {
                            segmented_log
                                .append(record(record_content.clone()))
                                .await
                                .unwrap();
                        }

                        let start = tokio::time::Instant::now();

                        black_box(segmented_log.stream_unbounded().count().await);

                        let time_taken = start.elapsed();

                        drop(segmented_log);

                        time_taken
                    });
            },
        );
    }
}

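// Each benchmark below fixes a record payload of a particular size and reuses
// the generic driver above; payload sizes mimic realistic messages (a tiny
// greeting, a tweet, a LinkedIn post, a blog post).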
fn benchmark_tiny_message_read_stream(c: &mut Criterion) {
    // 12 bytes
    let tiny_message = stream::once(infallible(b"Hello World!" as &[u8]));

    criterion_benchmark_with_record_content(
        c,
        tiny_message,
        12,
        "segmented_log_read_stream_with_tiny_message",
    );
}

fn benchmark_tweet_read_stream(c: &mut Criterion) {
    // 140 bytes: pre-2017 Twitter tweet limit
    let tweet = stream::iter(LOREM_140.iter().cloned().map(infallible));

    criterion_benchmark_with_record_content(c, tweet, 140, "segmented_log_read_stream_with_tweet");
}

fn benchmark_half_k_message_read_stream(c: &mut Criterion) {
    // 560 bytes (~0.5 KiB): 4 cycles of the 140 byte LOREM_140 array, i.e. 16
    // of its 35 byte sentences
    let half_k_message = stream::iter(LOREM_140.iter().cloned().cycle().take(16).map(infallible));

    criterion_benchmark_with_record_content(
        c,
        half_k_message,
        560,
        "segmented_log_read_stream_with_half_k_message",
    );
}

fn benchmark_k_message_read_stream(c: &mut Criterion) {
    // 1120 bytes (~1 KiB): 8 cycles of LOREM_140
    let k_message = stream::iter(LOREM_140.iter().cloned().cycle().take(32).map(infallible));

    criterion_benchmark_with_record_content(
        c,
        k_message,
        1120,
        "segmented_log_read_stream_with_k_message",
    );
}

fn benchmark_linked_in_post_read_stream(c: &mut Criterion) {
    // 2940 bytes: within the 3000 character limit on LinkedIn posts (21 cycles
    // of LOREM_140)
    let linked_in_post = stream::iter(LOREM_140.iter().cloned().cycle().take(84).map(infallible));

    criterion_benchmark_with_record_content(
        c,
        linked_in_post,
        2940,
        "segmented_log_read_stream_with_linked_in_post",
    );
}

fn benchmark_blog_post_read_stream(c: &mut Criterion) {
    // 11760 bytes (2940 * 4): 84 cycles of LOREM_140
    let blog_post = stream::iter(LOREM_140.iter().cloned().cycle().take(336).map(infallible));

    criterion_benchmark_with_record_content(
        c,
        blog_post,
        11760,
        "segmented_log_read_stream_with_blog_post",
    );
}

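// Register all read-stream benchmarks under a single Criterion group.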
criterion_group!(
    benches,
    benchmark_tiny_message_read_stream,
    benchmark_tweet_read_stream,
    benchmark_half_k_message_read_stream,
    benchmark_k_message_read_stream,
    benchmark_linked_in_post_read_stream,
    benchmark_blog_post_read_stream
);
criterion_main!(benches);
