Amazon MSK

Use the Kafka Connector to ingest data streams from Amazon Managed Streaming for Apache Kafka (MSK) into Tinybird so that you can quickly turn them into high-concurrency, low-latency REST APIs.

The Kafka Connector is fully managed and requires no additional tooling. Connect Tinybird to your Kafka cluster, select a topic, and Tinybird automatically begins consuming messages from Kafka. You can transform or enrich your Kafka topics with JOINs using serverless Data Pipes. Auth tokens control access to API endpoints.

Secure connections through AWS PrivateLink or Multi-VPC for MSK are available for Enterprise customers on a Dedicated infrastructure plan. Reach out to [email protected] for more information.

Prerequisites

Grant READ permissions to both the Topic and the Consumer Group to ingest data from Kafka into Tinybird.

You must secure your Kafka brokers with SSL/TLS and SASL. Tinybird uses SASL_SSL as the security protocol for the Kafka consumer. Connections are rejected if the brokers only support PLAINTEXT or SASL_PLAINTEXT.
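As a rough illustration of the rule above, the following Python sketch checks a listener's security protocol before you try to create a connection. The function name and the check itself are hypothetical and aren't part of Tinybird or any Kafka client. They only encode the requirement that the listener uses SASL_SSL.

```python
# Hypothetical pre-flight check; not part of Tinybird or any Kafka client.
ACCEPTED_PROTOCOL = "SASL_SSL"


def check_security_protocol(protocol: str) -> None:
    """Raise ValueError unless the listener protocol is SASL_SSL."""
    normalized = protocol.strip().upper()
    if normalized != ACCEPTED_PROTOCOL:
        raise ValueError(
            f"{normalized} listeners are rejected; expose a {ACCEPTED_PROTOCOL} listener"
        )


if __name__ == "__main__":
    for proto in ("SASL_SSL", "PLAINTEXT", "SASL_PLAINTEXT"):
        try:
            check_security_protocol(proto)
            print(f"{proto}: ok")
        except ValueError as err:
            print(f"{proto}: {err}")
```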

Kafka Schema Registry is supported only for decoding Avro messages.

Add a Kafka connection

You can create a connection to Kafka using the Tinybird CLI or the UI.

Using the CLI

Run the following commands to add a Kafka connection:

Adding a Kafka connection in the main Workspace
tb auth # Use the main Workspace admin Token
tb connection create kafka --bootstrap-servers <server> --key <key> --secret <secret> --connection-name <name> --ssl-ca-pem <ca_cert_value_or_path>
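
If you script the connection setup, for example in CI, you might assemble the same command programmatically. The following Python sketch only builds the argument list using the flags shown above. The helper name build_connection_command is hypothetical, and the values are placeholders.

```python
import shlex


def build_connection_command(bootstrap_servers, key, secret, connection_name,
                             ssl_ca_pem=None):
    """Return the argv list for `tb connection create kafka`."""
    argv = [
        "tb", "connection", "create", "kafka",
        "--bootstrap-servers", bootstrap_servers,
        "--key", key,
        "--secret", secret,
        "--connection-name", connection_name,
    ]
    # The CA certificate is optional; pass its location or value when needed.
    if ssl_ca_pem is not None:
        argv += ["--ssl-ca-pem", ssl_ca_pem]
    return argv


if __name__ == "__main__":
    argv = build_connection_command(
        "my_server:9092", "my_username", "my_password",
        "my_connection_name", ssl_ca_pem="ca.pem",
    )
    # Print a shell-safe version of the command for review before running it.
    print(shlex.join(argv))
```

You can pass the resulting list to subprocess.run after running tb auth with the main Workspace admin Token.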

Using the UI

Follow these steps to add a new connection using the UI:

  1. Go to Data Project.
  2. Select the + icon, then select Data Source.
  3. Select Kafka.
  4. Follow the steps to configure the connection.

Add a CA certificate

You can add a CA certificate in PEM format when configuring your Kafka connection from the UI. Tinybird checks the certificate for issues before creating the connection.

To add a CA certificate using the Tinybird CLI, pass the --ssl-ca-pem <ca_cert> argument to tb connection create, where <ca_cert> is the location or value of the CA certificate.

CA certificates don't work with Kafka Sinks and Streaming Queries.

Update a Kafka connection

You can update your credentials or cluster details only from the Tinybird UI. Follow these steps:

  1. Go to Data Project, select the + icon, then select Data Source.
  2. Select Kafka and then the connection you want to edit or delete using the three-dots menu.

Any Data Source that depends on this connection is affected by updates.

Use .datasource files

You can configure the Kafka Connector using .datasource files. See the datafiles documentation.

The following is an example of Kafka .datasource file for an already existing connection:

Example data source for Kafka Connector
SCHEMA >
  `__value` String,
  `__topic` LowCardinality(String),
  `__partition` Int16,
  `__offset` Int64,
  `__timestamp` DateTime,
  `__key` String,
  `__headers` Map(String,String)

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(__timestamp)"
ENGINE_SORTING_KEY "__timestamp"

# Connection is already available. If you
# need to create one, add the required fields
# on an include file with the details.
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id
KAFKA_STORE_HEADERS true

To add connection details in an INCLUDE file, see Use INCLUDE to store connection settings.

Columns of the Data Source

When you connect a Kafka producer to Tinybird, Tinybird consumes optional metadata columns from that Kafka record and writes them to the Data Source.

The following fields represent the raw data received from Kafka:

  • __value: A String representing the entire unparsed Kafka record inserted.
  • __topic: The Kafka topic that the message belongs to.
  • __partition: The Kafka partition that the message belongs to.
  • __offset: The Kafka offset of the message.
  • __timestamp: The timestamp stored in the Kafka message received by Tinybird.
  • __key: The key of the Kafka message.
  • __headers: Headers parsed from the incoming topic messages. See Using custom Kafka headers for advanced message processing.

Metadata fields are optional. Omit the fields you don't need to reduce your data storage.
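
To make the shape of these fields concrete, the following Python sketch models one ingested row as a dictionary and parses the JSON held in __value. The payload, header, and offset values are invented for illustration, and the sketch assumes the producer sends JSON.

```python
import json
from datetime import datetime, timezone

# Hypothetical row, using the metadata columns described above.
row = {
    "__value": '{"user_id": 42, "event": "click"}',  # unparsed record
    "__topic": "my_topic",
    "__partition": 0,
    "__offset": 1234,
    "__timestamp": datetime(2024, 1, 1, tzinfo=timezone.utc),
    "__key": "42",
    "__headers": {"source": "web"},
}


def parse_value(row: dict) -> dict:
    """Decode the raw record stored in __value, assuming it holds JSON."""
    return json.loads(row["__value"])


if __name__ == "__main__":
    payload = parse_value(row)
    print(payload["event"], row["__headers"].get("source"))
```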

Use INCLUDE to store connection settings

To avoid configuring the same connection settings across many files, or to prevent leaking sensitive information, you can store connection details in an external file and use INCLUDE to import them into one or more .datasource files.

You can find more information about INCLUDE in the Advanced Templates documentation.

For example, you might have two Kafka .datasource files that reuse the same Kafka connection. You can create an include file which stores the Kafka connection details.

The Tinybird project would use the following structure:

Tinybird data project file structure
ecommerce_data_project/
    datasources/
        connections/
          my_connector_name.incl
          ca.pem # CA certificate (optional)
        my_kafka_datasource.datasource
        another_datasource.datasource
    endpoints/
    pipes/

Where the file my_connector_name.incl has the following content:

Include file containing Kafka connection details
KAFKA_CONNECTION_NAME my_connection_name
KAFKA_BOOTSTRAP_SERVERS my_server:9092
KAFKA_KEY my_username
KAFKA_SECRET my_password
KAFKA_SSL_CA_PEM ca.pem # CA certificate (optional)

And the Kafka .datasource files look like the following:

Data Source using includes for Kafka connection details
SCHEMA >
  `__value` String,
  `__topic` LowCardinality(String),
  `__partition` Int16,
  `__offset` Int64,
  `__timestamp` DateTime,
  `__key` String

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(__timestamp)"
ENGINE_SORTING_KEY "__timestamp"

INCLUDE "connections/my_connector_name.incl"

KAFKA_TOPIC my_topic
KAFKA_GROUP_ID my_group_id

When using tb pull to pull a Kafka Data Source using the CLI, the KAFKA_KEY, KAFKA_SECRET, KAFKA_SASL_MECHANISM and KAFKA_SSL_CA_PEM settings aren't included in the file to avoid exposing credentials.
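
The effect is roughly what the following Python sketch does to a file's contents. This isn't how tb pull is implemented. The function strip_credentials is a hypothetical helper that drops the four settings named above so you can see which lines remain.

```python
# Settings that are left out of pulled files, per the paragraph above.
OMITTED_SETTINGS = {
    "KAFKA_KEY",
    "KAFKA_SECRET",
    "KAFKA_SASL_MECHANISM",
    "KAFKA_SSL_CA_PEM",
}


def strip_credentials(datafile_text: str) -> str:
    """Return the text without lines that set a credential-related key."""
    kept = []
    for line in datafile_text.splitlines():
        tokens = line.split(maxsplit=1)
        if tokens and tokens[0] in OMITTED_SETTINGS:
            continue  # skip settings that could expose credentials
        kept.append(line)
    return "\n".join(kept)


if __name__ == "__main__":
    include_file = (
        "KAFKA_CONNECTION_NAME my_connection_name\n"
        "KAFKA_BOOTSTRAP_SERVERS my_server:9092\n"
        "KAFKA_KEY my_username\n"
        "KAFKA_SECRET my_password\n"
        "KAFKA_SSL_CA_PEM ca.pem\n"
    )
    print(strip_credentials(include_file))
```

After pulling, add these settings back through an include file that you keep out of version control.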

Iterate a Kafka Data Source

The following instructions use Branches. Be sure you're familiar with the behavior of Branches in Tinybird when using the Kafka Connector. See Prerequisites.

Use Branches to test different Kafka connections and settings. See Branches.

Connections created using the UI are created in the main Workspace, so if you create a new Branch from a Workspace with existing Kafka Data Sources, the Branch Data Sources don't receive that streaming data automatically. Use the CLI to recreate the Kafka Data Source.

Update a Kafka Data Source

When you create a Branch that has existing Kafka Data Sources, the Data Sources in the Branch aren't connected to Kafka.

Therefore, if you want to update the schema, you need to recreate the Kafka Data Source in the Branch.

In Branches, Tinybird automatically appends _{BRANCH} to the Kafka group ID to prevent collisions. It also forces the consumers in Branches to always consume the latest messages, to reduce the performance impact.
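
As a small illustration of the naming rule, the following Python sketch derives the group ID that a Branch consumer would use. The function branch_group_id and the Branch name are hypothetical. Tinybird applies the suffix for you.

```python
def branch_group_id(group_id: str, branch: str) -> str:
    """Append _{BRANCH} to the configured Kafka group ID."""
    return f"{group_id}_{branch}"


if __name__ == "__main__":
    # The main Workspace keeps my_group_id; the Branch gets its own ID.
    print(branch_group_id("my_group_id", "my_branch"))  # my_group_id_my_branch
```

Because the Branch uses a different group ID, its committed offsets don't interfere with the consumer in the main Workspace.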

Add a new Kafka Data Source

To create and test a Kafka Data Source in a Branch, start by using an existing connection. You can create and use existing connections from the Branch using the UI: these connections are always created in the main Workspace.

You can create a Kafka Data Source in a Branch as in production. This Data Source doesn't have any connection details internally, so it's useful for testing purposes.

Define the connection and the Kafka parameters that are used in production in the .datasource file. To move the Data Source to production, include the connection settings in the .datasource file, as explained in the datafiles documentation.

Delete a Kafka Data Source

A Data Source that you create in a Branch stays active until you remove it from the Branch or remove the entire Branch.

If you delete an existing Kafka Data Source in a Branch, it isn't deleted in the main Workspace. To delete a Kafka Data Source, do it against the main Workspace. You can also use the CLI and include it in the CI/CD workflows as necessary.

MSK logs

You can find global logs in the datasources_ops_log Service Data Source. Filter by datasource_id to select the correct datasource, and set event_type to append-kafka.

To select all Kafka-related logs in the last day, run the following query:

SELECT *
FROM tinybird.datasources_ops_log
WHERE datasource_id = 't_1234'
  AND event_type = 'append-kafka'
  AND timestamp > now() - INTERVAL 1 day
ORDER BY timestamp DESC

If you can't find logs in datasources_ops_log, the kafka_ops_log Service Data Source contains more detailed logs. Filter by datasource_id to select the correct datasource, and use msg_type to select the desired log level (info, warning, or error).

SELECT *
FROM tinybird.kafka_ops_log
WHERE datasource_id = 't_1234'
  AND timestamp > now() - INTERVAL 1 day
  AND msg_type IN ('info', 'warning', 'error')

Limits

The limits for the Kafka connector are:

  • Minimum flush time: 4 seconds
  • Throughput (uncompressed): 20MB/s
  • Up to 3 connections per Workspace

If you're regularly hitting these limits, contact [email protected] for support.
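
To estimate whether a topic fits within the throughput limit, multiply the message rate by the average uncompressed message size. The following Python sketch does that arithmetic. The helper names are hypothetical, and it assumes 1 MB equals 1,000,000 bytes, which this page doesn't specify.

```python
MAX_THROUGHPUT_MB_PER_S = 20  # uncompressed throughput limit listed above
BYTES_PER_MB = 1_000_000      # assumption: decimal megabytes


def estimated_throughput_mb_per_s(messages_per_second: int,
                                  avg_message_bytes: int) -> float:
    """Estimate uncompressed throughput in MB/s."""
    return messages_per_second * avg_message_bytes / BYTES_PER_MB


def within_throughput_limit(messages_per_second: int,
                            avg_message_bytes: int) -> bool:
    """Return True if the estimate is at or below the limit."""
    estimate = estimated_throughput_mb_per_s(messages_per_second, avg_message_bytes)
    return estimate <= MAX_THROUGHPUT_MB_PER_S


if __name__ == "__main__":
    # 10,000 messages/s at 1,000 bytes each is 10 MB/s.
    print(within_throughput_limit(10_000, 1_000))  # True
    # 50,000 messages/s at 500 bytes each is 25 MB/s.
    print(within_throughput_limit(50_000, 500))    # False
```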

Troubleshooting

If you aren't receiving data

When Kafka commits a message for a topic and a group id, it always sends data from the latest committed offset. In Tinybird, each Kafka Data Source receives data from a topic and uses a group id. The combination of topic and group id must be unique.

If you remove a Kafka Data Source and recreate it with the same settings after having received data, you only get data from the latest committed offset, even if KAFKA_AUTO_OFFSET_RESET is set to earliest.

This happens both in the main Workspace and in Branches, if you're using them, because connections are always created in the main Workspace and are shared across Branches.

Recommended next steps:

  • Always use a different group id when testing Kafka Data Sources.
  • Check in the tinybird.kafka_ops_log Service Data Source to see if you've already used a group id to ingest data from a topic.
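
If you keep a list of your Kafka Data Sources, you can check the uniqueness rule before deploying. The following Python sketch is a hypothetical helper, not a Tinybird feature. It takes (name, topic, group id) tuples that you supply and reports any topic and group id combination used more than once.

```python
from collections import defaultdict


def find_reused_pairs(datasources):
    """Map each reused (topic, group_id) pair to the Data Sources using it.

    datasources: iterable of (name, topic, group_id) tuples.
    """
    by_pair = defaultdict(list)
    for name, topic, group_id in datasources:
        by_pair[(topic, group_id)].append(name)
    # Keep only the pairs shared by more than one Data Source.
    return {pair: names for pair, names in by_pair.items() if len(names) > 1}


if __name__ == "__main__":
    sources = [
        ("my_kafka_datasource", "my_topic", "my_group_id"),
        ("another_datasource", "my_topic", "my_group_id"),
        ("test_datasource", "my_topic", "my_test_group_id"),
    ]
    for (topic, group_id), names in find_reused_pairs(sources).items():
        print(f"{topic} / {group_id} is reused by: {', '.join(names)}")
```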

Compressed messages

Tinybird can consume from Kafka topics where Kafka compression is enabled, as decompressing the message is a standard function of the Kafka Consumer. If you compressed the message before passing it through the Kafka Producer, Tinybird can't do post-Consumer processing to decompress the message.

For example, if you compressed a JSON message through gzip and produced it to a Kafka topic as a bytes message, it would be ingested by Tinybird as bytes. If you produced a JSON message to a Kafka topic with the Kafka Producer setting compression.type=gzip, while it would be stored in Kafka as compressed bytes, it would be decoded on ingestion and arrive in Tinybird as JSON.
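
The difference can be seen without a Kafka cluster. The following Python sketch uses only the standard library and an invented event. It compares the bytes a consumer would receive when the producer sends plain JSON, where any compression.type setting is undone by the consumer, with the bytes it would receive when the application gzips the message itself before producing it.

```python
import gzip
import json

# Hypothetical event used for illustration.
event = {"user_id": 42, "event": "click"}

# Payload produced as plain JSON. Producer-level compression is
# transparent, so the consumer still sees these bytes.
raw_payload = json.dumps(event).encode("utf-8")

# Payload compressed by the application before producing it.
# The consumer receives the gzip bytes unchanged.
pre_compressed_payload = gzip.compress(raw_payload)


def is_json(payload: bytes) -> bool:
    """Return True if the payload decodes as UTF-8 JSON."""
    try:
        json.loads(payload.decode("utf-8"))
        return True
    except (UnicodeDecodeError, json.JSONDecodeError):
        return False


if __name__ == "__main__":
    print(is_json(raw_payload))             # True
    print(is_json(pre_compressed_payload))  # False
```

If you control the producer, prefer the producer's compression setting over compressing messages in application code.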
