-
-
Notifications
You must be signed in to change notification settings - Fork 186
Ractor-based deserialization engine #2938
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This PR introduces a Ractor-based parallel deserialization engine for Karafka, targeting Ruby 4.0+. It renames the Deserializers feature to Deserializing throughout the codebase while maintaining backward compatibility through aliases. The implementation adds configurable parallel deserialization that dispatches work to a Ractor pool when message batches meet defined thresholds.
Key Changes:
- Renamed
DeserializerstoDeserializingwith backward-compatible aliases - Added parallel deserialization infrastructure using Ractor workers with configurable pool size, batch thresholds, and payload size thresholds
- Implemented early dispatch mechanism in executor to allow Ractors to work while jobs wait in the queue
Reviewed changes
Copilot reviewed 41 out of 45 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
lib/karafka/routing/features/deserializing.rb |
Renamed feature class from Deserializers to Deserializing, added post_setup to initialize Ractor pool |
lib/karafka/routing/features/deserializing/topic.rb |
Updated topic routing API to use deserializing method with backward-compatible deserializers alias |
lib/karafka/routing/features/deserializing/config.rb |
Added parallel field to Config struct and parallel? method for checking if parallel deserialization is enabled |
lib/karafka/routing/features/deserializing/contracts/topic.rb |
Updated validation contract to require parallel boolean field |
lib/karafka/deserializing/parallel/pool.rb |
Implements Ractor worker pool for parallel deserialization with work distribution via ports |
lib/karafka/deserializing/parallel/future.rb |
Represents pending async deserialization, collects results from Ractor workers |
lib/karafka/deserializing/parallel/immediate.rb |
No-op implementation when parallel deserialization is skipped |
lib/karafka/deserializing/parallel/injector.rb |
Injects pre-deserialized payloads back into Message objects |
lib/karafka/deserializing/deserializers/*.rb |
Moved deserializers from Karafka::Deserializers to Karafka::Deserializing::Deserializers with Base class that freezes instances |
lib/karafka/setup/config.rb |
Added deserializing.parallel configuration namespace with pool settings |
lib/karafka/setup/contracts/config.rb |
Added validation rules for parallel deserialization configuration |
lib/karafka/processing/executor.rb |
Added early dispatch logic in before_schedule_consume to send work to Ractor pool |
lib/karafka/processing/strategies/default.rb |
Added result retrieval and injection in handle_before_consume |
lib/karafka/messages/message.rb |
Added payload= setter for injecting pre-deserialized payloads |
lib/karafka/messages/batch_metadata.rb |
Added deserialization field to store Future/Immediate objects |
lib/karafka/messages/builders/batch_metadata.rb |
Initialize deserialization field with Immediate instance by default |
config/locales/errors.yml |
Updated error messages from deserializers to deserializing, added parallel config validation errors |
spec/**/* |
Updated all spec files to use new Deserializing namespace and added comprehensive tests for parallel components |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
No description provided.