PostgreSQL table function
BETA
The Tinybird postgresql table function is currently in public beta.
The Tinybird postgresql() table function allows you to read data from your existing PostgreSQL database into Tinybird, then schedule a regular Copy Pipe to orchestrate synchronization. You can load full tables, and every run performs a full replace on the Data Source.
To use it, define a Node using standard SQL and the postgresql function keyword, then publish the Node as a Copy Pipe that does a sync on every run.
Set up¶
Prerequisites¶
Your PostgreSQL database needs to be open and publicly accessible (exposed to the internet, with publicly signed certificates), so you can connect it to Tinybird via the hostname and port using your username and password.
You'll also need familiarity with making cURL requests to manage your secrets.
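If you want to confirm that the database is reachable from outside your network before connecting it to Tinybird, a quick psql check is enough. This is a minimal sketch; the hostname, port, database, and user are placeholders to replace with your own values:
Example connectivity check with placeholder values
psql "host=aws-0-eu-central-1.TODO.com port=3866 dbname=postgres user=<your_pg_username> sslmode=require" -c "SELECT 1;"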
Type support and inference¶
Here's a detailed conversion table:
| PostgreSQL data type | Tinybird data type |
| --- | --- |
| BOOLEAN | UInt8 or Bool |
| SMALLINT | Int16 |
| INTEGER | Int32 |
| BIGINT | Int64 |
| REAL | Float32 |
| DOUBLE PRECISION | Float64 |
| NUMERIC or DECIMAL | Decimal(p, s) |
| CHAR(n) | FixedString(n) |
| VARCHAR(n) | String |
| TEXT | String |
| BYTEA | String |
| TIMESTAMP | DateTime |
| TIMESTAMP WITH TIME ZONE | DateTime (with appropriate timezone handling) |
| DATE | Date |
| TIME | String (since there is no direct TIME type) |
| TIME WITH TIME ZONE | String |
| INTERVAL | String |
| UUID | UUID |
| ARRAY | Array(T) where T is the array element type |
| JSON | String or JSON |
| JSONB | String |
| INET | String |
| CIDR | String |
| MACADDR | String |
| ENUM | Enum8 or Enum16 |
| GEOMETRY | String |
Notes:
- Tinybird does not support all PostgreSQL types directly, so some types are mapped to String, which is the most flexible type for arbitrary data.
- For the NUMERIC and DECIMAL types, Decimal(p, s) in Tinybird requires specifying precision (p) and scale (s).
- Time zone support in Tinybird's DateTime can be managed via additional functions or by ensuring consistent storage and retrieval time zones.
- Some types like INTERVAL do not have a direct equivalent in Tinybird and are usually stored as String or decomposed into separate fields.
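As an illustration of the mappings above, consider a hypothetical PostgreSQL products table (the table and column names are made up for this example):
Example PostgreSQL source table
CREATE TABLE products (
    product_id UUID,
    name TEXT,
    price NUMERIC(10, 2),
    tags TEXT[],
    created_at TIMESTAMP WITH TIME ZONE
);
Following the conversion table, the equivalent Tinybird Data Source schema would typically be:
Example Tinybird schema for the table above
SCHEMA >
    `product_id` UUID,
    `name` String,
    `price` Decimal(10, 2),
    `tags` Array(String),
    `created_at` DateTime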
About secrets¶
The Environment Variables API is currently only accessible at the API level. UI support will be released in the near future.
Pasting your credentials into a Pipe Node or .datafile as plain text is a security risk. Instead, use the Environment Variables API to create two new secrets for your PostgreSQL username and password. In the next step, you'll then be ready to interpolate your new secrets using the tb_secret function:
{{tb_secret('pg_username')}} {{tb_secret('pg_password')}}
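As a sketch, creating the two secrets with the Environment Variables API could look like the following. The endpoint, the type=secret parameter, and the Token scope are assumptions here, and the API host can vary by region, so check the Environment Variables API reference for the exact details:
Example requests to create the secrets (endpoint and parameters are assumptions)
# Assumed endpoint and parameters: check the Environment Variables API reference
curl -X POST "https://api.tinybird.co/v0/variables" \
    -H "Authorization: Bearer <ADMIN token>" \
    -d "type=secret" \
    -d "name=pg_username" \
    -d "value=<your_postgres_username>"

curl -X POST "https://api.tinybird.co/v0/variables" \
    -H "Authorization: Bearer <ADMIN token>" \
    -d "type=secret" \
    -d "name=pg_password" \
    -d "value=<your_postgres_password>"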
Load a PostgreSQL table¶
In the Tinybird UI, create a new Pipe Node. Call the postgresql table function and pass the hostname & port, database, table, user, and password:
Example Node logic with actual values
SELECT * FROM postgresql(
    'aws-0-eu-central-1.TODO.com:3866',
    'postgres',
    'orders',
    {{tb_secret('pg_username')}},
    {{tb_secret('pg_password')}}
)
Publish this Node as a Copy Pipe and run the query manually. You can choose to append only new data, or replace all data.
Alternative: Use datafiles¶
As well as using the UI, you can also define Node logic in Pipe .datafile files. An example for an ecommerce orders_backfill scenario, with a Node called all_orders, would be:
NODE all_orders
SQL >
    %
    SELECT * FROM postgresql(
      'aws-0-eu-central-1.TODO.com:3866',
      'postgres',
      'orders',
      {{tb_secret('pg_username')}},
      {{tb_secret('pg_password')}}
    )

TYPE copy
TARGET_DATASOURCE orders
COPY_SCHEDULE @on-demand
COPY_MODE replace
Include filters¶
You can reference a source column in Postgres and filter by a value computed in Tinybird, for example:
Example Copy Pipe with postgresql function and filters
SELECT * FROM postgresql(
    'aws-0-eu-central-1.TODO.com:3866',
    'postgres',
    'orders',
    {{tb_secret('pg_username')}},
    {{tb_secret('pg_password')}}
)
WHERE orderDate > (SELECT max(orderDate) FROM orders)
Schedule runs¶
When publishing as a Copy Pipe, most users set it to run at a frequent interval using a cron expression.
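For example, to run the sync every 5 minutes instead of on demand, set COPY_SCHEDULE to a cron expression. A minimal sketch of the relevant .datafile settings, reusing the all_orders Pipe from above with its Node SQL unchanged:
Example Copy Pipe settings with a cron schedule
TYPE copy
TARGET_DATASOURCE orders
COPY_SCHEDULE */5 * * * *
COPY_MODE replace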
It's also possible to trigger manually:
curl -H "Authorization: Bearer <PIPE:READ token>" \
    -X POST "https://tinybird.co/api/v0/pipes/<pipe_id>/run"
Having manual Pipes in your Workspace is helpful, as you can run a full sync any time you need it, for example when delta updates are not 100% accurate. Some users also leverage them for weekly full syncs.
Synchronization strategies¶
When copying data from PostgreSQL to Tinybird, you can use one of the following strategies:
- Use COPY_MODE replace to synchronize small dimension tables, up to a few million rows, on a frequent schedule (1 to 5 minutes).
- Use COPY_MODE append to do incremental appends. For example, you can append events data tagged with a timestamp. Combine it with COPY_SCHEDULE and filters in the Copy Pipe SQL to sync the new events.
Timeouts¶
When synchronizing dimension tables with COPY_MODE replace and a 1-minute schedule, the copy job might time out because it can't ingest the whole table within the defined schedule.
Timeouts depend on several factors:
- The statement_timeout configured in PostgreSQL.
- The PostgreSQL database load.
- Network connectivity, for example when copying data from different cloud regions.
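If the statement_timeout on the PostgreSQL side is the limiting factor, you can inspect it and, where appropriate, raise it for the role Tinybird connects with. A minimal sketch, assuming a hypothetical role named tinybird_reader and an illustrative 5-minute limit:
Example statement_timeout check and change in PostgreSQL
-- Check the current limit for the session
SHOW statement_timeout;

-- Raise it only for the role used by the Tinybird connection (value is illustrative)
ALTER ROLE tinybird_reader SET statement_timeout = '5min';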
Follow these steps to avoid timeouts using incremental appends:
- Make sure your PostgreSQL dimension rows are tagged with an update timestamp. Use that column to filter the Copy Pipe SQL. In the following example, the column is updated_at:
CREATE TABLE users (
    created_at TIMESTAMPTZ(6) NOT NULL,
    updated_at TIMESTAMPTZ(6) NOT NULL,
    name TEXT,
    user_id TEXT PRIMARY KEY
);
- Create the target Data Source as a ReplacingMergeTree, using a column that is a unique or primary key in the Postgres table as the ENGINE_SORTING_KEY. Rows with the same ENGINE_SORTING_KEY are deduplicated.
SCHEMA >
    `created_at` DateTime64(6),
    `updated_at` DateTime64(6),
    `name` String,
    `user_id` String

ENGINE "ReplacingMergeTree"
ENGINE_SORTING_KEY "user_id"
- Configure the Copy Pipe with an incremental append strategy and a 1-minute schedule. That way you make sure only new records from the last minute are ingested, which keeps the copy job duration short.
NODE copy_pg_users_rmt_0
SQL >
    %
    SELECT * FROM postgresql(
      'aws-0-eu-central-1.TODO.com:6543',
      'postgres',
      'users',
      {{ tb_secret('pg_username') }},
      {{ tb_secret('pg_password') }}
    )
    WHERE updated_at > (SELECT max(updated_at) FROM pg_users_rmt)::String

TYPE copy
TARGET_DATASOURCE pg_users_rmt
COPY_MODE append
COPY_SCHEDULE * * * * *
Optionally, you can create an index in the PostgreSQL table to speed up filtering:
-- Create an index on updated_at for faster queries
CREATE INDEX idx_updated_at ON users (updated_at);
- A Data Source with the ReplacingMergeTree engine deduplicates records based on the sorting key in batch mode. As you can't ensure when deduplication is going to happen, use the FINAL keyword when querying the Data Source to force deduplication at query time.
SELECT * FROM pg_users_rmt FINAL
- You can combine this approach with an hourly or daily replacement to get rid of deleted rows, as sketched below. Learn how to handle deleted rows when using ReplacingMergeTree.
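A minimal sketch of that combination: keep the 1-minute append Pipe from above for freshness, and add a second Copy Pipe over the same PostgreSQL table that replaces the whole Data Source once a day so deleted rows disappear. The Node name full_sync_pg_users and the 03:00 schedule are illustrative:
Example daily full-replace Copy Pipe (name and schedule are illustrative)
NODE full_sync_pg_users
SQL >
    %
    SELECT * FROM postgresql(
      'aws-0-eu-central-1.TODO.com:6543',
      'postgres',
      'users',
      {{ tb_secret('pg_username') }},
      {{ tb_secret('pg_password') }}
    )

TYPE copy
TARGET_DATASOURCE pg_users_rmt
COPY_MODE replace
COPY_SCHEDULE 0 3 * * *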
Learn more about how to migrate from Postgres to Tinybird.
Observability¶
Job executions are logged in the datasources_ops_log Service Data Source. This log can be checked directly in the Data Source view page in the UI. Filter by datasource_id to monitor ingestion through the PostgreSQL table function from the datasources_ops_log:
Example query to the datasources_ops_log Service Data Source
SELECT
    timestamp,
    event_type,
    result,
    error,
    job_id
FROM tinybird.datasources_ops_log
WHERE
    datasource_id = 't_1234'
    AND event_type = 'copy'
ORDER BY timestamp DESC
Limits¶
The table function inherits all the limits of Copy Pipes.
Secrets are created at the Workspace level, so you can connect one PostgreSQL database per Tinybird Workspace.
Check the limits page for limits on ingestion, queries, API Endpoints, and more.
Billing¶
Once set up, this functionality is a Copy Pipe with a query (billed as processed data). There are no additional or specific costs for the table function itself. See the billing docs for more information on data operations and how they're charged.