Kafka connector¶
You can set up a Kafka connector to stream data from a Kafka topic into Tinybird. To do this, create a connection file and configure a data source file to use that connection.
Compatibility¶
The connector is compatible with Apache Kafka, so it works with any Kafka-compatible implementation or vendor. The following are tried and tested:
- Apache Kafka
- Confluent Platform and Confluent Cloud
- Redpanda
- AWS MSK
- Azure Event Hubs
- Estuary
Set up the connector¶
To set up the Kafka connector, follow these steps.
Create a Kafka connection¶
Create a .connection file with the required credentials stored in secrets. For example:
kafka_sample.connection
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_SERVERS") }}
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM PLAIN
KAFKA_KEY {{ tb_secret("KAFKA_USERNAME") }}
KAFKA_SECRET {{ tb_secret("KAFKA_PASSWORD") }}
Set the values of the secrets like this:
tb [--cloud] secret set KAFKA_SERVERS kafka:9092
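For reference, a minimal sketch setting all three secrets referenced by the example connection file in Tinybird Cloud; the values are placeholders:
tb --cloud secret set KAFKA_SERVERS "broker-1:9092,broker-2:9092"
tb --cloud secret set KAFKA_USERNAME "my-kafka-user"
tb --cloud secret set KAFKA_PASSWORD "my-kafka-password"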
Secrets are only replaced in your resources when you deploy. If you change a secret, you need to deploy for the changes to take effect.
See tb secret to learn more about how to manage secrets, and Connection files for more details on how to create a connection file.
For a complete list of Kafka connection settings, see Settings.
Create a Kafka data source¶
Create a .datasource file using tb create --prompt or manually.
Define the data source schema as with any non-Kafka data source, and specify the required Kafka settings. Note that the value of KAFKA_CONNECTION_NAME must match the name of the .connection file you created in the previous step.
For example:
kafka_sample.datasource
SCHEMA >
    `timestamp` DateTime(3) `json:$.timestamp`,
    `session_id` String `json:$.session_id`,
    `action` LowCardinality(String) `json:$.action`,
    `version` LowCardinality(String) `json:$.version`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_PARTITION_KEY "toYYYYMM(timestamp)"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME kafka_sample # The name of the .connection file
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}
Besides the columns specified explicitly in SCHEMA, Kafka data sources have additional columns that store metadata of the messages ingested. See Kafka meta columns for more information.
For a complete list of Kafka data source settings, see Settings.
Use different consumer groups for different environments, so consumers and their committed offset are isolated.
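For example, because the sample data source reads KAFKA_GROUP_ID from a secret, you can point each environment at its own consumer group; the group names below are placeholders:
tb --local secret set KAFKA_GROUP_ID "tinybird_local_consumer"
tb --cloud secret set KAFKA_GROUP_ID "tinybird_cloud_consumer"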
Connectivity check¶
After you've defined your Kafka data source and connection, you can validate the setup by running a deploy check:
tb --cloud deploy --check
This checks that the Kafka broker is reachable and that Tinybird can connect to it with the provided credentials. Remember to set any secrets used by the connection.
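Assuming the same secrets are set in your local environment, you can run the equivalent check against Tinybird Local:
tb --local deploy --check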
.datasource settings¶
Instruction | Required | Description |
---|---|---|
KAFKA_CONNECTION_NAME | Yes | The name of the configured Kafka connection in Tinybird. It must match the name of the connection file, without the extension. |
KAFKA_TOPIC | Yes | Name of the Kafka topic to consume from. |
KAFKA_GROUP_ID | Yes | Consumer Group ID to use when consuming from Kafka. |
KAFKA_AUTO_OFFSET_RESET | No | Offset to use when no previous offset can be found, for example when creating a new consumer. Supported values are latest , earliest . Default: latest . |
KAFKA_STORE_HEADERS | No | Adds a __headers Map(String, String) column to the data source, and stores Kafka headers in it for later processing. Default: False . |
KAFKA_STORE_RAW_VALUE | No | Stores the raw message in its entirety in the __value column. Default: False . |
KAFKA_KEY_FORMAT | No | Format of the message key. Valid values are avro , json_with_schema , and json_without_schema . Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set in the connection file used by the data source. |
KAFKA_VALUE_FORMAT | No | Format of the message value. Valid values are avro , json_with_schema , and json_without_schema . Using avro or json_with_schema requires KAFKA_SCHEMA_REGISTRY_URL to be set in the connection file used by the data source. |
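As a sketch, a data source that also sets some of the optional instructions above could look like the following; the schema, topic, and consumer group are illustrative:
SCHEMA >
    `timestamp` DateTime(3) `json:$.timestamp`,
    `payload` String `json:$.payload`

ENGINE "MergeTree"
ENGINE_SORTING_KEY "timestamp"

KAFKA_CONNECTION_NAME kafka_sample
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}
KAFKA_AUTO_OFFSET_RESET earliest
KAFKA_STORE_HEADERS True
KAFKA_VALUE_FORMAT json_without_schema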
.connection settings¶
Instruction | Required | Description |
---|---|---|
KAFKA_BOOTSTRAP_SERVERS | Yes | Comma-separated list of one or more Kafka brokers, including port numbers. |
KAFKA_KEY | Yes | Key used to authenticate with Kafka. Sometimes called Key, Client Key, or Username, depending on the Kafka distribution. |
KAFKA_SECRET | Yes | Secret used to authenticate with Kafka. Sometimes called Secret, Secret Key, or Password, depending on the Kafka distribution. |
KAFKA_SECURITY_PROTOCOL | No | Security protocol for the connection. Accepted values are PLAINTEXT and SASL_SSL . Default value is SASL_SSL . |
KAFKA_SASL_MECHANISM | No | SASL mechanism to use for authentication. Supported values are 'PLAIN' , 'SCRAM-SHA-256' , 'SCRAM-SHA-512' . Default value is 'PLAIN' . |
KAFKA_SCHEMA_REGISTRY_URL | No | URL of the Kafka schema registry. Used for avro and json_with_schema deserialization of keys and values. If Basic Auth is required, it must be included in the URL as in https://user:password@registry_url |
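For example, a sketch of a connection that uses SCRAM authentication and a schema registry; the secret names are assumptions:
TYPE kafka
KAFKA_BOOTSTRAP_SERVERS {{ tb_secret("KAFKA_SERVERS") }}
KAFKA_SECURITY_PROTOCOL SASL_SSL
KAFKA_SASL_MECHANISM SCRAM-SHA-256
KAFKA_KEY {{ tb_secret("KAFKA_USERNAME") }}
KAFKA_SECRET {{ tb_secret("KAFKA_PASSWORD") }}
KAFKA_SCHEMA_REGISTRY_URL {{ tb_secret("KAFKA_SCHEMA_REGISTRY_URL") }}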
Kafka connector in the local environment¶
When using the Kafka connector in the local environment, which runs in a Docker container, you can consume messages from a local Kafka server or a Kafka server in the cloud.
Local Kafka server¶
When using a local Kafka server, make sure that the Tinybird Local container can access your local Kafka server. If you are running Kafka using Docker, you can use the following command to connect your local Kafka server to the Tinybird Local container:
docker network connect local-kafka-network tinybird-local
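For example, assuming your broker runs in a Docker container named kafka that advertises a listener at kafka:9092, and using local-kafka-network as the shared network (all of these names are illustrative), the full setup could look like this:
docker network create local-kafka-network
docker network connect local-kafka-network kafka
docker network connect local-kafka-network tinybird-local
tb --local secret set KAFKA_SERVERS "kafka:9092"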
After this is done, create the required secrets in your local environment. For example:
tb --local secret set KAFKA_KEY "kafka-local-username"
You can also use the tb_secret() default value. For example:
kafka_sample.connection
KAFKA_KEY {{ tb_secret("KAFKA_KEY", "kafka-local-username") }}
Kafka server in the cloud¶
When using a Kafka server in the cloud that's visible on your network, you can create the required secrets in your local environment. For example:
tb --local secret set KAFKA_KEY "kafka-cloud-username"
Don't use the tb_secret() default value in this case, as it might expose credentials in your code.
Kafka meta columns¶
When you connect a data source to Kafka, the following columns are added to store metadata from Kafka messages.
Name | Type | Description |
---|---|---|
__value | String | A String representing the entire unparsed value of the Kafka message. It is only populated if KAFKA_STORE_RAW_VALUE is set to True. |
__topic | LowCardinality(String) | The Kafka topic that the message was read from. |
__partition | Int16 | The Kafka partition that the message was read from. |
__offset | Int16 | The Kafka offset of the message. |
__timestamp | DateTime | The Kafka timestamp of the message. |
__key | String | The key of the Kafka message. |
Optionally, when KAFKA_STORE_HEADERS is set to True, the following column is added and populated:
Name | Type | Description |
---|---|---|
__headers | Map(String, String) | Kafka headers of the message. |
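For example, once headers are stored you can read individual keys from the map in SQL; the header name below is illustrative:
SELECT
    timestamp,
    __headers['trace-id'] AS trace_id
FROM kafka_sample
WHERE __headers['trace-id'] != ''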
When you iterate your Kafka data source, you might need to use the meta columns in the FORWARD_QUERY. A valid forward query will be suggested to you when you need one, which you can then tweak to get the desired values for each column.
Kafka logs¶
You can find global logs in the datasources_ops_log Service Data Source. Filter by datasource_id to select the correct data source, and by event_type='append-kafka'.
For example, to select all Kafka-related logs in the last day, run the following query:
SELECT *
FROM tinybird.datasources_ops_log
WHERE
    datasource_id = 't_1234'
    AND event_type = 'append-kafka'
    AND timestamp > now() - INTERVAL 1 day
ORDER BY timestamp DESC
If you can't find logs in datasources_ops_log, the kafka_ops_log Service Data Source contains more detailed logs. Filter by datasource_id to select the correct data source, and use msg_type to select the desired log level (info, warning, or error).
SELECT *
FROM tinybird.kafka_ops_log
WHERE
    datasource_id = 't_1234'
    AND timestamp > now() - INTERVAL 1 day
    AND msg_type IN ['info', 'warning', 'error']
Troubleshooting¶
Each combination of KAFKA_TOPIC and KAFKA_GROUP_ID can only be used in one data source; otherwise, the offsets committed by the consumers of the different data sources will clash.
If you connect a data source to Kafka using a KAFKA_TOPIC and KAFKA_GROUP_ID that were previously used by another data source in your workspace, the data source will only receive data from the last committed offset, even if KAFKA_AUTO_OFFSET_RESET is set to earliest.
To prevent these issues, always use a different KAFKA_GROUP_ID when testing Kafka data sources.
See Kafka logs to learn how to diagnose any other issues.
Compressed messages¶
Tinybird can consume from Kafka topics where Kafka compression is enabled, as decompressing the message is a standard function of the Kafka consumer. If you compressed the message before passing it through the Kafka producer, Tinybird can't do post-consumer processing to decompress the message.
For example, if you compressed a JSON message with gzip and produced it to a Kafka topic as a bytes message, it would be ingested by Tinybird as bytes. If you produced a JSON message to a Kafka topic with the Kafka producer setting compression.type=gzip, it would be stored in Kafka as compressed bytes, but it would be decompressed on ingestion and arrive in Tinybird as JSON.
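For example, producing with producer-level compression, which Tinybird consumes transparently, could look like this with the Kafka console producer; the broker address and topic are illustrative:
kafka-console-producer.sh \
    --bootstrap-server kafka:9092 \
    --topic test_topic \
    --producer-property compression.type=gzip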
Connecting an existing data source to Kafka¶
You only need to add the desired Kafka settings to the data source file, and a forward query to provide default values for the Kafka meta columns, as sketched below.
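As a rough sketch, assuming the data source from the earlier example already exists, the additions could look like this; the exact forward query Tinybird suggests for your schema may differ:
KAFKA_CONNECTION_NAME kafka_sample
KAFKA_TOPIC test_topic
KAFKA_GROUP_ID {{ tb_secret("KAFKA_GROUP_ID") }}

FORWARD_QUERY >
    SELECT
        timestamp,
        session_id,
        action,
        version,
        payload,
        '' AS __value,
        '' AS __topic,
        toInt16(0) AS __partition,
        toInt16(0) AS __offset,
        toDateTime(0) AS __timestamp,
        '' AS __key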
Disconnecting a data source from Kafka¶
You only need to remove the Kafka settings from the data source file. If you want to keep any of the Kafka meta columns, you will need to add them to the schema, most likely with a default value, and adjust the FORWARD_QUERY accordingly.