A gem for sending messages to Kafka easily, with an extra validation layer. It is part of the Karafka ecosystem.
WaterDrop is based on Zendesk's delivery_boy gem.
It:
- is thread-safe
- supports sync and async producers
- works with Kafka 0.11+
```bash
gem install waterdrop
```

or add this to your Gemfile:

```ruby
gem 'waterdrop'
```

and run:

```bash
bundle install
```
WaterDrop is a complex tool that exposes multiple configuration options. To keep everything organized, the options are divided into two groups:
- WaterDrop options - options directly related to the Karafka framework and its components
- Ruby-Kafka driver options - options related to Ruby-Kafka and delivery_boy
To apply these configuration options, use the `#setup` method:
```ruby
WaterDrop.setup do |config|
  config.deliver = true
  config.kafka.seed_brokers = %w[kafka://localhost:9092]
end
```
WaterDrop options:

Option | Description |
---|---|
client_id | How the client identifies itself to the Kafka brokers |
logger | Logger to use |
deliver | Whether messages should actually be sent to Kafka |

Note: Only the most important configuration options are listed here. For the full list, see the config.rb file.
Note: All options are validated. To check which values are acceptable, see the config.rb validation schema file.
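To give a feel for what schema-style validation of these options means, here is a minimal plain-Ruby sketch covering the three WaterDrop options from the table above. The `validate_config` helper, the `QuietLogger` class and every rule in it (non-empty String `client_id`, boolean `deliver`, duck-typed `logger`) are hypothetical assumptions made for illustration; WaterDrop's actual rules live in its config.rb schema and may differ.

```ruby
# Hypothetical stand-in for a logger: it only provides the usual level methods.
class QuietLogger
  %i[debug info warn error].each do |level|
    define_method(level) { |_message = nil| nil }
  end
end

# Hypothetical validator illustrating schema-style checks.
# These rules are assumptions, not WaterDrop's real schema.
def validate_config(config)
  errors = {}

  client_id = config[:client_id]
  unless client_id.is_a?(String) && !client_id.empty?
    errors[:client_id] = 'must be a non-empty String'
  end

  # Only true or false is accepted, so nil or 'yes' is rejected.
  errors[:deliver] = 'must be true or false' unless [true, false].include?(config[:deliver])

  # Duck-typed check: any object responding to the level methods passes.
  logger = config[:logger]
  unless %i[debug info warn error].all? { |level| logger.respond_to?(level) }
    errors[:logger] = 'must respond to debug, info, warn and error'
  end

  errors
end

valid = { client_id: 'my-app', deliver: true, logger: QuietLogger.new }
invalid = { client_id: '', deliver: 'yes', logger: nil }

p validate_config(valid)         # => {}
p validate_config(invalid).keys  # => [:client_id, :deliver, :logger]
```

Collecting all errors in a Hash, instead of stopping at the first failure, lets you report every misconfigured option at once.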
Ruby-Kafka driver options:

Option | Description |
---|---|
raise_on_buffer_overflow | Whether to raise an exception when messages can't be sent asynchronously due to a message buffer overflow, or to just drop them |
delivery_interval | The number of seconds between background message deliveries. Disable timer-based background deliveries by setting this to 0. |
delivery_threshold | The number of buffered messages that will trigger a background message delivery. Disable buffer-size-based background deliveries by setting this to 0. |
required_acks | The number of Kafka replicas that must acknowledge messages before they're considered successfully written. |
ack_timeout | A timeout executed by a broker when the client is sending messages to it: the number of seconds the broker waits for replicas to acknowledge the write before responding to the client with an error. |
max_retries | The number of retries when attempting to deliver messages. |
retry_backoff | The number of seconds to wait after a failed attempt to send messages to a Kafka broker before retrying. |
max_buffer_bytesize | The maximum number of bytes allowed in the buffer before new messages are rejected. |
max_buffer_size | The maximum number of messages allowed in the buffer before new messages are rejected. |
max_queue_size | The maximum number of messages allowed in the queue before new messages are rejected. |
sasl_plain_username | The username used for SASL PLAIN authentication. |
sasl_plain_password | The password used for SASL PLAIN authentication. |

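
How `delivery_threshold`, `max_buffer_size` and `raise_on_buffer_overflow` interact can be sketched with a toy in-memory buffer. The `ToyBuffer` class and its `BufferOverflow` error are hypothetical: nothing is sent to Kafka ("delivery" just moves messages into an array), and `delivery_interval`, byte limits and retries are deliberately ignored. It only mirrors the semantics described in the table, not the driver's implementation.

```ruby
# Hypothetical error raised when the toy buffer is full.
class BufferOverflow < StandardError; end

# Toy buffer illustrating the table semantics; NOT the driver's code.
class ToyBuffer
  attr_reader :buffer, :delivered, :dropped

  def initialize(max_buffer_size:, delivery_threshold:, raise_on_buffer_overflow:)
    @max_buffer_size = max_buffer_size
    @delivery_threshold = delivery_threshold
    @raise_on_buffer_overflow = raise_on_buffer_overflow
    @buffer = []
    @delivered = []
    @dropped = []
  end

  def produce(message)
    if @buffer.size >= @max_buffer_size
      # Either raise or silently drop, depending on raise_on_buffer_overflow.
      raise BufferOverflow, "buffer full (#{@max_buffer_size} messages)" if @raise_on_buffer_overflow

      @dropped << message
      return
    end

    @buffer << message
    # A threshold of 0 disables buffer-size-based deliveries.
    deliver! if @delivery_threshold.positive? && @buffer.size >= @delivery_threshold
  end

  # Simulated delivery: move everything buffered into `delivered`.
  def deliver!
    @delivered.concat(@buffer)
    @buffer.clear
  end
end

buffer = ToyBuffer.new(max_buffer_size: 10, delivery_threshold: 3, raise_on_buffer_overflow: true)
5.times { |i| buffer.produce("msg-#{i}") }
p buffer.delivered # => ["msg-0", "msg-1", "msg-2"]
p buffer.buffer    # => ["msg-3", "msg-4"]
```

With `delivery_threshold: 0` nothing is delivered based on size, so a full buffer either raises or starts dropping messages until some other trigger (in the real driver, for example the `delivery_interval` timer) flushes it.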
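
The relationship between `max_retries` and `retry_backoff` can be illustrated with a small retry loop. The `with_retries` helper is hypothetical, and the sketch assumes, as ruby-kafka documents for its own option, that the original attempt is not counted as a retry, so at most `max_retries + 1` attempts are made. The `sleeper` parameter is an illustration device that lets the example run without actually sleeping; the real driver's retry logic is more involved.

```ruby
# Hypothetical helper illustrating max_retries / retry_backoff semantics.
# `sleeper` is injectable so the example can run without really sleeping.
def with_retries(max_retries:, retry_backoff:, sleeper: ->(seconds) { sleep(seconds) })
  attempt = 0
  begin
    attempt += 1
    yield attempt
  rescue StandardError
    # The original attempt is not a retry: give up after max_retries + 1 attempts.
    raise if attempt > max_retries

    sleeper.call(retry_backoff)
    retry
  end
end

waits = []
result = with_retries(max_retries: 2, retry_backoff: 1, sleeper: ->(s) { waits << s }) do |attempt|
  # Simulate a broker that fails the first two attempts.
  raise IOError, 'broker unavailable' if attempt < 3

  "delivered on attempt #{attempt}"
end

puts result # => delivered on attempt 3
p waits     # => [1, 1]
```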
In a Rails application, this configuration can also be placed in config/initializers and can vary by environment:
```ruby
WaterDrop.setup do |config|
  config.deliver = Rails.env.production?
  config.kafka.seed_brokers = [Rails.env.production? ? 'kafka://prod-host:9091' : 'kafka://localhost:9092']
end
```
To send messages to Kafka, use one of the producers:
```ruby
WaterDrop::SyncProducer.call('message', topic: 'my-topic')
# or, asynchronously:
WaterDrop::AsyncProducer.call('message', topic: 'my-topic')
```
Both `SyncProducer` and `AsyncProducer` accept the following options:
Option | Required | Value type | Description |
---|---|---|---|
topic | true | String | The Kafka topic that should be written to |
key | false | String | The key that should be set in the Kafka message |
partition | false | Integer | A specific partition number that should be written to |
partition_key | false | String | A string that can be used to deterministically select the partition |
create_time | false | Time | The timestamp that should be set on the message |
headers | false | Hash | Headers for the message |

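
To give an intuition for how a `partition_key` can deterministically select a partition, here is a sketch that hashes the key with CRC32 and takes the result modulo the partition count. This resembles the approach of ruby-kafka's default partitioner, but the `partition_for` helper is hypothetical and the partitioner used by your driver version may differ.

```ruby
require 'zlib'

# Hypothetical helper: map a partition key to a partition number.
# For a fixed partition count, the same key always maps to the same partition.
def partition_for(partition_key, partition_count)
  raise ArgumentError, 'partition_count must be positive' unless partition_count.positive?

  Zlib.crc32(partition_key) % partition_count
end

%w[user-1 user-2 user-1].each do |key|
  puts "#{key} -> partition #{partition_for(key, 6)}"
end
```

Because the mapping depends on the partition count, adding partitions to a topic can move a given key to a different partition.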
Keep in mind that the message you want to send must be either binary or stringified (`to_s`, `to_json`, etc.).
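For example, a Ruby Hash can be serialized to a JSON string before it is handed to a producer. The `build_payload` helper below is hypothetical; it only shows the stringification step and does not call WaterDrop.

```ruby
require 'json'

# Hypothetical helper: turn a Ruby object into a String payload.
# Strings (including binary ones) pass through unchanged, Hashes and
# Arrays are serialized to JSON, and anything else falls back to #to_s.
def build_payload(message)
  case message
  when String then message
  when Hash, Array then JSON.generate(message)
  else message.to_s
  end
end

payload = build_payload({ id: 1, type: 'signup' })
puts payload # => {"id":1,"type":"signup"}
```

The resulting `payload` is what you would pass as the first argument to `WaterDrop::SyncProducer.call` or `WaterDrop::AsyncProducer.call`.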
First, thank you for considering contributing to the Karafka ecosystem! It's people like you that make the open source community such a great community!
Each pull request must pass all RSpec specs and integration tests, and meet our quality requirements.
Fork the repository, make your changes, and wait for the GitHub Actions results.