laminarmq

A scalable, distributed message queue powered by a segmented, partitioned, replicated and immutable log.
This is currently a work in progress.

Usage

laminarmq provides a library crate and two binaries for managing laminarmq deployments. To use laminarmq as a library, add the following to your Cargo.toml:

[dependencies]
laminarmq = "0.0.4"

The current implementation is based on glommio and hence runs only on Linux, since glommio requires io_uring support in the Linux kernel.

Glommio requires a kernel with io_uring support recent enough to run its discovery probes. The minimum version at this time is 5.8.

Please also note Glommio requires at least 512 KiB of locked memory for io_uring to work. You can increase the memlock resource limit (rlimit) as follows:

$ vi /etc/security/limits.conf
*    hard    memlock        512
*    soft    memlock        512

Please note that 512 KiB is the minimum needed to spawn a single executor. Spawning multiple executors may require you to raise the limit accordingly. To make the new limits effective, you need to log in to the machine again. You can verify that the limits are updated by running the following:

$ ulimit -l
512

Refer to the latest git API Documentation or the Crate Documentation for more details. There's also a book being written to further describe design decisions, implementation details and recipes.

laminarmq presents an elementary commit-log abstraction (a series of records ordered by offsets), on top of which several message queue semantics such as publish-subscribe, or even full-blown protocols like MQTT, could be implemented. Users are free to read messages from any offset in any order they need.
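For illustration, here is a minimal sketch of an append/read cycle against such a commit-log abstraction. The trait and type names are hypothetical and are not laminarmq's actual public API:

// Hypothetical commit-log sketch; names are illustrative, not laminarmq's API.
struct Record {
    value: Vec<u8>,
}

trait CommitLog {
    type Error;

    /// Appends a record and returns the offset it was written at.
    fn append(&mut self, record: Record) -> Result<u64, Self::Error>;

    /// Reads the record at the given offset along with the offset of the next record.
    fn read(&self, offset: u64) -> Result<(Record, u64), Self::Error>;
}

fn publish_and_consume<L: CommitLog>(log: &mut L) -> Result<(), L::Error> {
    // A producer appends records in order...
    let offset = log.append(Record { value: b"hello".to_vec() })?;

    // ...while consumers read from whatever offsets they track, in any order.
    let (record, _next_offset) = log.read(offset)?;
    assert_eq!(record.value, b"hello");
    Ok(())
}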

Major milestones for laminarmq

  • Single node, single-threaded message queue with RPC server
  • Single node, multi-threaded message queue with eBPF-based request-to-thread routing
  • Service discovery with SWIM
  • Replication and consensus of replicated records with Raft

Design

This section describes the internal design of laminarmq.

Execution Model


Fig: Execution model depicting application level partitioning and request level parallelism.

laminarmq uses the thread-per-core execution model, where each processor core is limited to a single thread. This model encourages designs that minimize inter-thread contention and locks, thereby improving tail latencies in software services. Read: The Impact of Thread per Core Architecture on Application Tail Latency.

In our case, each thread is responsible for servicing only a subset of the partitions. Requests pertaining to a specific partition are always routed to the same thread. This greatly increases locality of requests. The routing mechanism could be implemented in several ways:

  • Each thread listens on a unique port. We have a reverse proxy of sorts to forward requests to specific ports.
  • An eBPF-based packet filter is used to route packets to the appropriate thread based on the topic and partition of the request to which the packet belongs.

Since each processor core is limited to a single thread, tasks in a thread need to be scheduled efficiently. Hence each worker thread runs its own task scheduler. Tasks can be scheduled on different task queues, and each task queue can be provisioned with a specific fraction of CPU time shares.
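As a rough illustration of this scheduling model, here is a sketch using glommio's executor and task-queue API. It follows glommio's documented API at the time of writing; exact signatures may differ between glommio versions, and the queue names and share values are purely illustrative:

use glommio::{Latency, LocalExecutorBuilder, Placement, Shares};

fn main() {
    // Pin one executor (and hence one thread) to CPU core 0.
    let handle = LocalExecutorBuilder::new(Placement::Fixed(0))
        .name("partition-worker-0")
        .spawn(|| async {
            // Two task queues sharing this core's CPU time roughly 80:20.
            let requests = glommio::executor().create_task_queue(
                Shares::Static(800),
                Latency::NotImportant,
                "requests",
            );
            let maintenance = glommio::executor().create_task_queue(
                Shares::Static(200),
                Latency::NotImportant,
                "maintenance",
            );

            // Service requests for this thread's partitions on the "requests" queue...
            let serve = glommio::spawn_local_into(async { /* ... */ }, requests).unwrap();
            // ...and run background work (e.g. cleaning up old segments) on "maintenance".
            let cleanup = glommio::spawn_local_into(async { /* ... */ }, maintenance).unwrap();

            serve.await;
            cleanup.await;
        })
        .unwrap();

    handle.join().unwrap();
}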

The current implementation only provides a single-node, single-threaded server; request routing is yet to be implemented. However, the eBPF route is more desirable since it doesn't require any additional dependency on the user's end.

laminarmq is generic enough to be implemented with different async runtimes. However, for our initial release we have chosen glommio as our thread-per-core runtime. glommio leverages the new Linux 5.x io_uring API, which facilitates truly asynchronous IO for both networking and disk interfaces. (Other async runtimes such as tokio make blocking system calls for disk IO operations in a thread pool.)

io_uring also has the advantage of being able to queue multiple system calls together and then asynchronously wait for their completion with at most one context switch. It is also possible to avoid context switches altogether. This is achieved with a pair of ring buffers called the submission-queue and the completion-queue. Once the queues are set up, the user can queue multiple system calls on the submission-queue. The Linux kernel processes the system calls and places the results in the completion-queue. The user can then freely read the results from the completion-queue. This entire process, after setting up the queues, doesn't require any additional context switches.

Read more: https://man.archlinux.org/man/io_uring.7.en
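To make the submission-queue/completion-queue flow concrete, here is a minimal sketch using the third-party io-uring crate (laminarmq itself goes through glommio rather than this crate). It pushes a single read for a file onto the submission queue and waits for the corresponding completion entry:

use io_uring::{opcode, types, IoUring};
use std::os::unix::io::AsRawFd;
use std::{fs, io};

fn main() -> io::Result<()> {
    // Set up the submission and completion ring buffers with 8 entries each.
    let mut ring = IoUring::new(8)?;

    let file = fs::File::open("README.md")?;
    let mut buf = vec![0u8; 1024];

    // Describe a read(2) system call as a submission queue entry.
    let read_entry =
        opcode::Read::new(types::Fd(file.as_raw_fd()), buf.as_mut_ptr(), buf.len() as _)
            .build()
            .user_data(0x42);

    // Safety: the fd and buffer must stay valid until the completion arrives.
    unsafe {
        ring.submission()
            .push(&read_entry)
            .expect("submission queue is full");
    }

    // One context switch: submit queued entries and wait for at least one completion.
    ring.submit_and_wait(1)?;

    let completion = ring.completion().next().expect("completion queue is empty");
    assert_eq!(completion.user_data(), 0x42);
    assert!(completion.result() >= 0, "read error: {}", completion.result());

    Ok(())
}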

glommio presents additional abstractions on top of io_uring in the form of an async runtime, with support for networking, disk IO, channels, single threaded locks and more.

Read more: https://www.datadoghq.com/blog/engineering/introducing-glommio/

Architecture

This section describes the planned architecture for making our message queue distributed across multiple nodes.

Storage Hierarchy

Data is persisted in laminarmq with the following hierarchy:

[cluster]
├── node#001
│   ├── (topic#001 -> partition#001) [L]
│   │   └── segmented_log{[segment#001, segment#002, ...]}
│   ├── (topic#001 -> partition#002) [L]
│   │   └── segmented_log{[segment#001, segment#002, ...]}
│   └── (topic#002 -> partition#001) [F]
│       └── segmented_log{[segment#001, segment#002, ...]}
├── node#002
│   ├── (topic#001 -> partition#002) [F]
│   │   └── segmented_log{[segment#001, segment#002, ...]}
│   └── (topic#002 -> partition#001) [L]
│       └── segmented_log{[segment#001, segment#002, ...]}
└── ...other nodes

Every "partition" is backed by a persistent, segmented log. A log is an append only collection of "message"(s). Messages in a "partition" are accessed using their "offset" i.e. location of the "message"'s bytes in the log.

segmented_log: Persistent data structure for storing records in a partition

The segmented-log data structure for storing records is inspired by Apache Kafka.


Fig: File organisation for persisting the segmented_log data structure on a *nix file system.

A segmented log is a collection of read segments and a single write segment. Each "segment" is backed by a storage file on disk called "store".

The log is:

  • "immutable", since only "append", "read" and "truncate" operations are allowed. It is not possible to update or delete records from the middle of the log.
  • "segmented", since it is composed of segments, where each segment services records from a particular range of offsets.

All writes go to the write segment. A new record is written at offset = write_segment.next_offset in the write segment. When we max out the capacity of the write segment, we close the write segment and reopen it as a read segment. The re-opened segment is added to the list of read segments. A new write segment is then created with base_offset equal to the next_offset of the previous write segment.

When reading from a particular offset, we linearly check which read segment contains the given offset. If a segment capable of servicing a read from the given offset is found, we read from that segment. If no such segment is found among the read segments, we default to the write segment. The following scenarios may occur when reading from the write segment in this case (see the sketch after this list):

  • The write segment has synced the messages including the message at the given offset. In this case the record is read successfully and returned.
  • The write segment hasn't synced the data at the given offset. In this case the read fails with a segment I/O error.
  • If the offset is out of bounds of even the write segment, we return an "out of bounds" error.
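The following is a rough sketch of this read-routing logic, with illustrative types rather than laminarmq's actual implementation:

// Illustrative sketch of the segmented_log read path; not laminarmq's actual code.

struct Record {
    value: Vec<u8>,
}

struct Segment {
    base_offset: u64,
    next_offset: u64, // offset one past the last record appended to this segment
}

enum ReadError {
    SegmentIo,         // e.g. the write segment hasn't synced the requested data yet
    OffsetOutOfBounds, // offset lies beyond even the write segment
}

impl Segment {
    fn contains(&self, offset: u64) -> bool {
        self.base_offset <= offset && offset < self.next_offset
    }

    fn read(&self, _offset: u64) -> Result<Record, ReadError> {
        // ...read the record bytes from the backing "store" file...
        unimplemented!()
    }
}

struct SegmentedLog {
    read_segments: Vec<Segment>,
    write_segment: Segment,
}

impl SegmentedLog {
    fn read(&self, offset: u64) -> Result<Record, ReadError> {
        // Linearly look for a read segment that services the given offset.
        if let Some(segment) = self.read_segments.iter().find(|s| s.contains(offset)) {
            return segment.read(offset);
        }

        // No read segment matched: default to the write segment.
        if offset >= self.write_segment.next_offset {
            return Err(ReadError::OffsetOutOfBounds);
        }

        // May still fail with ReadError::SegmentIo if the data at this offset
        // hasn't been synced yet.
        self.write_segment.read(offset)
    }
}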

laminarmq specific enhancements to the segmented_log data structure

While the conventional segmented_log data structure is quite performant for a commit_log implementation, it still requires the following properties to hold true for the record being appended:

  • We have the entire record in memory
  • We know the record bytes' length and record bytes' checksum before the record is appended

It's not possible to know this information when the record bytes are read from an asynchronous stream of bytes. Without the enhancements, we would have to concatenate intermediate byte buffers to a vector. This would not only incur more allocations, but also slow down our system.

Hence, to accommodate this use case, we introduced an intermediate indexing layer to our design.


Fig: Data organisation for persisting the segmented_log data structure on a *nix file system.

In the new design, instead of referring to records with a raw offset, we refer to them with indices. The index in each segment translates the record indices to raw file positions in the store file.

Now, the store append operation accepts an asynchronous stream of bytes instead of a contiguously laid out slice of bytes. We use this operation to write the record bytes, and while writing the record bytes, we calculate the record bytes' length and checksum. Once we are done writing the record bytes to the store, we write its corresponding record_header (containing the checksum and length), position and index as an index_record in the index.

This provides two quality of life enhancements:

  • Allow asynchronous streaming writes, without having to concatenate intermediate byte buffers
  • Records are accessed more easily, using simple indices rather than raw byte offsets
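A rough sketch of such a streaming append is shown below. The names and signatures are hypothetical (laminarmq's actual API may differ), and the futures-lite and crc32fast crates are used purely for illustration. The length and checksum are computed while the chunks are written, so the record never has to be buffered in memory as a whole:

// Hypothetical sketch; names and signatures are illustrative, not laminarmq's API.
use crc32fast::Hasher;
use futures_lite::{Stream, StreamExt};

struct RecordHeader {
    length: u64,
    checksum: u32,
}

struct IndexRecord {
    header: RecordHeader,
    position: u64, // byte position of the record within the store file
}

struct Store {}                             // append-only store file
struct Index { records: Vec<IndexRecord> }  // maps record indices to index records

impl Store {
    fn current_position(&self) -> u64 { 0 /* position where the next byte lands */ }

    async fn write_chunk(&mut self, _chunk: &[u8]) -> std::io::Result<()> { Ok(()) }
}

/// Appends a record supplied as an asynchronous stream of byte chunks and
/// returns the index at which the record can later be read.
async fn append_record<S>(
    store: &mut Store,
    index: &mut Index,
    mut record_stream: S,
) -> std::io::Result<usize>
where
    S: Stream<Item = Vec<u8>> + Unpin,
{
    let position = store.current_position();
    let (mut length, mut hasher) = (0u64, Hasher::new());

    // Write each chunk as it arrives, accumulating length and checksum on the fly.
    while let Some(chunk) = record_stream.next().await {
        store.write_chunk(&chunk).await?;
        length += chunk.len() as u64;
        hasher.update(&chunk);
    }

    // Persist the record_header, position and index as an index_record.
    index.records.push(IndexRecord {
        header: RecordHeader { length, checksum: hasher.finalize() },
        position,
    });

    Ok(index.records.len() - 1)
}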

Now, to prevent a malicious user from overloading our storage capacity and memory with a maliciously crafted request which infinitely loops over some data and sends it to our server, we have provided an optional append_threshold parameter to all append operations. When provided, it prevents streaming appends from writing more bytes than the provided append_threshold.

At the segment level, this requires us to keep a segment overflow capacity. All segment append operations now use segment_capacity - segment.size + segment_overflow_capacity as the append_threshold value. A good segment_overflow_capacity value could be segment_capacity / 2.
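For instance, with hypothetical names (not laminarmq's actual API), the threshold for a streaming append into a segment could be computed as follows:

// Hypothetical sketch of deriving the segment-level append_threshold.
struct SegmentConfig {
    segment_capacity: u64,
    segment_overflow_capacity: u64, // e.g. segment_capacity / 2
}

/// Maximum number of bytes a streaming append may write into a segment
/// that currently holds `segment_size` bytes.
fn append_threshold(config: &SegmentConfig, segment_size: u64) -> u64 {
    (config.segment_capacity - segment_size) + config.segment_overflow_capacity
}

fn example() {
    let config = SegmentConfig {
        segment_capacity: 1024 * 1024,         // 1 MiB segments
        segment_overflow_capacity: 512 * 1024, // segment_capacity / 2
    };

    // A segment that already holds 900 KiB still accepts a streaming append
    // of up to 124 KiB + 512 KiB = 636 KiB before the write is rejected.
    assert_eq!(append_threshold(&config, 900 * 1024), 636 * 1024);
}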

Replication and Partitioning (or redundancy and horizontal scaling)

A particular "node" contains some or all "partition"(s) of a "topic". Hence a "topic" is both partitioned and replicated within the nodes. The data is partitioned with the dividing of the data among the "partition"(s), and replicated by replicating these "partition"(s) among the other "node"(s).

Each partition is part of a Raft group; e.g. each replica of (topic#001 -> partition#001) is part of one Raft group, while each replica of (topic#002 -> partition#001) is part of a different Raft group. A particular "node" might host some Raft leader "partition"(s) as well as Raft follower "partition"(s). For instance, in the above example data persistence hierarchy, the [L] markers denote leaders and the [F] markers denote followers.

If a node goes down:

  • For every leader partition in that node:
    • if there are no other follower replicas in other nodes in its Raft group, that partition goes down.
    • if there are other follower replicas in other nodes, a leader election is held among them, and after a leader is elected, reads and writes for that partition proceed normally.
  • For every follower partition in that node:
    • the remaining replicas in the same Raft group continue to function in accordance with Raft's mechanisms.

CockroachDB and TiKV refer to this pattern of using different Raft groups for different data buckets on the same node as MultiRaft.


Service Discovery

Now we maintain a "member-list" abstraction of all "node"(s) which states which nodes are online in real time. This "member-list" abstraction is able to respond to events such as a new node joining or leaving the cluster. (It internally uses a gossip protocol for membership information dissemination.) This abstraction can also handle queries like the different "partition"(s) hosted by a particular node. Using this we do the following:

  • Create new replicas of partitions or rebalance partitions when a node joins or leaves the cluster.
  • Identify which node holds which partition in real time. This information can be used for client side load balancing when reading or writing to a particular partition.
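A hypothetical sketch of what such a member-list abstraction could expose (not laminarmq's actual API):

// Hypothetical member-list abstraction; names are illustrative only.

struct NodeId(u64);

struct PartitionId {
    topic: String,
    partition: u64,
}

/// Cluster membership events disseminated via the underlying gossip protocol.
enum MembershipEvent {
    NodeJoined(NodeId),
    NodeLeft(NodeId),
}

trait MemberList {
    /// Nodes currently believed to be online.
    fn members(&self) -> Vec<NodeId>;

    /// Partitions hosted by a particular node.
    fn partitions_on(&self, node: &NodeId) -> Vec<PartitionId>;

    /// Nodes hosting a particular partition, usable for client-side load balancing.
    fn nodes_hosting(&self, partition: &PartitionId) -> Vec<NodeId>;

    /// Reacts to membership changes, e.g. by creating replicas or rebalancing partitions.
    fn handle(&mut self, event: MembershipEvent);
}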

Data Retention SLA

A "segment_age" configuration is made available to configure the maximum allowed age of segments. Since all partitions maintain consistency using Raft consensus, they have completely identical message-segment distribution. At regular intervals, segments with age greater than the specified "segment_age" are removed and the messages stored in these segments are lost. A good value for "segment_age" could be 7 days.

Testing

laminarmq is currently implemented with the glommio async runtime, which requires a recent Linux kernel (at least 5.8) with io_uring support. glommio also requires at least 512 KiB of locked memory for io_uring to work. (Note: 512 KiB is the minimum needed to spawn a single executor. Spawning multiple executors may require you to raise the limit accordingly. I recommend 8192 KiB on an 8 GiB RAM machine.)

First, check the current memlock limit:

ulimit -l

# 512 ## sample output

If the memlock resource limit (rlimit) is less than 512 KiB, you can increase it as follows:

sudo vi /etc/security/limits.conf
*    hard    memlock        512
*    soft    memlock        512

To make the new limits effective, you need to log in to the machine again. Verify whether the limits have been reflected with ulimit as described above.

(On old WSL versions, you might need to spawn a login shell every time for the limits to be reflected:

su ${USER} -l

The limits persist once inside the login shell.

This is not necessary on the latest WSL2 version as of 22.12.2022.)

Finally, clone the repository and run the tests:

git clone https://github.com/arindas/laminarmq.git
cd laminarmq/
cargo test

License

laminarmq is licensed under the MIT License. See License for more details.
