Load data with Delta Live Tables
You can load data from any data source supported by Apache Spark on Databricks using Delta Live Tables. You can define datasets (tables and views) in Delta Live Tables against any query that returns a Spark DataFrame, including streaming DataFrames and pandas API on Spark DataFrames. For data ingestion tasks, Databricks recommends using streaming tables for most use cases. Streaming tables work well for ingesting data from cloud object storage using Auto Loader or from message buses like Kafka. The examples below demonstrate some common patterns.
Important
Not all data sources have SQL support. You can mix SQL and Python notebooks in a Delta Live Tables pipeline to use SQL for all operations beyond ingestion.
For details on working with libraries not packaged in Delta Live Tables by default, see Manage Python dependencies for Delta Live Tables pipelines.
Load files from cloud object storage
Databricks recommends using Auto Loader with Delta Live Tables for most data ingestion tasks from cloud object storage. Auto Loader and Delta Live Tables are designed to incrementally and idempotently load ever-growing data as it arrives in cloud storage. The following examples use Auto Loader to create datasets from CSV and JSON files:
Note
To load files with Auto Loader in a Unity Catalog enabled pipeline, you must use external locations. To learn more about using Unity Catalog with Delta Live Tables, see Use Unity Catalog with your Delta Live Tables pipelines.
import dlt

# Incrementally ingest CSV files with Auto Loader.
@dlt.table
def customers():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "csv")
        .load("/databricks-datasets/retail-org/customers/")
    )

# Incrementally ingest JSON files with Auto Loader.
@dlt.table
def sales_orders_raw():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load("/databricks-datasets/retail-org/sales_orders/")
    )

CREATE OR REFRESH STREAMING TABLE customers
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/customers/", format => "csv");

CREATE OR REFRESH STREAMING TABLE sales_orders_raw
AS SELECT * FROM STREAM read_files("/databricks-datasets/retail-org/sales_orders/", format => "json");

See What is Auto Loader? and Auto Loader SQL syntax.
Warning
If you use Auto Loader with file notifications and run a full refresh for your pipeline or streaming table, you must manually clean up your resources. You can use the CloudFilesResourceManager in a notebook to perform cleanup.
Load data from a message bus
You can configure Delta Live Tables pipelines to ingest data from message buses with streaming tables. Databricks recommends combining streaming tables with continuous execution and enhanced autoscaling to provide the most efficient ingestion for low-latency loading from message buses. See Optimize the cluster utilization of Delta Live Tables pipelines with enhanced autoscaling.
For example, the following code configures a streaming table to ingest data from Kafka:
import dlt

@dlt.table
def kafka_raw():
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "<server:ip>")
        .option("subscribe", "topic1")
        .option("startingOffsets", "latest")
        .load()
    )
You can write downstream operations in pure SQL to perform streaming transformations on this data, as in the following example:
CREATE OR REFRESH STREAMING TABLE streaming_silver_table
AS SELECT
  *
FROM
  STREAM(LIVE.kafka_raw)
WHERE ...
For an example of working with Event Hubs, see Use Azure Event Hubs as a Delta Live Tables data source.
Load data from external systems
Delta Live Tables supports loading data from any data source supported by Databricks. See Connect to data sources. You can also load external data using Lakehouse Federation for supported data sources. Lakehouse Federation requires Databricks Runtime 13.3 LTS or above, so a pipeline that uses it must be configured to use the preview channel.
Some data sources do not have equivalent support in SQL. If you cannot use Lakehouse Federation with one of these data sources, you can use a Python notebook to ingest data from the source. You can add Python and SQL source code to the same Delta Live Tables pipeline. The following example declares a materialized view to access the current state of data in a remote PostgreSQL table:
import dlt

# table_name, database_host_url, database_name, username, and password
# are assumed to be defined earlier in the notebook.
@dlt.table
def postgres_raw():
    return (
        spark.read
        .format("postgresql")
        .option("dbtable", table_name)
        .option("host", database_host_url)
        .option("port", 5432)
        .option("database", database_name)
        .option("user", username)
        .option("password", password)
        .load()
    )
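The PostgreSQL example leaves its connection values (`table_name`, `username`, `password`, and so on) to the reader. The following is a minimal sketch of one way to collect them, assuming the credentials live in a Databricks secret scope. The helper name `postgres_options`, the scope name, and the secret key names `pg-user` and `pg-password` are hypothetical, and `get_secret` stands in for `dbutils.secrets.get`; an in-memory stub is used here so the sketch runs anywhere.

```python
from typing import Callable, Dict


def postgres_options(
    get_secret: Callable[[str, str], str],
    scope: str,
    host: str,
    database: str,
    table: str,
    port: int = 5432,
) -> Dict[str, str]:
    """Collect reader options for the PostgreSQL example (hypothetical helper)."""
    return {
        "host": host,
        "port": str(port),
        "database": database,
        "dbtable": table,
        # Key names below are assumptions; use whatever names your scope contains.
        "user": get_secret(scope, "pg-user"),
        "password": get_secret(scope, "pg-password"),
    }


# Stub secret store for illustration only; values are not real credentials.
_fake_secrets = {
    ("demo-scope", "pg-user"): "reader",
    ("demo-scope", "pg-password"): "not-a-real-password",
}

options = postgres_options(
    lambda scope, key: _fake_secrets[(scope, key)],
    scope="demo-scope",
    host="db.example.com",
    database="shop",
    table="public.orders",
)
print(sorted(options))  # print option names only, never the secret values
```

In a pipeline notebook, you would pass `dbutils.secrets.get` as `get_secret` and apply the result with `spark.read.format("postgresql").options(**options).load()`.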
Load small or static datasets from cloud object storage
You can load small or static datasets using Apache Spark load syntax. Delta Live Tables supports all of the file formats supported by Apache Spark on Databricks. For a full list, see Data format options.
The following examples demonstrate loading JSON to create Delta Live Tables tables:
import dlt

@dlt.table
def clickstream_raw():
    return (
        spark.read.format("json")
        .load("/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json")
    )

CREATE OR REFRESH MATERIALIZED VIEW clickstream_raw
AS SELECT * FROM json.`/databricks-datasets/wikipedia-datasets/data-001/clickstream/raw-uncompressed-json/2015_2_clickstream.json`;
Note
The SELECT * FROM format.`path`; SQL construct is common to all SQL environments on Databricks. It is the recommended pattern for direct file access using SQL with Delta Live Tables.
Securely access storage credentials with secrets in a pipeline
You can use Databricks secrets to store credentials such as access keys or passwords. To configure the secret in your pipeline, use a Spark property in the pipeline settings cluster configuration. See Configure compute for a Delta Live Tables pipeline.
The following example uses a secret to store an access key required to read input data from an Azure Data Lake Storage Gen2 (ADLS Gen2) storage account using Auto Loader. You can use this same method to configure any secret required by your pipeline, for example, AWS keys to access S3, or the password to an Apache Hive metastore.
To learn more about working with Azure Data Lake Storage Gen2, see Connect to Azure Data Lake Storage Gen2 and Blob Storage.
Note
You must add the spark.hadoop. prefix to the spark_conf configuration key that sets the secret value.
{
  "id": "43246596-a63f-11ec-b909-0242ac120002",
  "clusters": [
    {
      "spark_conf": {
        "spark.hadoop.fs.azure.account.key.<storage-account-name>.dfs.core.windows.net": "{{secrets/<scope-name>/<secret-name>}}"
      },
      "autoscale": {
        "min_workers": 1,
        "max_workers": 5,
        "mode": "ENHANCED"
      }
    }
  ],
  "development": true,
  "continuous": false,
  "libraries": [
    {
      "notebook": {
        "path": "/Users/[email protected]/DLT Notebooks/Delta Live Tables quickstart"
      }
    }
  ],
  "name": "DLT quickstart using ADLS2"
}
Replace
<storage-account-name> with the ADLS Gen2 storage account name.
<scope-name> with the Databricks secret scope name.
<secret-name> with the name of the key containing the Azure storage account access key.
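The Spark property in the settings above follows a fixed pattern, so it can be easier to generate the key and the secret reference than to type them by hand. The following is a minimal sketch and not part of Delta Live Tables: the helper name `adls_secret_spark_conf` and the sample account, scope, and secret names are hypothetical.

```python
import json


def adls_secret_spark_conf(storage_account: str, scope: str, secret: str) -> dict:
    """Build the spark_conf entry that maps an ADLS Gen2 account key to a secret.

    Hypothetical helper for illustration; it only formats strings.
    """
    # The spark.hadoop. prefix is required on the configuration key.
    key = f"spark.hadoop.fs.azure.account.key.{storage_account}.dfs.core.windows.net"
    # Secret reference syntax used in the pipeline settings: {{secrets/<scope>/<name>}}
    value = f"{{{{secrets/{scope}/{secret}}}}}"
    return {key: value}


# Sample values are placeholders, not real resources.
spark_conf = adls_secret_spark_conf("mystorageaccount", "my-scope", "adls-access-key")
print(json.dumps({"spark_conf": spark_conf}, indent=2))
```

The printed fragment can be pasted into the cluster entry of the pipeline settings. Only the reference to the secret appears in the settings, never the access key itself.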
import dlt

json_path = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<path-to-input-dataset>"

# @dlt.table replaces the deprecated @dlt.create_table decorator.
@dlt.table(
    comment="Data ingested from an ADLS2 storage account."
)
def read_from_ADLS2():
    return (
        spark.readStream.format("cloudFiles")
        .option("cloudFiles.format", "json")
        .load(json_path)
    )
Replace
<container-name> with the name of the Azure storage account container that stores the input data.
<storage-account-name> with the ADLS Gen2 storage account name.
<path-to-input-dataset> with the path to the input dataset.
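If several datasets live in the same storage account, it can help to assemble the `abfss://` URI from its three parts instead of editing one long string. The following is a minimal sketch: the function name `abfss_path` and the sample container, account, and path values are hypothetical.

```python
def abfss_path(container: str, storage_account: str, path: str = "") -> str:
    """Return an abfss:// URI for a location in an ADLS Gen2 container."""
    # Strip a leading slash so the URI does not contain "//" after the host.
    return f"abfss://{container}@{storage_account}.dfs.core.windows.net/{path.lstrip('/')}"


# Sample values are placeholders, not real resources.
json_path = abfss_path("raw", "mystorageaccount", "/events/json")
print(json_path)
```

The resulting string can be passed to `.load()` in the same way as the hard-coded `json_path` in the ADLS Gen2 example.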
Load data from Azure Event Hubs
Azure Event Hubs is a data streaming service that provides an Apache Kafka compatible interface. You can use the Structured Streaming Kafka connector, included in the Delta Live Tables runtime, to load messages from Azure Event Hubs. To learn more about loading and processing messages from Azure Event Hubs, see Use Azure Event Hubs as a Delta Live Tables data source.
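Because Event Hubs exposes a Kafka-compatible endpoint, the reader options look like ordinary Kafka options with SASL authentication. The following is a minimal sketch of those options, under stated assumptions: the helper name `event_hubs_kafka_options` and the sample values are hypothetical, the endpoint is assumed to be the namespace host on port 9093, and the `kafkashaded.` prefix on the login module class assumes the shaded Kafka client in the Databricks runtime. In a real pipeline the connection string should come from a secret rather than source code.

```python
def event_hubs_kafka_options(namespace: str, event_hub: str, connection_string: str) -> dict:
    """Build Kafka reader options for an Event Hubs namespace (hypothetical helper)."""
    # Event Hubs authenticates Kafka clients with SASL PLAIN, using the literal
    # user name "$ConnectionString" and the connection string as the password.
    jaas = (
        "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
        f'username="$ConnectionString" password="{connection_string}";'
    )
    return {
        "kafka.bootstrap.servers": f"{namespace}.servicebus.windows.net:9093",
        "subscribe": event_hub,  # the event hub name plays the role of the Kafka topic
        "kafka.security.protocol": "SASL_SSL",
        "kafka.sasl.mechanism": "PLAIN",
        "kafka.sasl.jaas.config": jaas,
    }


# Placeholder values only; this is not a working connection string.
options = event_hubs_kafka_options("my-namespace", "my-event-hub", "<connection-string>")
print(options["kafka.bootstrap.servers"])
```

Inside a function decorated with `@dlt.table`, these options could be applied with `spark.readStream.format("kafka").options(**options).load()`.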