#aws #kinesis #async #streaming #stream-processing #graceful-shutdown

go-zoom-kinesis

A robust AWS Kinesis stream processor with checkpointing and retry capabilities

10 releases (6 breaking)

new 0.15.0 Dec 11, 2024
0.14.0 Nov 14, 2024
0.13.0 Nov 1, 2024
0.12.0 Oct 31, 2024
0.9.3 Oct 30, 2024

#639 in Web programming

Download history 129/week @ 2024-10-23 583/week @ 2024-10-30 12/week @ 2024-11-06 116/week @ 2024-11-13 9/week @ 2024-11-20

139 downloads per month

MIT license

285KB
6.5K SLoC

go-zoom-kinesis 🐊

CI codecov Crates.io Documentation License: MIT

A robust, production-ready AWS Kinesis stream processor with checkpointing and retry capabilities. Built with reliability and performance in mind.

Features 🚀

  • ✨ Automatic checkpointing with multiple storage backends
  • 🔄 Configurable retry logic with exponential backoff
  • 🛼️ Comprehensive error handling
  • 😊 Multiple shard processing
  • 🕥 DynamoDB checkpoint storage support
  • 📘 Detailed tracing and monitoring
  • 📦 Graceful shutdown handling
  • 🢪 Production-ready with extensive test coverage
  • 🎧 Configurable stream position initialization
  • 🔄 Smart checkpoint recovery with fallback options

Basic Usage 📓

use go_zoom_kinesis::{
    KinesisProcessor, ProcessorConfig, RecordProcessor,
    processor::RecordMetadata, processor::InitialPosition,
    store::InMemoryCheckpointStore,
    monitoring::MonitoringConfig,
    error::{ProcessorError, ProcessingError},
};
use aws_sdk_kinesis::{Client, types::Record};
use std::time::Duration;
use async_trait::async_trait;

#[derive(Clone)]
struct MyProcessor;

#[async_trait]
impl RecordProcessor for MyProcessor {
    type Item = ();

    async fn process_record<'a>(
        &self,
        record: &'a Record,
        metadata: RecordMetadata<'a>,
    ) -> Result<Option<Self::Item>, ProcessingError> {
        println!("Processing record: {:?}", record);
        Ok(None)
    }
}

#[tokio::main]
async fn main() -> Result<(), ProcessorError> {
    let config = aws_config::load_defaults(aws_config::BehaviorVersion::latest()).await;
    let client = Client::new(&config);

    let config = ProcessorConfig {
        stream_name: "my-stream".to_string(),
        batch_size: 100,
        api_timeout: Duration::from_secs(30),
        processing_timeout: Duration::from_secs(300),
        max_retries: Some(3),
        shard_refresh_interval: Duration::from_secs(60),
        initial_position: InitialPosition::TrimHorizon,
        prefer_stored_checkpoint: true,
        monitoring: MonitoringConfig {
            enabled: true,
            ..Default::default()
        },
        ..Default::default()
    };

    let processor = MyProcessor;
    let store = InMemoryCheckpointStore::new();

    let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
    let (processor, _monitoring_rx) = KinesisProcessor::new(
        config,
        processor,
        client,
        store,
    );

    processor.run(shutdown_rx).await
}

Contributing 😪

Contributions are welcome! Please feel free to submit a Pull Request.

License 📒

This project is licensed under the MIT License - see the LICENSE file for details.

Support 🔠

If you have any questions or run into issues, please open an issue on GitHub.

Dependencies

~26–36MB
~493K SLoC