style: rewrites SegmentedLog::new to reduce Segment::new calls
arindas committed Feb 7, 2024
1 parent 3cdae60 commit 0d2f3fb
Showing 2 changed files with 69 additions and 50 deletions.
86 changes: 55 additions & 31 deletions README.md
@@ -47,7 +47,7 @@ order they need.
- [x] Locally persistent queue of records
- [ ] Single node, multi threaded, eBPF based request to thread routed message queue
- [ ] Service discovery with
[SWIM](https://www.cs.cornell.edu/projects/Quicksilver/public_pdfs/SWIM.pdf).
- [ ] Replication and consensus of replicated records with [Raft](https://raft.github.io/raft.pdf).

## Examples
@@ -76,8 +76,9 @@ partition_id_1 = (topic_id_0, partition_idx_1)
partition_id_2 = (topic_id_1, partition_idx_0)
```

> The exact numerical ids don't have any pattern with respect to partition_id and topic_id; there can
> be multiple topics, each of which can have multiple partitions (identified by partition_idx).

… alternatively:

@@ -98,6 +99,7 @@ partition_id_2 = (topic_id_1, partition_idx_0)
└── ...other nodes
```

```text
[L] := leader; [F] := follower
```
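
For illustration, the `(topic_id, partition_idx)` addressing described above could be modelled with a small Rust type; the names and integer widths below are assumptions made for this sketch, not laminarmq's actual types.

```rust
/// Illustrative sketch only: a partition is addressed by the topic it belongs
/// to and its index within that topic. Names and widths are assumptions.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct TopicId(pub u64);

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct PartitionId {
    pub topic_id: TopicId,
    pub partition_idx: u64,
}
```
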
@@ -119,6 +121,7 @@ have chosen to maintain a flat representation of topic partitions. We present an
commit-log API at the partition level.

Users may hence do the following:

- Directly interact with our message queue at the partition level
- Use client side load balancing between topic partitions

@@ -169,20 +172,21 @@ using Rendezvous hashing.

From the Wikipedia [article](https://en.wikipedia.org/wiki/Rendezvous_hashing):

> Rendezvous or highest random weight (HRW) hashing is an algorithm that allows clients to achieve
> distributed agreement on a set of _k_ options out of a possible set of _n_ options. A typical
> application is when clients need to agree on which sites (or proxies) objects are assigned to.

In our case, we use rendezvous hashing to determine the subset of nodes to use for placing the
replicas of a partition.

For some hashing function `H`, some weight function `f(w, hash)` and partition id `P_x`, we proceed
as follows:

- For every node `N_i` in the cluster with a weight `w_i`, we compute `R_i = f(w_i, H(concat(P_x,
N_i)))`
- We order all nodes `N_i` in the set of nodes `N` by their rank value `R_i`.
- For some replication factor `k`, we select the top `k` nodes to place the `k` replicas of the
  partition with id `P_x`

(We assume `k <= |N|`, where `|N|` is the number of nodes and `k` is the number of replicas.)
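
As a concrete illustration of this selection procedure, here is a minimal sketch that uses `std`'s `DefaultHasher` as `H` and a weight-scaled hash as `f(w, hash)`; both choices, and the string node/partition ids, are assumptions for the sketch rather than laminarmq's actual hashing scheme.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// R_i = f(w_i, H(concat(P_x, N_i))); here f simply scales the hash by the
/// node's weight, which is an illustrative choice of weight function.
fn rank(partition_id: &str, node_id: &str, weight: u64) -> u128 {
    let mut hasher = DefaultHasher::new();
    (partition_id, node_id).hash(&mut hasher);
    hasher.finish() as u128 * weight as u128
}

/// Selects the top `k` nodes by rank to host the replicas of `partition_id`.
fn replica_set<'a>(partition_id: &str, nodes: &[(&'a str, u64)], k: usize) -> Vec<&'a str> {
    let mut ranked: Vec<_> = nodes
        .iter()
        .map(|&(node_id, weight)| (rank(partition_id, node_id, weight), node_id))
        .collect();
    ranked.sort_unstable_by(|a, b| b.0.cmp(&a.0)); // highest rank first
    ranked.into_iter().take(k).map(|(_, node_id)| node_id).collect()
}
```

Since every client ranks nodes with the same `H` and `f`, all clients independently arrive at the same replica set without any coordination.
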

@@ -198,14 +202,14 @@ current leader of the replica set.
### Supported execution models

`laminarmq` supports two execution models:

- General async execution model used by various async runtimes in the Rust ecosystem (e.g `tokio`)
- Thread per core execution model

In the thread-per-core execution model individual processor cores are limited to single threads.
This model encourages design that minimizes inter-thread contention and locks, thereby improving
tail latencies in software services. Read: [The Impact of Thread per Core Architecture on
Application Tail Latency.](https://helda.helsinki.fi//bitstream/handle/10138/313642/tpc_ancs19.pdf?sequence=1)
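
To illustrate the ownership discipline this model encourages, the sketch below gives each worker thread exclusive ownership of a subset of partitions and routes every request to the owning thread, so no locks are needed; it uses only `std` threads and channels and is not laminarmq's actual runtime.

```rust
use std::sync::mpsc;
use std::thread;

fn main() {
    let num_workers: usize = 4; // ideally one worker thread pinned per core
    let mut handles = Vec::new();

    // Each worker exclusively owns the partitions routed to it, so partition
    // state needs no locks or cross-thread synchronisation.
    let senders: Vec<_> = (0..num_workers)
        .map(|worker| {
            let (tx, rx) = mpsc::channel::<u64>(); // partition_id as a stand-in request
            handles.push(thread::spawn(move || {
                for partition_id in rx {
                    // handle the request against locally owned partition state
                    println!("worker {worker} serving partition {partition_id}");
                }
            }));
            tx
        })
        .collect();

    // Route each request to the worker that owns its partition.
    for partition_id in 0..16u64 {
        senders[(partition_id as usize) % num_workers].send(partition_id).unwrap();
    }

    drop(senders); // close the channels so the workers drain and exit
    for handle in handles {
        handle.join().unwrap();
    }
}
```
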

In the thread per core execution model, we have to leverage application level partitioning such that
each individual thread is responsible for a subset of requests and/or responsibilities. We also have
@@ -231,9 +235,10 @@ model.
</p>

In our cluster, we have two kinds of requests:

- **membership requests**: used by the gossip style service discovery system for maintaining cluster
membership.
- **partition requests**: used to interact with `laminarmq` topic partitions.
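
For illustration, these two request classes could be modelled as one enum that the routing layers match on; the variant and payload names are placeholders invented for this sketch, not laminarmq's request types.

```rust
/// Placeholder request classification for the sketch.
enum Request {
    /// Gossip-style service discovery traffic for maintaining cluster membership.
    Membership(MembershipRequest),
    /// Operations on a topic partition, addressed by its partition id.
    Partition { partition_id: u64, request: PartitionRequest },
}

enum MembershipRequest {
    Join { node_id: u64 },
    Leave { node_id: u64 },
}

enum PartitionRequest {
    Append { record_bytes: Vec<u8> },
    Read { offset: u64 },
    Create,
    Remove,
}
```
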

We use an [eBPF](https://ebpf.io/what-is-ebpf/) XDP filter to classify request packets at the socket
layer into membership request packets and partition request packets. Next we use eBPF to route
@@ -243,24 +248,26 @@ subsystem in that node. The partition request packets are left to flow as is.
Next we have an "HTTP server", which parses the incoming partition request packets from the original
socket into valid `partition::*` requests. For every `partition::*` request, the HTTP server spawns
a future to handle it. This request handler future does the following:

- Create a new channel `(tx, rx)` for the request.
- Send the parsed partition request along with the send end of the channel `(partition::*, tx)` to the
  "Request Router" over the request router's receiving channel.
- Await on the recv. end of the channel created by this future for the response. `res = rx.await`
- When we receive the response from this future's channel, we serialize it and respond back to the
  socket we had received the packets from.
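
A minimal sketch of this per-request channel pattern, written against `tokio`'s mpsc and oneshot channels purely for illustration (laminarmq also targets a thread-per-core runtime); `PartitionRequest` and `Response` are placeholder types.

```rust
use tokio::sync::{mpsc, oneshot};

// Placeholder request/response types for the sketch.
struct PartitionRequest { partition_id: u64, payload: Vec<u8> }
struct Response { payload: Vec<u8> }

/// Handles one parsed partition request: creates a per-request channel, hands
/// `(partition::*, tx)` to the request router, then awaits the response on `rx`.
async fn handle_request(
    request: PartitionRequest,
    router_tx: mpsc::Sender<(PartitionRequest, oneshot::Sender<Response>)>,
) -> Option<Response> {
    let (tx, rx) = oneshot::channel(); // channel (tx, rx) created for this request
    router_tx.send((request, tx)).await.ok()?; // send (partition::*, tx) to the router
    rx.await.ok() // res = rx.await; serialize and write it back to the socket
}
```
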

Next we have a "Request Router / Partition manager" responsible for routing various requests to the
partition serving futures. The request router unit receives both `membership::*` requests from the
membership subsystem and `partition::*` requests received from the "HTTP server" request handler
futures (also called request poller futures from here on since they poll for the response from the
channel recv. `rx` end). The request router unit routes requests as follows:

- `membership::*` requests are broadcast to all the partition serving futures
- `(partition::*_request(partition_id_x, …), tx)` tuples are routed to their destination partitions
  using the `partition_id`.
- `(partition::create(partition_id_x, …), tx)` tuples are handled by the request router / partition
  manager itself. For this, the request router / partition manager creates a new partition serving
  future, allocates the required storage units for it, and sends an appropriate response on `tx`.
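
The router's bookkeeping can be pictured as a map from `partition_id` to each partition serving future's request channel; the sketch below (same placeholder types as the previous one) forwards requests by id and only hints, in a comment, at what the `partition::create` path would additionally do.

```rust
use std::collections::HashMap;
use tokio::sync::{mpsc, oneshot};

// Placeholder types, mirroring the previous sketch.
struct PartitionRequest { partition_id: u64, payload: Vec<u8> }
struct Response { payload: Vec<u8> }

type PartitionTx = mpsc::Sender<(PartitionRequest, oneshot::Sender<Response>)>;

/// Routes a `(partition::*, tx)` tuple to the partition serving future
/// registered for its partition id.
async fn route(
    partitions: &HashMap<u64, PartitionTx>,
    request: PartitionRequest,
    response_tx: oneshot::Sender<Response>,
) {
    match partitions.get(&request.partition_id) {
        Some(partition_tx) => {
            // forward on the destination partition's request channel
            let _ = partition_tx.send((request, response_tx)).await;
        }
        None => {
            // the real partition::create path would allocate storage, spawn a new
            // partition serving future and register its channel; this sketch only
            // reports that the partition does not exist
            let _ = response_tx.send(Response { payload: b"no such partition".to_vec() });
        }
    }
}
```
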

Finally, the individual partition server futures receive both `membership::*` and `(partition::*,
tx)` requests as they arrive at our node and are routed. They handle the requests as necessary and send a
@@ -338,7 +345,7 @@ The partition request handler handles the different requests as follows:
replicas, we initiate the leadership election process with each replica as a candidate.

- `membership::leave(j)`: remove {node #j} from priority queue and Raft group if present. If `{node
  #j}` was not present in the Raft group, no further action is necessary. If it was present in the
Raft group, `pop()` another member from the priority queue, add it to the Raft group and proceed
similarly as in the case of `membership::join(j)`

@@ -352,6 +359,7 @@ The partition request handler handles the different requests as follows:
When a node goes down, the appropriate `membership::leave(i)` message (where `i` is the node that
went down) is sent to all the nodes in the cluster. The partition replica controllers in each node
handle the membership request accordingly. In effect:

- For every leader partition in that node:
  - if there are no other follower replicas in other nodes in its Raft group, that partition goes
down.
@@ -369,6 +377,7 @@ In our system, we use different Raft groups for different data buckets (replica
different Raft groups for different data buckets on the same node as MultiRaft.

Read more here:

- <https://tikv.org/deep-dive/scalability/multi-raft/>
- <https://www.cockroachlabs.com/blog/scaling-raft/>

@@ -377,6 +386,7 @@ Every partition controller is backed by a `segmented_log` for persisting records
### Persistence mechanism

#### `segmented_log`: Persistent data structure for storing records in a partition

The segmented-log data structure for storing records was originally described in the [Apache
Kafka](https://www.microsoft.com/en-us/research/wp-content/uploads/2017/09/Kafka.pdf) paper.

@@ -392,6 +402,7 @@ A segmented log is a collection of read segments and a single write segment. Eac
backed by a storage file on disk called "store".

The log is:

- "immutable", since only "append", "read" and "truncate" operations are allowed. It is not possible
to update or delete records from the middle of the log.
- "segmented", since it is composed of segments, where each segment services records from a
@@ -407,16 +418,19 @@ When reading from a particular offset, we linearly check which segment contains
segment. If a segment capable of servicing a read from the given offset is found, we read from that
segment. If no such segment is found among the read segments, we default to the write segment. The
following scenarios may occur when reading from the write segment in this case:

- The write segment has synced the messages including the message at the given offset. In this case
the record is read successfully and returned.
- The write segment hasn't synced the data at the given offset. In this case the read fails with a
segment I/O error.
- If the offset is out of bounds of even the write segment, we return an "out of bounds" error.
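
The read path described above can be sketched as follows; `Segment` here is a stand-in with only the fields needed for the illustration, not the crate's actual segment type.

```rust
/// Stand-in segment for the sketch: services offsets in [base_offset, next_offset).
struct Segment {
    base_offset: u64,
    next_offset: u64,
}

impl Segment {
    fn scopes(&self, offset: u64) -> bool {
        offset >= self.base_offset && offset < self.next_offset
    }
}

enum ReadError {
    OutOfBounds,
}

/// Linearly locate the read segment containing `offset`, defaulting to the
/// write segment; offsets beyond the write segment are out of bounds. (A read
/// from the returned write segment may still fail if the data isn't synced.)
fn locate_segment<'a>(
    read_segments: &'a [Segment],
    write_segment: &'a Segment,
    offset: u64,
) -> Result<&'a Segment, ReadError> {
    read_segments
        .iter()
        .find(|segment| segment.scopes(offset))
        .or_else(|| write_segment.scopes(offset).then_some(write_segment))
        .ok_or(ReadError::OutOfBounds)
}
```
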

#### `laminarmq` specific enhancements to the `segmented_log` data structure

While the conventional `segmented_log` data structure is quite performant for a `commit_log`
implementation, it still requires the following properties to hold true for the record being
appended:

- We have the entire record in memory
- We know the record bytes' length and record bytes' checksum before the record is appended

@@ -443,6 +457,7 @@ records[i+1].position = records[i].position + records[i].record_header.length
// segment index invariants in segmented_log
segments[i+1].base_index = segments[i].highest_index = segments[i].index[index.len-1].index + 1
```

<p align="center">
<b>Fig:</b> Data organisation for persisting the <code>segmented_log</code> data structure on a
<code>*nix</code> file system.
@@ -459,6 +474,7 @@ record bytes to the store, we write its corresponding `record_header` (containi
length), position and index as an `index_record` in the segment index.

This provides two quality of life enhancements:

- Allow asynchronous streaming writes, without having to concatenate intermediate byte buffers
- Records are accessed much more easily with easy to use indices
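
The per-segment index entries described above can be pictured with the following layout; the field names follow the invariants listed earlier, while the integer widths are assumptions for the sketch.

```rust
/// Header persisted with every record's bytes in the segment's store file.
struct RecordHeader {
    checksum: u64,
    length: u64,
}

/// One entry in a segment's index: enough to locate a record's bytes in the
/// store file and validate them, without reading the store first.
struct IndexRecord {
    record_header: RecordHeader,
    position: u64, // byte position of the record within the store file
    index: u64,    // logical index of the record within the segmented log
}

/// Store position invariant from above:
/// records[i + 1].position = records[i].position + records[i].record_header.length
fn next_position(previous: &IndexRecord) -> u64 {
    previous.position + previous.record_header.length
}
```
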

@@ -486,15 +502,16 @@ This execution model is based on the executor, reactor, waker abstractions used
runtimes. We don't have to specifically care about how and where a particular future is executed.

The data flow in this execution model is as follows:

- An HTTP server future parses HTTP requests from the request socket
- For every HTTP request, it creates a new future to handle it
- The HTTP handler future sends the request and a response channel tx to the request router via a channel.
  It also awaits on the response rx end.
- The request router future maintains a map of partition_id to designated request channel tx for each
  partition controller future.
- For every partition request received, it forwards the request on the appropriate partition request
  channel tx. If a `partition::create(...)` request is received, it creates a new partition controller
  future.
- The partition controller future sends back the response to the provided response channel tx.
- The response poller future receives it and responds back with a serialized response to the socket.

@@ -523,6 +540,7 @@ than the one that runs tasks for persisting data to the disk.
We re-use the same constructs that we use in the general async runtime execution model. The only
difference is that we explicitly care about which task queue a class of futures' tasks is executed
on. In our case, we have the following 4 task queues:

- Request router task queue
- HTTP server request parser task queue
- Partition replica controller task queue
@@ -573,6 +591,7 @@ ulimit -l
```

If the `memlock` resource limit (rlimit) is less than 512 KiB, you can increase it as follows:

```sh
sudo vi /etc/security/limits.conf
* hard memlock 512
@@ -582,15 +601,18 @@ sudo vi /etc/security/limits.conf
To make the new limits effective, you need to log in to the machine again. Verify whether the limits
have been reflected with `ulimit` as described above.

> (On old WSL versions, you might need to spawn a login shell every time for the limits to be
> reflected:
>
> ```sh
> su ${USER} -l
> ```
>
> The limits persist once inside the login shell. This is not necessary on the latest WSL2 version as
> of 22.12.2022)

Finally, clone the repository and run the tests:

```sh
git clone https://github.com/arindas/laminarmq.git
cd laminarmq/
@@ -611,13 +633,15 @@ cargo bench
The complete latest benchmark reports are available at <https://arindas.github.io/laminarmq/bench/latest/report>.

All benchmarks in the reports have been run on a machine (HP Pavilion x360 Convertible 14-ba0xx) with:

- 4 core CPU (Intel(R) Core(TM) i5-7200U CPU @ 2.50GHz)
- 8GB RAM (SK Hynix HMA81GS6AFR8N-UH DDR4 2133 MT/s)
- 128GB SSD storage (SanDisk SD8SN8U-128G-1006)

### Selected Benchmark Reports

**Note**: We use the following names for different record sizes:

<table>
<tr>
<td><b>size_name</b></td>
@@ -652,7 +676,7 @@ All benchmarks in the reports have been run on a machine (HP Pavilion x360 Conve
<tr>
<td><code>blog</code></td>
<td><code>11760 bytes (11.76 KB)</code></td>
<td><code>4X linked_in_post</code></td>
</tr>
</table>

33 changes: 14 additions & 19 deletions src/storage/commit_log/segmented_log/mod.rs
@@ -239,7 +239,7 @@ where
         config: Config<Idx, S::Size>,
         mut segment_storage_provider: SSP,
     ) -> Result<Self, LogError<S, SERP, C>> {
-        let mut segment_base_indices = segment_storage_provider
+        let segment_base_indices = segment_storage_provider
             .obtain_base_indices_of_stored_segments()
             .await
             .map_err(SegmentedLogError::StorageError)?;
@@ -251,36 +251,31 @@ where
             _ => Ok(()),
         }?;

-        let write_segment_base_index = segment_base_indices.pop().unwrap_or(config.initial_index);
+        let (segment_base_indices, write_segment_base_index) =
+            match segment_base_indices.last().cloned() {
+                Some(last_index) => (segment_base_indices, last_index),
+                None => (vec![config.initial_index], config.initial_index),
+            };

-        let read_segment_base_indices = segment_base_indices;
+        let mut segments = Vec::with_capacity(segment_base_indices.len());

-        let mut read_segments = Vec::<Segment<S, M, H, Idx, S::Size, SERP>>::with_capacity(
-            read_segment_base_indices.len(),
-        );
-
-        for segment_base_index in read_segment_base_indices {
-            read_segments.push(
+        for segment_base_index in segment_base_indices {
+            let cache_index_records_flag = (segment_base_index == write_segment_base_index)
+                || config.num_index_cached_read_segments.is_none();
+
+            segments.push(
                 Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
                     &mut segment_storage_provider,
                     config.segment_config,
                     segment_base_index,
-                    config.num_index_cached_read_segments.is_none(),
+                    cache_index_records_flag,
                 )
                 .await
                 .map_err(SegmentedLogError::SegmentError)?,
             );
         }

-        let write_segment =
-            Segment::with_segment_storage_provider_config_base_index_and_cache_index_records_flag(
-                &mut segment_storage_provider,
-                config.segment_config,
-                write_segment_base_index,
-                true, // write segment is always cached
-            )
-            .await
-            .map_err(SegmentedLogError::SegmentError)?;
+        let write_segment = segments.pop().ok_or(SegmentedLogError::WriteSegmentLost)?;

         let cache = match config.num_index_cached_read_segments {
             Some(cache_capacity) => {
@@ -298,7 +293,7 @@

         Ok(Self {
             write_segment: Some(write_segment),
-            read_segments,
+            read_segments: segments,
             config,
             segments_with_cached_index: cache,
             segment_storage_provider,
Expand Down
